Răsfoiți Sursa

Fixed #TASK_GK-2944 trace redis

Carl 2 ani în urmă
părinte
comite
7e008e0f2d

+ 13 - 3
containers/container_apm.go

@@ -111,21 +111,31 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 		//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
 		//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
 
 
-		trace2, ok := c.getTrace(r.TraceId)
+		apmTrace, ok := c.getTrace(r.TraceId)
 		fmt.Println("mysql r.TraceId:", r.TraceId)
 		fmt.Println("mysql r.TraceId:", r.TraceId)
 		fmt.Println("ok:", ok)
 		fmt.Println("ok:", ok)
 		fmt.Println("traceMap:", len(c.traceMap))
 		fmt.Println("traceMap:", len(c.traceMap))
 		if ok {
 		if ok {
-			trace2.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+			apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
 		}
 		}
 	case l7.ProtocolMemcached:
 	case l7.ProtocolMemcached:
 		stats.observe(r.Status.String(), "", r.Duration)
 		stats.observe(r.Status.String(), "", r.Duration)
 		cmd, items := l7.ParseMemcached(r.Payload)
 		cmd, items := l7.ParseMemcached(r.Payload)
 		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
 		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
 	case l7.ProtocolRedis:
 	case l7.ProtocolRedis:
+		fmt.Println("redis redis")
 		stats.observe(r.Status.String(), "", r.Duration)
 		stats.observe(r.Status.String(), "", r.Duration)
 		cmd, args := l7.ParseRedis(r.Payload)
 		cmd, args := l7.ParseRedis(r.Payload)
-		trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
+		fmt.Println("cmd", cmd)
+		fmt.Println("args", args)
+		apmTrace, ok := c.getTrace(r.TraceId)
+		fmt.Println("redis r.TraceId:", r.TraceId)
+		fmt.Println("ok:", ok)
+		fmt.Println("traceMap:", len(c.traceMap))
+		if ok {
+			apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+		}
+		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
 	case l7.ProtocolMongo:
 	case l7.ProtocolMongo:
 		stats.observe(r.Status.String(), "", r.Duration)
 		stats.observe(r.Status.String(), "", r.Duration)
 		query := l7.ParseMongo(r.Payload)
 		query := l7.ParseMongo(r.Payload)

+ 0 - 1
ebpftracer/ebpf/ebpf.c

@@ -4,7 +4,6 @@
 //#include <net/inet_sock.h>
 //#include <net/inet_sock.h>
 //#include <net/sock.h>
 //#include <net/sock.h>
 //#include <net/net_namespace.h>
 //#include <net/net_namespace.h>
-#include <stdbool.h>
 #include <uapi/linux/bpf.h>
 #include <uapi/linux/bpf.h>
 #include "vmlinux.h"
 #include "vmlinux.h"
 #include <bpf/bpf_helpers.h>
 #include <bpf/bpf_helpers.h>

+ 4 - 1
ebpftracer/ebpf/l7/l7.c

@@ -55,7 +55,7 @@
 #include "dubbo2.c"
 #include "dubbo2.c"
 #include "apm_trace.c"
 #include "apm_trace.c"
 
 
