tracing.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package tracing
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/coroot/coroot-node-agent/config"
  7. "github.com/coroot/coroot-node-agent/utils"
  8. "github.com/coroot/coroot-node-agent/utils/try"
  9. "net/url"
  10. "sync"
  11. "time"
  12. "github.com/coroot/coroot-node-agent/common"
  13. "github.com/coroot/coroot-node-agent/ebpftracer"
  14. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. klog "github.com/sirupsen/logrus"
  17. "go.opentelemetry.io/otel/attribute"
  18. "go.opentelemetry.io/otel/codes"
  19. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  20. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  21. "go.opentelemetry.io/otel/sdk/resource"
  22. sdktrace "go.opentelemetry.io/otel/sdk/trace"
  23. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  24. "go.opentelemetry.io/otel/trace"
  25. "inet.af/netaddr"
  26. )
  27. const (
  28. MemcacheDBItemKeyName attribute.Key = "db.memcached.item"
  29. )
  30. var (
  31. tracer func(containerId string) trace.Tracer
  32. watchEndpointOnce sync.Once
  33. )
  34. func getTracesEndpoint() *url.URL {
  35. var u *url.URL
  36. if *flags.TracesEndpoint != nil {
  37. u = *flags.TracesEndpoint
  38. } else {
  39. dataServer, err := url.Parse(config.Cfg.GetString("dataServer"))
  40. if err == nil && dataServer != nil {
  41. u = dataServer.JoinPath(*flags.ServerPrefix + "/api/v2/data/receive")
  42. }
  43. }
  44. return u
  45. }
  46. func watchTracesEndpoint(machineId, hostname, version string) {
  47. for range config.Cfg.SubscribeConfigChange(config.ScbTypeTracesEndpointChg) {
  48. klog.Infof("OpenTelemetry endpoint changed (the configuration `dataServer`),new dataServer [%s]", config.Cfg.GetString("dataServer"))
  49. Init(machineId, hostname, version)
  50. klog.Infoln("OpenTelemetry endpoint change finished")
  51. }
  52. klog.Infof("SubscribeConfigChange ScbTypeTracesEndpointChg exit")
  53. }
  54. func Init(machineId, hostname, version string) {
  55. endpointUrl := getTracesEndpoint()
  56. if endpointUrl == nil {
  57. klog.Infoln("no OpenTelemetry traces collector endpoint configured")
  58. return
  59. }
  60. klog.Infoln("OpenTelemetry traces collector endpoint:", endpointUrl.String())
  61. path := endpointUrl.Path
  62. if path == "" {
  63. path = "/"
  64. }
  65. opts := []otlptracehttp.Option{
  66. otlptracehttp.WithEndpoint(endpointUrl.Host),
  67. otlptracehttp.WithURLPath(path),
  68. otlptracehttp.WithHeaders(common.AuthHeaders()),
  69. otlptracehttp.WithTLSClientConfig(&tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify}),
  70. }
  71. if endpointUrl.Scheme != "https" {
  72. opts = append(opts, otlptracehttp.WithInsecure())
  73. }
  74. client := otlptracehttp.NewClient(opts...)
  75. exporter, err := otlptrace.New(context.Background(), client)
  76. if err != nil {
  77. klog.Fatalln(err)
  78. }
  79. batcher := sdktrace.WithBatcher(exporter)
  80. tracer = func(containerId string) trace.Tracer {
  81. provider := sdktrace.NewTracerProvider(
  82. batcher,
  83. sdktrace.WithResource(resource.NewWithAttributes(
  84. semconv.SchemaURL,
  85. semconv.HostName(hostname),
  86. semconv.HostID(machineId),
  87. semconv.ServiceName(common.ContainerIdToOtelServiceName(containerId)),
  88. semconv.ContainerID(containerId),
  89. )),
  90. )
  91. return provider.Tracer("coroot-node-agent", trace.WithInstrumentationVersion(version))
  92. }
  93. watchEndpointOnce.Do(func() {
  94. if *flags.TracesEndpoint == nil {
  95. try.GoParams(watchTracesEndpoint, utils.CatchFn, machineId, hostname, version)
  96. }
  97. })
  98. }
  99. type Trace struct {
  100. containerId string
  101. destination netaddr.IPPort
  102. commonAttrs []attribute.KeyValue
  103. ctx context.Context
  104. span trace.Span
  105. lock sync.RWMutex
  106. stack []ebpftracer.StackFunEvent
  107. currenEventCount *uint32
  108. needEventCount uint32
  109. startEventReady bool
  110. endEventReady bool
  111. createAt time.Time
  112. }
  113. func NewTraceFromEvent(containerId string) *Trace {
  114. if tracer == nil {
  115. return nil
  116. }
  117. var currenEventCount uint32
  118. return &Trace{containerId: containerId, currenEventCount: &currenEventCount}
  119. }
  120. func NewTrace(containerId string, destination netaddr.IPPort) *Trace {
  121. if tracer == nil {
  122. return nil
  123. }
  124. return &Trace{containerId: containerId, destination: destination, commonAttrs: []attribute.KeyValue{
  125. semconv.NetPeerName(destination.IP().String()),
  126. semconv.NetPeerPort(int(destination.Port())),
  127. }}
  128. }
  129. func (t *Trace) createSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
  130. end := time.Now()
  131. start := end.Add(-duration)
  132. _, span := tracer(t.containerId).Start(nil, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
  133. span.SetAttributes(attrs...)
  134. span.SetAttributes(t.commonAttrs...)
  135. if error {
  136. span.SetStatus(codes.Error, "")
  137. }
  138. span.End(trace.WithTimestamp(end))
  139. }
  140. func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration) {
  141. if t == nil || method == "" {
  142. return
  143. }
  144. t.createSpan(method, duration, status >= 400,
  145. semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
  146. semconv.HTTPMethod(method),
  147. semconv.HTTPStatusCode(int(status)),
  148. )
  149. }
  150. func (t *Trace) Http2Request(method, path, scheme string, status l7.Status, duration time.Duration) {
  151. if t == nil {
  152. return
  153. }
  154. if method == "" {
  155. method = "unknown"
  156. }
  157. if path == "" {
  158. path = "/unknown"
  159. }
  160. if scheme == "" {
  161. scheme = "unknown"
  162. }
  163. t.createSpan(method, duration, status > 400,
  164. semconv.HTTPURL(fmt.Sprintf("%s://%s%s", scheme, t.destination.String(), path)),
  165. semconv.HTTPMethod(method),
  166. semconv.HTTPStatusCode(int(status)),
  167. )
  168. }
  169. func (t *Trace) PostgresQuery(query string, error bool, duration time.Duration) {
  170. if t == nil || query == "" {
  171. return
  172. }
  173. t.createSpan("query", duration, error,
  174. semconv.DBSystemPostgreSQL,
  175. semconv.DBStatement(query),
  176. )
  177. }
  178. func (t *Trace) MysqlQuery(query string, error bool, duration time.Duration) {
  179. if t == nil || query == "" {
  180. return
  181. }
  182. t.createSpan("query", duration, error,
  183. semconv.DBSystemMySQL,
  184. semconv.DBStatement(query),
  185. )
  186. }
  187. func (t *Trace) MongoQuery(query string, error bool, duration time.Duration) {
  188. if t == nil || query == "" {
  189. return
  190. }
  191. t.createSpan("query", duration, error,
  192. semconv.DBSystemMongoDB,
  193. semconv.DBStatement(query),
  194. )
  195. }
  196. func (t *Trace) MemcachedQuery(cmd string, items []string, error bool, duration time.Duration) {
  197. if t == nil || cmd == "" {
  198. return
  199. }
  200. attrs := []attribute.KeyValue{
  201. semconv.DBSystemMemcached,
  202. semconv.DBOperation(cmd),
  203. }
  204. if len(items) == 1 {
  205. attrs = append(attrs, MemcacheDBItemKeyName.String(items[0]))
  206. } else if len(items) > 1 {
  207. attrs = append(attrs, MemcacheDBItemKeyName.StringSlice(items))
  208. }
  209. t.createSpan(cmd, duration, error, attrs...)
  210. }
  211. func (t *Trace) RedisQuery(cmd, args string, error bool, duration time.Duration) {
  212. if t == nil || cmd == "" {
  213. return
  214. }
  215. statement := cmd
  216. if args != "" {
  217. statement += " " + args
  218. }
  219. t.createSpan(cmd, duration, error,
  220. semconv.DBSystemRedis,
  221. semconv.DBOperation(cmd),
  222. semconv.DBStatement(statement),
  223. )
  224. }
  225. func (t *Trace) FunAdd(stackFun ebpftracer.StackFunEvent) {
  226. t.stack = append(t.stack, stackFun)
  227. }