Преглед на файлове

Fixed #TASK_QT-9810 适配PG

rock преди 1 година
родител
ревизия
88b8624fb8
променени са 5 файла, в които са добавени 91 реда и са изтрити 2 реда
  1. 2 2
      Makefile2
  2. 19 0
      containers/container_apm.go
  3. 9 0
      ebpftracer/ebpf/l7/l7.c
  4. 30 0
      pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go
  5. 31 0
      tracing/apm_tracing.go

+ 2 - 2
Makefile2

@@ -17,12 +17,12 @@ build:
 	CGO_ENABLED=1 go build -gcflags="all=-N -l" -buildvcs=false -o dist/package_dir/bin/euspace
 c:
 	#docker exec -it 9d928d96d4d0 sh -c 'cd /opt/github/euspace/ebpftracer && sh build.sh${PARAMS}'
-	docker exec -it 0fec3217d6da sh -c 'cd /opt/github/euspace/ebpftracer && make all ${PARAMS}'
+	docker exec -it 432002584cbf sh -c 'cd /opt/github/euspace/ebpftracer && make all ${PARAMS}'
 c-build: c
 
 go-build:
 	#ssh [email protected] 'export https_proxy=http://10.0.22.50:4780 && source ~/.g/env && cd /opt/github/euspace && make -f Makefile2 build'
-	docker exec -it 0fec3217d6da bash -c 'cd /opt/github/euspace && source ~/.g/env && make -f Makefile2 build'
+	docker exec -it 432002584cbf bash -c 'cd /opt/github/euspace && source ~/.g/env && make -f Makefile2 build'
 go: go-build
 
 run:

+ 19 - 0
containers/container_apm.go

@@ -176,6 +176,25 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		//}
 		//query := conn.postgresParser.Parse(r.Payload)
 		//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			if conn.postgresParser == nil {
+				conn.postgresParser = l7.NewPostgresParser()
+			}
+			query := conn.postgresParser.Parse(r.Payload)
+			//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
+
+			//apmTrace, ok := c.getTrace(r.TraceId)
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			fmt.Println(err)
+			//fmt.Println("mysql r.TraceId:", r.TraceId)
+			//fmt.Println("ok:", ok)
+			//fmt.Println("traceMap:", len(c.traceMap))
+			if err == nil {
+				//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+				apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
+		}
 	case l7.ProtocolMysql:
 		if r.Method != l7.MethodStatementClose {
 			stats.observe(r.Status.String(), "", r.Duration)

+ 9 - 0
ebpftracer/ebpf/l7/l7.c

@@ -541,6 +541,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             if (!e) {
                 return 0;
             }
+            __u64 trace_id = get_apm_trace_id(pid, tid);
+	        e->trace_id = trace_id;
             e->protocol = PROTOCOL_POSTGRES;
             e->method = METHOD_STATEMENT_CLOSE;
             e->payload_size = size;
@@ -939,6 +941,13 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 //	    cw_bpf_debug("[Kernel End][HTTP]:pid:[%d]|CURRENT-GOID:[%llu]|trace_id:[%llu]---------\n", tid, get_current_goroutine(),e->trace_id);
 
 	} else if (e->protocol == PROTOCOL_POSTGRES) {
+        __u64 trace_id = get_apm_trace_id(pid, tid);
+//        cw_bpf_debug("[postgres sql] trace_id:%llu", trace_id);
+		e->trace_id = trace_id;
+        e->component_sport = conn->sport;
+        e->component_dport = conn->dport;
+        __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
+        __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
 		response = is_postgres_response(payload, ret, &e->status);
 		if (req->request_type == POSTGRES_FRAME_PARSE) {
 			e->method = METHOD_STATEMENT_PREPARE;

+ 30 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -32,6 +32,7 @@ const (
 	DM_SERVICE_NAME    = "DM"
 	REDIS_SERVICE_NAME = "REDIS"
 	HTTP_SERVICE_NAME  = "HTTPCLIENT"
+	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
 )
 
 type apmTraceSpan tracesdk.ReadOnlySpan
@@ -175,6 +176,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 				// dm
 				case l7.ProtocolDM:
 					buildDMMapEvent(&mNode, event)
+				case l7.ProtocolPostgres:
+					buildPostGreSqlMapEvent(&mNode, event)
 				}
 			}
 
@@ -824,6 +827,33 @@ func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 //	}
 //}
 
+func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.Dbn = "unknown"
+	mNode.ServiceName = POSTGRESQL_SERVICE_NAME
+	mNode.ServiceType = SQL_SERVICE_TYPE
+	mNode.MethodName = "database/sql.Query()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		case "sql.src_addr":
+			mNode.SrcAddr = attr.Value.AsString()
+		case "sql.destination_addr":
+			mNode.DestinationAddr = attr.Value.AsString()
+		}
+	}
+}
+
 func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.Dbn = "unknown"
 	mNode.ServiceName = MYSQL_SERVICE_NAME

+ 31 - 0
tracing/apm_tracing.go

@@ -327,6 +327,37 @@ func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destinatio
 	t.createTraceEvent(l7.ProtocolMysql.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
 }
 
+func (t *Trace) PostGreSqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
+	if t == nil || query == "" {
+		return
+	}
+	t.createTraceSpan(l7.ProtocolPostgres.String(), duration, error,
+		semconv.DBSystemPostgreSQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	)
+}
+
+func (t *Trace) PostGreSqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
+	fmt.Println("query", query)
+	if t == nil {
+		return
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemPostgreSQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+		attribute.String("sql.src_addr", r.ComponentSAddr.String()),
+		attribute.String("sql.destination_addr", r.ComponentDAddr.String()),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolPostgres.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolPostgres), attr...)
+}
+
 func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
 	if t == nil || query == "" {
 		return