-__u32 filterPid = 69161;
+__u32 filterPid = 88028;
 
 
 struct l7_event {
 struct l7_event {
     __u64 fd;
     __u64 fd;
@@ -553,6 +553,9 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         }
         }
     } else if (e->protocol == PROTOCOL_REDIS) {
     } else if (e->protocol == PROTOCOL_REDIS) {
         bpf_printk("[Response][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
         bpf_printk("[Response][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
+        __u64 trace_id = get_trace_id(pid, tid);
+        bpf_printk("[Redis] trace_id:%llu", trace_id);
+        e->trace_id = trace_id;
         response = is_redis_response(payload, ret, &e->status);
         response = is_redis_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MEMCACHED) {
     } else if (e->protocol == PROTOCOL_MEMCACHED) {
         response = is_memcached_response(payload, ret, &e->status);
         response = is_memcached_response(payload, ret, &e->status);

+ 56 - 19
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -7,6 +7,7 @@ import (
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 	"strings"
 	"strings"
+	"sync"
 )
 )
 
 
 type RootData struct {
 type RootData struct {
@@ -75,8 +76,16 @@ type MapInfo struct {
 }
 }
 
 
 type TraceMapT struct {
 type TraceMapT struct {
-	RootData RootData `json:"root_data"`
-	Index    int      `json:"index"`
+	RootData RootData
+	Index    int
+	lock     *sync.RWMutex
+	TheEnd   bool
+}
+
+var TraceRootMap map[string]*TraceMapT
+
+func init() {
+	TraceRootMap = make(map[string]*TraceMapT)
 }
 }
 
 
 func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootData {
 func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootData {
@@ -87,28 +96,35 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootData {
 	//traceMap := make(map[string][]MapInfo)
 	//traceMap := make(map[string][]MapInfo)
 	//traceIndexMap := make(map[string]int)
 	//traceIndexMap := make(map[string]int)
 
 
-	traceRoot := make(map[string]*TraceMapT)
-	sendData := []RootData{}
+	//traceRoot := make(map[string]*TraceMapT)
 	for _, sd := range sdl {
 	for _, sd := range sdl {
 		if sd == nil {
 		if sd == nil {
 			continue
 			continue
 		}
 		}
 		traceId := sd.SpanContext().TraceID().String()
 		traceId := sd.SpanContext().TraceID().String()
-		if _, ok := traceRoot[traceId]; !ok {
-			traceRoot[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
+		if _, ok := TraceRootMap[traceId]; !ok {
+			TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
 		}
 		}
-
-		traceRoot[traceId].Index++
-		initMapInfo(sd, traceRoot[traceId])
+		TraceRootMap[traceId].Index++
+		initMapInfo(sd, TraceRootMap[traceId])
 	}
 	}
-	for _, v := range traceRoot {
-		sendData = append(sendData, v.RootData)
+	// Send only completed traces | a large number of long-running requests will increase memory usage
+	sendData := []RootData{}
+	for traceId, v := range TraceRootMap {
+		if v.TheEnd {
+			sendData = append(sendData, v.RootData)
+			delete(TraceRootMap, traceId)
+			fmt.Println("the end!")
+		} else {
+			fmt.Println("not end!")
+		}
 	}
 	}
 
 
 	// Transform the categorized map into a slice
 	// Transform the categorized map into a slice
 	aa, _ := json.Marshal(sendData)
 	aa, _ := json.Marshal(sendData)
 	fmt.Println(string(aa))
 	fmt.Println(string(aa))
 	fmt.Println(len(sendData))
 	fmt.Println(len(sendData))
+	fmt.Println(len(TraceRootMap))
 	return sendData
 	return sendData
 }
 }
 
 
@@ -197,12 +213,32 @@ func initMapInfo(sd tracesdk.ReadOnlySpan, traceRoot *TraceMapT) MapInfo {
 			case "db.statement":
 			case "db.statement":
 				query := attr.Value.AsString()
 				query := attr.Value.AsString()
 				mNode.Ps = []string{query}
 				mNode.Ps = []string{query}
-				upperOperType := strings.ToUpper(query[:6])
-				if upperOperType == "SELECT" ||
-					upperOperType == "UPDATE" ||
-					upperOperType == "INSERT" ||
-					upperOperType == "DELETE" {
-					mNode.OperType = upperOperType
+				words := strings.Fields(query)
+				if len(words) > 0 {
+					mNode.OperType = strings.ToUpper(words[0])
+				}
+			}
+		}
+		traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
+	case "REDIS":
+		fmt.Println("REDIS")
+		mNode.ServiceName = mapType
+		mNode.ServiceType = "NOSQL"
+		mNode.MethodName = "(redis)"
+		mNode.MethodDesc = "(redis)"
+		for _, attr := range sd.Attributes() {
+			fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+			switch attr.Key {
+			case "net.peer.name":
+				mNode.Ip = attr.Value.AsString()
+			case "net.peer.port":
+				mNode.Port = attr.Value.AsInt64()
+			case "db.statement":
+				query := attr.Value.AsString()
+				mNode.Ps = []string{query}
+				words := strings.Fields(query)
+				if len(words) > 0 {
+					mNode.OperType = strings.ToUpper(words[0])
 				}
 				}
 			}
 			}
 		}
 		}
@@ -210,8 +246,8 @@ func initMapInfo(sd tracesdk.ReadOnlySpan, traceRoot *TraceMapT) MapInfo {
 	case "APPLICATION":
 	case "APPLICATION":
 		mNode.ServiceName = "GO"
 		mNode.ServiceName = "GO"
 		mNode.ServiceType = "APPLICATION"
 		mNode.ServiceType = "APPLICATION"
-		mNode.MethodName = "main"
-		mNode.MethodDesc = "main"
+		mNode.MethodName = "endpoint"
+		mNode.MethodDesc = "endpoint"
 		mNode.Level = 1
 		mNode.Level = 1
 		mNode.Nid = 1
 		mNode.Nid = 1
 		mNode.Pid = 0
 		mNode.Pid = 0
@@ -235,6 +271,7 @@ func initMapInfo(sd tracesdk.ReadOnlySpan, traceRoot *TraceMapT) MapInfo {
 				traceRoot.RootData.LocalPort = attr.Value.AsInt64()
 				traceRoot.RootData.LocalPort = attr.Value.AsInt64()
 			}
 			}
 		}
 		}
