package tracing import ( "context" "fmt" "sync" "time" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/flags" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.18.0" "go.opentelemetry.io/otel/trace" "inet.af/netaddr" "k8s.io/klog/v2" ) const ( MemcacheDBItemKeyName attribute.Key = "db.memcached.item" ) var ( tracer func(containerId string) trace.Tracer ) func Init(machineId, hostname, version string) { endpointUrl := *flags.TracesEndpoint if endpointUrl == nil { klog.Infoln("no OpenTelemetry traces collector endpoint configured") return } klog.Infoln("OpenTelemetry traces collector endpoint:", endpointUrl.String()) path := endpointUrl.Path if path == "" { path = "/" } opts := []otlptracehttp.Option{ otlptracehttp.WithEndpoint(endpointUrl.Host), otlptracehttp.WithURLPath(path), otlptracehttp.WithHeaders(common.AuthHeaders()), } if endpointUrl.Scheme != "https" { opts = append(opts, otlptracehttp.WithInsecure()) } client := otlptracehttp.NewClient(opts...) exporter, err := otlptrace.New(context.Background(), client) if err != nil { klog.Exitln(err) } batcher := sdktrace.WithBatcher(exporter) tracer = func(containerId string) trace.Tracer { provider := sdktrace.NewTracerProvider( batcher, sdktrace.WithResource(resource.NewWithAttributes( semconv.SchemaURL, semconv.HostName(hostname), semconv.HostID(machineId), semconv.ServiceName(common.ContainerIdToOtelServiceName(containerId)), semconv.ContainerID(containerId), )), ) return provider.Tracer("coroot-node-agent", trace.WithInstrumentationVersion(version)) } } type Trace struct { containerId string destination netaddr.IPPort commonAttrs []attribute.KeyValue ctx context.Context span trace.Span lock sync.RWMutex stack []ebpftracer.StackFunEvent currenEventCount *uint32 needEventCount uint32 startEventReady bool endEventReady bool createAt time.Time } func NewTraceFromEvent(containerId string) *Trace { if tracer == nil { return nil } var currenEventCount uint32 return &Trace{containerId: containerId, currenEventCount: ¤EventCount} } func NewTrace(containerId string, destination netaddr.IPPort) *Trace { if tracer == nil { return nil } return &Trace{containerId: containerId, destination: destination, commonAttrs: []attribute.KeyValue{ semconv.NetPeerName(destination.IP().String()), semconv.NetPeerPort(int(destination.Port())), }} } func (t *Trace) createSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) { end := time.Now() start := end.Add(-duration) _, span := tracer(t.containerId).Start(nil, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient)) span.SetAttributes(attrs...) span.SetAttributes(t.commonAttrs...) if error { span.SetStatus(codes.Error, "") } span.End(trace.WithTimestamp(end)) } func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration) { if t == nil || method == "" { return } t.createSpan(method, duration, status >= 400, semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)), semconv.HTTPMethod(method), semconv.HTTPStatusCode(int(status)), ) } func (t *Trace) Http2Request(method, path, scheme string, status l7.Status, duration time.Duration) { if t == nil { return } if method == "" { method = "unknown" } if path == "" { path = "/unknown" } if scheme == "" { scheme = "unknown" } t.createSpan(method, duration, status > 400, semconv.HTTPURL(fmt.Sprintf("%s://%s%s", scheme, t.destination.String(), path)), semconv.HTTPMethod(method), semconv.HTTPStatusCode(int(status)), ) } func (t *Trace) PostgresQuery(query string, error bool, duration time.Duration) { if t == nil || query == "" { return } t.createSpan("query", duration, error, semconv.DBSystemPostgreSQL, semconv.DBStatement(query), ) } func (t *Trace) MysqlQuery(query string, error bool, duration time.Duration) { if t == nil || query == "" { return } t.createSpan("query", duration, error, semconv.DBSystemMySQL, semconv.DBStatement(query), ) } func (t *Trace) MongoQuery(query string, error bool, duration time.Duration) { if t == nil || query == "" { return } t.createSpan("query", duration, error, semconv.DBSystemMongoDB, semconv.DBStatement(query), ) } func (t *Trace) MemcachedQuery(cmd string, items []string, error bool, duration time.Duration) { if t == nil || cmd == "" { return } attrs := []attribute.KeyValue{ semconv.DBSystemMemcached, semconv.DBOperation(cmd), } if len(items) == 1 { attrs = append(attrs, MemcacheDBItemKeyName.String(items[0])) } else if len(items) > 1 { attrs = append(attrs, MemcacheDBItemKeyName.StringSlice(items)) } t.createSpan(cmd, duration, error, attrs...) } func (t *Trace) RedisQuery(cmd, args string, error bool, duration time.Duration) { if t == nil || cmd == "" { return } statement := cmd if args != "" { statement += " " + args } t.createSpan(cmd, duration, error, semconv.DBSystemRedis, semconv.DBOperation(cmd), semconv.DBStatement(statement), ) } func (t *Trace) FunAdd(stackFun ebpftracer.StackFunEvent) { t.stack = append(t.stack, stackFun) }