Bladeren bron

Fixed #TASK_QT-9810 优化事件处理-数据结构调整(mysql/redis)

Carl 1 jaar geleden
bovenliggende
commit
cdc5940f4a

+ 37 - 35
containers/container_apm.go

@@ -145,15 +145,15 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
 	switch r.Protocol {
 	case l7.ProtocolHTTP:
-		fmt.Println("l7.ProtocolHTTP", r.TraceId)
-		//stats.observe(r.Status.Http(), "", r.Duration)
-		method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
-		//trace.HttpRequest(method, path, r.Status, r.Duration)
-
-		apmTrace, ok := c.getTrace(r.TraceId)
-		if ok {
-			apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
-		}
+		//fmt.Println("l7.ProtocolHTTP", r.TraceId)
+		////stats.observe(r.Status.Http(), "", r.Duration)
+		//method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
+		////trace.HttpRequest(method, path, r.Status, r.Duration)
+		//
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		//if ok {
+		//	apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
+		//}
 
 	case l7.ProtocolHTTP2:
 		if conn.http2Parser == nil {
@@ -165,14 +165,14 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
 		}
 	case l7.ProtocolPostgres:
-		if r.Method != l7.MethodStatementClose {
-			stats.observe(r.Status.String(), "", r.Duration)
-		}
-		if conn.postgresParser == nil {
-			conn.postgresParser = l7.NewPostgresParser()
-		}
-		query := conn.postgresParser.Parse(r.Payload)
-		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
+		//if r.Method != l7.MethodStatementClose {
+		//	stats.observe(r.Status.String(), "", r.Duration)
+		//}
+		//if conn.postgresParser == nil {
+		//	conn.postgresParser = l7.NewPostgresParser()
+		//}
+		//query := conn.postgresParser.Parse(r.Payload)
+		//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolMysql:
 		//fmt.Println("mysql mysql")
 		//fmt.Println(conn)
@@ -185,39 +185,41 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 		//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
 
-		apmTrace, ok := c.getTrace(r.TraceId)
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
 		//fmt.Println("mysql r.TraceId:", r.TraceId)
 		//fmt.Println("ok:", ok)
 		//fmt.Println("traceMap:", len(c.traceMap))
-		if ok {
-			apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+		if err == nil {
+			//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+			apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
+			c.SendEvent(apmTrace, r.TraceId)
 		}
 	case l7.ProtocolMemcached:
-		stats.observe(r.Status.String(), "", r.Duration)
-		cmd, items := l7.ParseMemcached(r.Payload)
-		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
+		//cmd, items := l7.ParseMemcached(r.Payload)
+		//trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
 	case l7.ProtocolRedis:
-		fmt.Println("redis redis")
 		stats.observe(r.Status.String(), "", r.Duration)
 		cmd, args := l7.ParseRedis(r.Payload)
 		fmt.Println("cmd", cmd)
 		fmt.Println("args", args)
-		apmTrace, ok := c.getTrace(r.TraceId)
-		fmt.Println("redis r.TraceId:", r.TraceId)
-		fmt.Println("ok:", ok)
-		fmt.Println("traceMap:", len(c.traceMap))
-		if ok {
-			apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		if err == nil {
+			//apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+			apmTrace.RedisTraceQueryEvent(cmd, args, r)
+			c.SendEvent(apmTrace, r.TraceId)
 		}
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
 	case l7.ProtocolMongo:
-		stats.observe(r.Status.String(), "", r.Duration)
-		query := l7.ParseMongo(r.Payload)
-		trace.MongoQuery(query, r.Status.Error(), r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
+		//query := l7.ParseMongo(r.Payload)
+		//trace.MongoQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolKafka, l7.ProtocolCassandra:
-		stats.observe(r.Status.String(), "", r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
 	case l7.ProtocolRabbitmq, l7.ProtocolNats:
-		stats.observe(r.Status.String(), r.Method.String(), 0)
+		//stats.observe(r.Status.String(), r.Method.String(), 0)
 	}
 	return nil
 }

+ 1 - 1
ebpftracer/stack.go

@@ -35,7 +35,7 @@ func (t *Tracer) stack() error {
 	}
 
 	binType := "dotnet"
-	MatchString := ".*HandleFunc|.*main.*|testfun.*|.*serverHandler.*|.*ServeHTTP.*"
+	MatchString := ".*HandleFunc|.*main.*|testfun.*|.*serverHandler.*|.*ServeHTTP.*|.*database.*|.*redis.*"
 	dbgpath := ""
 
 	ENV_PID := os.Getenv("FILTER_PID")

+ 52 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -156,6 +156,12 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
 				// http
 				case 1:
 					buildHttpMapFromEvent(&mNode, event)
+				// mysql
+				case 5:
+					buildMysqlMapEvent(&mNode, event)
+				// redis
+				case 3:
+					buildRedisMapEvent(&mNode, event)
 				}
 			}
 
@@ -726,6 +732,29 @@ func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
 	}
 }
 
+func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.Dbn = "unknown"
+	mNode.ServiceName = MYSQL_SERVICE_NAME
+	mNode.ServiceType = SQL_SERVICE_TYPE
+	mNode.MethodName = "database/sql.Query()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		}
+	}
+}
+
 func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 	mNode.ServiceName = REDIS_SERVICE_NAME
 	mNode.ServiceType = NOSQL_SERVICE_TYPE
@@ -749,6 +778,29 @@ func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 	}
 }
 
+func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = REDIS_SERVICE_NAME
+	mNode.ServiceType = NOSQL_SERVICE_TYPE
+	//mNode.MethodName = span(sd).Name + " query"
+	mNode.MethodName = "redis.Do()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		}
+	}
+}
+
 func isEnter(_type string) bool {
 	if _type == "APPLICATION" {
 		return true

+ 35 - 0
tracing/apm_tracing.go

@@ -219,6 +219,22 @@ func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration
 	)
 }
 
+func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
+	if t == nil || query == "" {
+		return
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemMySQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
+}
+
 func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
 	if t == nil || cmd == "" {
 		return
@@ -234,6 +250,25 @@ func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Dura
 	)
 }
 
+func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData) {
+	if t == nil || cmd == "" {
+		return
+	}
+	statement := cmd
+	if args != "" {
+		statement += " " + args
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemRedis,
+		semconv.DBOperation(cmd),
+		semconv.DBStatement(statement),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolRedis), attr...)
+}
+
 func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.RequestData) {
 	if t == nil || method == "" {
 		return