+		traceRoot.TheEnd = true
 		traceRoot.RootData.Maps = append([]MapInfo{mNode}, traceRoot.RootData.Maps...)
 		traceRoot.RootData.Maps = append([]MapInfo{mNode}, traceRoot.RootData.Maps...)
 	}
 	}
 	return mNode
 	return mNode

+ 0 - 5
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/exporter.go

@@ -45,11 +45,6 @@ func (e *Exporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan)
 	if len(protoSpans) == 0 {
 	if len(protoSpans) == 0 {
 		return nil
 		return nil
 	}
 	}
-	//for _, s := range ss {
-	//	fmt.Println(s.ChildSpanCount())
-	//}
-	//a, _ := json.Marshal(protoSpans)
-	//fmt.Println(string(a))
 	sendData := tracetransformData(ss)
 	sendData := tracetransformData(ss)
 	//tracetransformData(ss)
 	//tracetransformData(ss)
 	err := e.client.UploadApmTraces(ctx, sendData)
 	err := e.client.UploadApmTraces(ctx, sendData)

+ 102 - 0
tracing/apm_tracing.go

@@ -0,0 +1,102 @@
+package tracing
+
+import (
+	"context"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
+	"go.opentelemetry.io/otel/trace"
+	"inet.af/netaddr"
+	"time"
+)
+
+/**
+ * Trace
+ */
+
+// setContext stores ctx on the trace while holding t.lock.
+func (t *Trace) setContext(ctx context.Context) {
+	t.lock.Lock()
+	t.ctx = ctx
+	t.lock.Unlock()
+}
+
+// setSpan stores span on the trace while holding t.lock.
+func (t *Trace) setSpan(span trace.Span) {
+	t.lock.Lock()
+	t.span = span
+	t.lock.Unlock()
+}
+
+// TraceStart opens the parent "APPLICATION" span for an HTTP request.
+//
+// It does nothing when t is nil or method is empty. A status of 400 or above
+// is reported to createParentSpan as an error. The HTTP status code attribute
+// is not attached here; TraceEnd sets it when the span is ended.
+func (t *Trace) TraceStart(method, path string, status l7.Status, duration time.Duration) {
+	if t == nil {
+		return
+	}
+	if method == "" {
+		return
+	}
+	failed := status >= 400
+	fullURL := fmt.Sprintf("http://%s%s", t.destination.String(), path)
+	t.createParentSpan("APPLICATION", duration, failed,
+		semconv.HTTPURL(fullURL),
+		semconv.HTTPMethod(method),
+		attribute.String("http.uri", path),
+	)
+}
+
+// TraceEnd records the HTTP status code from r on the parent span and ends
+// that span, stamping the end time with the current wall clock.
+//
+// It is a no-op when t or r is nil, or when no parent span has been stored
+// yet. In the visible code the span is only stored by createParentSpan, which
+// TraceStart skips for an empty method, so an unguarded t.span dereference
+// would panic in that case.
+func (t *Trace) TraceEnd(r *l7.RequestData) {
+	if t == nil || r == nil {
+		return
+	}
+	// Read the span under the same lock setSpan writes it with.
+	t.lock.Lock()
+	span := t.span
+	t.lock.Unlock()
+	if span == nil {
+		return
+	}
+	span.SetAttributes(semconv.HTTPStatusCode(int(r.Status)))
+	span.End(trace.WithTimestamp(time.Now()))
+}
+
+// createParentSpan starts a new span named name from a background context on
+// the tracer for t.containerId, back-dated by duration from the current time.
+// The caller's attrs are applied first, then t.commonAttrs; the span is marked
+// as an error when error is true. The resulting context and span are stored on
+// t, and the span is left open.
+func (t *Trace) createParentSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
+	startedAt := time.Now().Add(-duration)
+	parentCtx, parentSpan := tracer(t.containerId).Start(
+		context.Background(),
+		name,
+		trace.WithTimestamp(startedAt),
+		trace.WithSpanKind(trace.SpanKindClient),
+	)
+	parentSpan.SetAttributes(attrs...)
+	parentSpan.SetAttributes(t.commonAttrs...)
+	if error {
+		parentSpan.SetStatus(codes.Error, "")
+	}
+	t.setContext(parentCtx)
+	t.setSpan(parentSpan)
+}
+
+// createTraceSpan emits an already-finished span named name that covers the
+// last duration up to now. The span is started from the context stored on t,
+// receives t.commonAttrs followed by attrs, and is marked as an error when
+// error is true.
+//
+// The stored context is read under t.lock, the lock setContext writes it with.
+// When no context has been stored yet, context.Background() is used instead of
+// handing a nil context to the tracer. The per-span debug print of the context
+// to stdout has been dropped.
+func (t *Trace) createTraceSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
+	end := time.Now()
+	start := end.Add(-duration)
+
+	t.lock.Lock()
+	parent := t.ctx
+	t.lock.Unlock()
+	if parent == nil {
+		parent = context.Background()
+	}
+
+	_, span := tracer(t.containerId).Start(parent, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(t.commonAttrs...)
+	span.SetAttributes(attrs...)
+	if error {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+// MysqlTraceQuery emits a "MYSQL" span for query via createTraceSpan, tagged
+// with the MySQL db system, the statement text, and the IP and port of
+// destination. It does nothing when t is nil or query is empty.
+func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
+	if t == nil {
+		return
+	}
+	if query == "" {
+		return
+	}
+	peerName := destination.IP().String()
+	peerPort := int(destination.Port())
+	t.createTraceSpan("MYSQL", duration, error,
+		semconv.DBSystemMySQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(peerName),
+		semconv.NetPeerPort(peerPort),
+	)
+}
+
+// RedisTraceQuery emits a "REDIS" span via createTraceSpan, tagged with the
+// Redis db system, cmd as the operation, and a statement made of cmd followed
+// by args (space-separated) when args is non-empty. It does nothing when t is
+// nil or cmd is empty.
+func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
+	if t == nil {
+		return
+	}
+	if cmd == "" {
+		return
+	}
+	stmt := cmd
+	if len(args) > 0 {
+		stmt = cmd + " " + args
+	}
+	t.createTraceSpan("REDIS", duration, error,
+		semconv.DBSystemRedis,
+		semconv.DBOperation(cmd),
+		semconv.DBStatement(stmt),
+	)
+}

+ 0 - 73
tracing/tracing.go

@@ -193,76 +193,3 @@ func (t *Trace) RedisQuery(cmd, args string, error bool, duration time.Duration)
 		semconv.DBStatement(statement),
 		semconv.DBStatement(statement),
 	)
 	)
 }
 }
-
-/**
- * Trace
- */
-
-func (t *Trace) setContext(ctx context.Context) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-	t.ctx = ctx
-}
-
-func (t *Trace) setSpan(span trace.Span) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-	t.span = span
-}
-
-func (t *Trace) TraceStart(method, path string, status l7.Status, duration time.Duration) {
-	if t == nil || method == "" {
-		return
-	}
-	t.createParentSpan("APPLICATION", duration, status >= 400,
-		semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
-		semconv.HTTPMethod(method),
-		//semconv.HTTPStatusCode(int(status)),
-		attribute.String("http.uri", path),
-	)
-}
-
-func (t *Trace) TraceEnd(r *l7.RequestData) {
-	if t == nil {
-		return
-	}
-	t.span.SetAttributes(semconv.HTTPStatusCode(int(r.Status)))
-	t.span.End(trace.WithTimestamp(time.Now()))
-}
-
-func (t *Trace) createParentSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
-	end := time.Now()
-	start := end.Add(-duration)
-	ctx, span := tracer(t.containerId).Start(context.Background(), name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(attrs...)
-	span.SetAttributes(t.commonAttrs...)
-	if error {
-		span.SetStatus(codes.Error, "")
-	}
-	t.setContext(ctx)
-	t.setSpan(span)
-}
-
-func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
-	if t == nil || query == "" {
-		return
-	}
-	t.createTraceSpan("MYSQL", duration, error,
-		semconv.DBSystemMySQL,
-		semconv.DBStatement(query),
-		semconv.NetPeerName(destination.IP().String()),
-		semconv.NetPeerPort(int(destination.Port())),
-	)
-}
-func (t *Trace) createTraceSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
-	end := time.Now()
-	start := end.Add(-duration)
-	fmt.Println("createTraceSpan:", t.ctx)
-	_, span := tracer(t.containerId).Start(t.ctx, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(t.commonAttrs...)
-	span.SetAttributes(attrs...)
-	if error {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
-}