Procházet zdrojové kódy

Fixed #TASK_QT-9810 DM适配

Carl před 1 rokem
rodič
revize
5d2c494f61

+ 3 - 0
containers/container.go

@@ -84,6 +84,7 @@ type ActiveConnection struct {
 	http2Parser    *l7.Http2Parser
 	postgresParser *l7.PostgresParser
 	mysqlParser    *l7.MysqlParser
+	dmParser       *l7.DmParser
 }
 
 type ListenDetails struct {
@@ -1153,6 +1154,8 @@ func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) {
 		if filterPid != int64(pid) {
 			return
 		}
+	} else {
+		return
 	}
 	p := c.processes[pid]
 	if p == nil {

+ 16 - 1
containers/container_apm.go

@@ -181,7 +181,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		//query := conn.postgresParser.Parse(r.Payload)
 		//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolMysql:
-		//fmt.Println("mysql mysql")
+		fmt.Println("l7.ProtocolMysql")
 		//fmt.Println(conn)
 		if r.Method != l7.MethodStatementClose {
 			stats.observe(r.Status.String(), "", r.Duration)
@@ -194,6 +194,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 
 		//apmTrace, ok := c.getTrace(r.TraceId)
 		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		fmt.Println(err)
 		//fmt.Println("mysql r.TraceId:", r.TraceId)
 		//fmt.Println("ok:", ok)
 		//fmt.Println("traceMap:", len(c.traceMap))
@@ -202,6 +203,20 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
 			c.SendEvent(apmTrace, r.TraceId)
 		}
+
+	case l7.ProtocolDM:
+		fmt.Println("dm dm")
+		if conn.dmParser == nil {
+			conn.dmParser = l7.NewDmParser()
+		}
+		query := conn.dmParser.Parse(r.Payload, r.StatementId)
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		fmt.Println("dm r.TraceId:", r.TraceId)
+		if err == nil {
+			apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
+			c.SendEvent(apmTrace, r.TraceId)
+		}
+
 	case l7.ProtocolMemcached:
 		//stats.observe(r.Status.String(), "", r.Duration)
 		//cmd, items := l7.ParseMemcached(r.Payload)

+ 108 - 0
ebpftracer/ebpf/l7/dm.c

@@ -0,0 +1,108 @@
+#define DM_HEADER_TYPE_0 0x0 // java
+#define DM_HEADER_TYPE_1 0x1 // go
+#define DM_PAYLOAD_OFFSET 0x40 // 64
+
+#define DM_COM_QUERY 0x05
+#define DM_COM_LOGIN 0xa3
+#define DM_COM_VERSION 0xc8
+//#define DM_COM_STMT_CLOSE 0x04
+#define DM_RESPONSE_EOF   0xbb
+
+// TODO EXEC模式 分包
+// TODO DM_COM_LOGIN 可解析库名
+/*
+ * DM packet:
+ *          |                  |0        3|4     5|6     7|
+ *          |        TCP       |  Herder  | type  |  len  |
+ * 0x0020:  5018 01f5 6210 0000 0100 0000  0500    5000    P...b.........P.
+ *
+ * */
+static __always_inline
+int is_dm_query(char *buf, __u64 buf_size, __u8 *request_type) {
+    if (buf_size < 8) {
+        return 0;
+    }
+    __u8 b[8];
+    bpf_read(buf, b);
+	bpf_printk("buf0:[0x%x]",b[0]);
+	bpf_printk("buf1:[0x%x]",b[1]);
+	bpf_printk("buf2:[0x%x]",b[2]);
+	bpf_printk("buf3:[0x%x]",b[3]);
+	bpf_printk("buf4:[0x%x]",b[4]);
+	bpf_printk("buf5:[0x%x]",b[5]);
+	bpf_printk("buf6:[0x%x]",b[6]);
+	bpf_printk("buf7:[0x%x]",b[7]);
+
+    int length = (int)b[6] | (int)b[7] << 8 ;
+	bpf_printk("=====data length [%d] ", length);
+	bpf_printk("=====buf_size :%d",buf_size);
+	bpf_printk("payload:<%s>",buf+DM_PAYLOAD_OFFSET);
+
+    if ( (b[0] != DM_HEADER_TYPE_0 && b[0] != DM_HEADER_TYPE_1 )|| length + DM_PAYLOAD_OFFSET != buf_size ) { // type must be 1
+        return 0;
+    }
+
+	int packet_header_type = (int) b[4] | (int) b[5] << 8;
+	bpf_printk("=====packet_header_type [0x%x] End\n", packet_header_type);
+
+    if (packet_header_type ==  DM_COM_QUERY) {
+        return 1;
+    }
+//    if (packet_header_type ==  DM_COM_STMT_CLOSE) {
+//        *request_type = DM_COM_STMT_CLOSE;
+//        return 1;
+//    }
+
+//    if (b[4] == DM_COM_STMT_PREPARE) {
+//        *request_type = DM_COM_STMT_PREPARE;
+//        return 1;
+//    }
+    return 0;
+}
+
+// TODO 响应部分
+static __always_inline
+int is_dm_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id, __u32 *status) {
+	__u8 b[8];
+    bpf_read(buf, b);
+//    if (b[3] < 1) { // sequence must be > 0
+//        return 0;
+//    }
+
+	bpf_printk("buf0:[0x%x]",b[0]);
+	bpf_printk("buf1:[0x%x]",b[1]);
+	bpf_printk("buf2:[0x%x]",b[2]);
+	bpf_printk("buf3:[0x%x]",b[3]);
+	bpf_printk("buf4:[0x%x]",b[4]);
+	bpf_printk("buf5:[0x%x]",b[5]);
+	bpf_printk("buf6:[0x%x]",b[6]);
+	bpf_printk("buf7:[0x%x]",b[7]);
+	int length = (int)b[6] | (int)b[7] << 8 ;
+	bpf_printk("=====data length [%d] ", length);
+	bpf_printk("=====buf_size :%d",buf_size);
+	bpf_printk("=====payload:<%s>",buf+0x60);
+
+	if (b[0] != DM_HEADER_TYPE_0 && b[0] != DM_HEADER_TYPE_1) { // type must be 1
+		return 0;
+	}
+
+	int packet_header_type = (int) b[4] | (int) b[5] << 8;
+	bpf_printk("=====[dm]resp packet_header_type [0x%x] End\n", packet_header_type);
+
+    if (packet_header_type == DM_RESPONSE_EOF) {
+        *status = STATUS_OK;
+        return 1;
+    }
+//    if (b[4] == DM_RESPONSE_OK ) {
+//        if (request_type == DM_COM_STMT_PREPARE) {
+//            bpf_read(buf+5, *statement_id);
+//        }
+//        *status = STATUS_OK;
+//        return 1;
+//    }
+//    if (b[4] == DM_RESPONSE_ERROR) {
+//        *status = STATUS_FAILED;
+//        return 1;
+//    }
+    return 0;
+}

+ 23 - 3
ebpftracer/ebpf/l7/l7.c

@@ -13,6 +13,7 @@
 #define PROTOCOL_HTTP2	   11
 #define PROTOCOL_DUBBO2    12
 #define PROTOCOL_DNS       13
+#define PROTOCOL_DM        14
 
 
 
@@ -57,6 +58,7 @@
 #include "http2.c"
 #include "dubbo2.c"
 #include "dns.c"
+#include "dm.c"
 #include "apm_trace.c"
 
 // go type l7Event struct && type RequestData struct
@@ -202,7 +204,7 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
     __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
     if (timestamp) {
         if (*timestamp == 0) {
-//	        bpf_printk("timestamp=0");
+	        bpf_printk("timestamp=0");
             return;
         }
         e->connection_timestamp = *timestamp;
@@ -449,10 +451,13 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     } else if (is_mysql_query(payload, size, &req->request_type)) {
         cw_bpf_debug("[Enter][Mysql]:thread_id:%d\n",tid);
         if (req->request_type == MYSQL_COM_STMT_CLOSE) {
+	        return 0;
             struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
             if (!e) {
                 return 0;
             }
+	        __u64 trace_id = get_apm_trace_id(pid, tid);
+	        e->trace_id = trace_id;
             e->protocol = PROTOCOL_MYSQL;
             e->method = METHOD_STATEMENT_CLOSE;
             e->payload_size = size;
@@ -462,6 +467,9 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             return 0;
         }
         req->protocol = PROTOCOL_MYSQL;
+    } else if (is_dm_query(payload, size,&req->request_type)) {
+	    bpf_printk("[Enter][DM]:req->request_type:%d\n",req->request_type);
+	    req->protocol = PROTOCOL_DM;
     } else if (is_mongo_query(payload, size)) {
         req->protocol = PROTOCOL_MONGO;
     } else if (is_rabbitmq_produce(payload, size)) {
@@ -587,7 +595,9 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     e->method = METHOD_UNKNOWN;
     e->statement_id = 0;
     e->payload_size = 0;
-
+	__u8 b[8];
+	bpf_read(payload, b);
+	bpf_printk("payload44444:[0x%x]",b[4]);
 //    __u32 k0 = 0;
 //    struct member_fields_offset *offset = members_offset__lookup(&k0);
 //    if (!offset)
@@ -675,6 +685,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
     int response = 0;
     if (!req) {
+	    bpf_printk("no req? 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
         if (is_dns_response(payload, ret, &k.stream_id, &e->status)) {
 	        bpf_printk("dns");
             req = bpf_map_lookup_elem(&active_l7_requests, &k);
@@ -704,6 +715,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             send_event(ctx, e, k.pid, k.fd);
             return 0;
         } else {
+	        bpf_printk("bb 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
             return 0;
         }
     }
@@ -713,6 +725,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     COPY_PAYLOAD(e->payload, req->payload_size, req->payload);
 
     bpf_map_delete_elem(&active_l7_requests, &k);
+	bpf_printk("delete req--------:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
     if (e->protocol == PROTOCOL_HTTP) {
         __u64 trace_id = get_apm_trace_id(pid, tid);
         e->trace_id = trace_id;
@@ -765,7 +778,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     } else if (e->protocol == PROTOCOL_MEMCACHED) {
         response = is_memcached_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MYSQL) {
-        cw_bpf_debug("[Response][Mysql]:thread_id:%d\n",tid);
+	    bpf_printk("[Response][Mysql]:thread_id:%d\n",tid);
         __u64 trace_id = get_apm_trace_id(pid, tid);
 //        cw_bpf_debug("[Mysql] trace_id:%llu", trace_id);
         e->trace_id = trace_id;
@@ -773,6 +786,13 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         if (req->request_type == MYSQL_COM_STMT_PREPARE) {
             e->method = METHOD_STATEMENT_PREPARE;
         }
+    } else if (e->protocol == PROTOCOL_DM) {
+	    __u64 trace_id = get_apm_trace_id(pid, tid);
+	    e->trace_id = trace_id;
+	    bpf_printk("[DM] trace_id:%llu", trace_id);
+	    bpf_printk("[Response][DM] ---------start");
+	    response = is_dm_response(payload, ret, req->request_type, &e->statement_id, &e->status);
+	    bpf_printk("[Response][DM]end ---------- %d\n",e->status);
     } else if (e->protocol == PROTOCOL_MONGO) {
         response = is_mongo_response(payload, ret, req->partial);
         if (response == 2) { // partial

+ 4 - 3
ebpftracer/ebpf/l7/postgres.c

@@ -17,15 +17,16 @@ int is_postgres_query(char *buf, __u64 buf_size, __u8 *request_type) {
     bpf_read(buf+1, f_length);
     f_length = bpf_htonl(f_length);
 
-    *request_type = f_cmd;
     if ((f_cmd == POSTGRES_FRAME_SIMPLE_QUERY || f_cmd == POSTGRES_FRAME_CLOSE) && f_length+1 == buf_size) {
-        return 1;
+	    *request_type = f_cmd;
+	    return 1;
     }
     char sync[5];
     TRUNCATE_PAYLOAD_SIZE(buf_size);
     bpf_read(buf+buf_size-5, sync);
     if (sync[0] == 'S' && sync[1] == 0 && sync[2] == 0 && sync[3] == 0 && sync[4] == 4) {
-        return 1;
+	    *request_type = f_cmd;
+	    return 1;
     }
     return 0;
 }

+ 73 - 0
ebpftracer/l7/dm.go

@@ -0,0 +1,73 @@
+package l7
+
+import "fmt"
+
+const (
+	minHeaderSize   = 8
+	dmMsgHeaderSize = 64
+)
+
+type DmParser struct {
+	preparedStatements map[string]string
+}
+
+func NewDmParser() *DmParser {
+	return &DmParser{preparedStatements: map[string]string{}}
+}
+
+func (p *DmParser) Parse(payload []byte, statementId uint32) string {
+	payloadSize := len(payload)
+	fmt.Println("payloadSize", payloadSize)
+	if payloadSize < minHeaderSize {
+		return ""
+	}
+	msgSize := int(payload[6]) | int(payload[7])<<8
+	fmt.Println("msgSize", msgSize)
+	if payloadSize < dmMsgHeaderSize+msgSize {
+		return ""
+	}
+	to := dmMsgHeaderSize + msgSize - 1
+	query := string(payload[dmMsgHeaderSize:to])
+	fmt.Printf("[%s]\n", query)
+	return query
+	//msgSize := int(payload[0]) | int(payload[1])<<8 | int(payload[2])<<16
+
+	//cmd := payload[4]
+	//readQuery := func() (query string) {
+	//	to := mysqlMsgHeaderSize + msgSize
+	//	partial := false
+	//	if to > payloadSize {
+	//		to = payloadSize
+	//		partial = true
+	//	}
+	//	query = string(payload[mysqlMsgHeaderSize+1 : to])
+	//	if partial {
+	//		query += "..."
+	//	}
+	//	return query
+	//}
+	//readStatementId := func() string {
+	//	return strconv.FormatUint(uint64(binary.LittleEndian.Uint32(payload[mysqlMsgHeaderSize+1:])), 10)
+	//}
+	//
+	//switch cmd {
+	//case DmComQuery:
+	//	return readQuery()
+	//case DmComStmtExecute:
+	//	statementIdStr := readStatementId()
+	//	statement, ok := p.preparedStatements[statementIdStr]
+	//	if !ok {
+	//		statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, statementIdStr)
+	//	}
+	//	return statement
+	//case DmComStmtPrepare:
+	//	query := readQuery()
+	//	statementIdStr := strconv.FormatUint(uint64(statementId), 10)
+	//	p.preparedStatements[statementIdStr] = query
+	//	return fmt.Sprintf("PREPARE %s FROM %s", statementIdStr, query)
+	//case MysqlComStmtClose:
+	//	statementIdStr := readStatementId()
+	//	delete(p.preparedStatements, statementIdStr)
+	//}
+	return ""
+}

+ 1 - 0
ebpftracer/l7/l7.go

@@ -23,6 +23,7 @@ const (
 	ProtocolHTTP2     Protocol = 11
 	ProtocolDubbo2    Protocol = 12
 	ProtocolDNS       Protocol = 13
+	ProtocolDM        Protocol = 14
 )
 
 func (p Protocol) String() string {

+ 39 - 18
ebpftracer/tracer/inject/inject_linux_amd64.go

@@ -14,7 +14,6 @@ import (
 	"debug/elf"
 	"fmt"
 	"golang.org/x/arch/x86/x86asm"
-	"log"
 	"os"
 	"strings"
 	"syscall"
@@ -474,6 +473,33 @@ func (j *JvmInjector) findLibBaseFromProcMaps(libName string) (uint64, string, e
 	return 1, "", fmt.Errorf("library %s not found", libName)
 }
 
+func (j *JvmInjector) findLibBaseByPathFromProcMaps(libPath string) (uint64, string, error) {
+	mapsFile := fmt.Sprintf("/proc/%d/maps", j.Pid)
+	file, err := os.Open(mapsFile)
+	if err != nil {
+		return 0, "", err
+	}
+	defer file.Close()
+	var start, end uint64
+	scanner := bufio.NewScanner(file)
+	for scanner.Scan() {
+		line := scanner.Text()
+		if strings.Contains(line, libPath) {
+			fmt.Sscanf(line, "%x-%x", &start, &end)
+			fields := strings.Fields(line)
+			if len(fields) > 5 {
+				path := fields[5]
+				if strings.HasSuffix(path, ".so") {
+					fmt.Printf("Found library %s\n", path)
+					return start, path, nil
+				}
+			}
+		}
+	}
+
+	return 1, "", fmt.Errorf("library %s not found", libPath)
+}
+
 func (j *JvmInjector) getFunctionOffset(libPath, functionName string) (elf.Symbol, error) {
 	elfFile, err := elf.Open(libPath)
 	if err != nil {
@@ -517,8 +543,7 @@ func (j *JvmInjector) findReleaseFuncContextFromLibPath() error {
 	j.ReleaseLibNetInfo.LibPath = libPath
 	libName := j.ReleaseLibNetInfo.LibName
 	if err != nil {
-		log.Fatalf("Error finding base addresses: %v", err)
-		return err
+		return fmt.Errorf("Error finding base addresses: %v", err)
 	}
 	fmt.Printf("Base address of (%s)%s: %x\n", "", libName, baseAddress)
 
@@ -529,8 +554,7 @@ func (j *JvmInjector) findReleaseFuncContextFromLibPath() error {
 	j.ReleaseLibNetInfo.FuncSymbol.SymAddr = baseAddress + functionSym.Value
 	j.ReleaseLibNetInfo.FuncSymbol.SymSize = functionSym.Size
 	if err != nil {
-		log.Fatalf("Error getting function offset: %v", err)
-		return err
+		return fmt.Errorf("Error getting function offset: %v", err)
 	}
 	fmt.Printf("Actual memory address of %s at base 0x%x: 0x%x\n", functionName, baseAddress, j.ReleaseLibNetInfo.FuncSymbol.SymAddr)
 	err = j.findReleaseAddressInfoFromMem()
@@ -545,15 +569,14 @@ func (j *JvmInjector) findReleaseFuncContextFromLibPath() error {
 }
 
 func (j *JvmInjector) findDebugFuncContextFromLibPath() error {
-	libName := j.DebugLibNetInfo.LibName
+	//libName := j.DebugLibNetInfo.LibPath
 
 	// 获取release库的基地址
-	baseAddress, libPath, err := j.findLibBaseFromProcMaps(libName)
-	fmt.Println(libPath)
+	baseAddress, libPath, err := j.findLibBaseByPathFromProcMaps(j.DebugLibNetInfo.LibPath)
+	fmt.Println("debug libPath", libPath)
 	functionName := j.DebugLibNetInfo.FuncSymbol.SymName
 	j.DebugLibNetInfo.LibPath = libPath
 	if err != nil {
-		log.Fatalf("Error finding base addresses: %v", err)
 		return err
 	}
 
@@ -564,15 +587,12 @@ func (j *JvmInjector) findDebugFuncContextFromLibPath() error {
 	j.DebugLibNetInfo.FuncSymbol.SymSize = functionSym.Size
 
 	if err != nil {
-		log.Fatalf("Error getting function offset: %v", err)
-		return err
+		return fmt.Errorf("Error getting function offset: %v", err)
 	}
-	fmt.Printf("Actual memory address of %s at base 0x%x: 0x%x\n", functionName, baseAddress, j.DebugLibNetInfo.FuncSymbol.SymAddr)
 
 	_, err = j.findDebugAddressInfoFromMem()
 	if err != nil {
-		log.Printf("Error finding first CALL instuction: %v", err)
-		return err
+		return fmt.Errorf("Error finding first CALL instuction: %v", err)
 	}
 	fmt.Printf("First CALL instuction o1f %s at base 0x%x\n", functionName, baseAddress)
 	return nil
@@ -757,7 +777,7 @@ func JvmInject(jvmInjector *JvmInjector) error {
 	}
 
 	if err != nil {
-		log.Fatalf("Error message during release phase: %v", err)
+		//log.Fatalf("Error message during release phase: %v", err)
 		return err
 	}
 
@@ -766,7 +786,7 @@ func JvmInject(jvmInjector *JvmInjector) error {
 		return err
 	}
 	printCodeData(jvmInjector.ReleaseLibNetInfo)
-	_type, _, err := jvmInjector.findLibBaseFromProcMaps(jvmInjector.DebugLibNetInfo.LibName)
+	_type, _, err := jvmInjector.findLibBaseByPathFromProcMaps(jvmInjector.DebugLibNetInfo.LibPath)
 	if err != nil {
 		// load so
 		if _type == 1 {
@@ -787,12 +807,13 @@ func JvmInject(jvmInjector *JvmInjector) error {
 	}
 
 	err = jvmInjector.findDebugFuncContextFromLibPath()
+	fmt.Println("find debug Context", err)
 	if err != nil {
-		log.Fatalf("Failed to find debug Context: %v", err)
+		return err
 	}
 
 	if !jvmInjector.validateAllPreCheck() {
-		fmt.Println("failed validateAllPreCheck ")
+		fmt.Println("failed validateAllPreCheck ", jvmInjector.PreCheck)
 		return err
 	}
 	// 修改

+ 49 - 3
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -17,6 +17,25 @@ import (
 	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 )
 
+const (
+	ProtocolTrace int = 200
+
+	ProtocolHTTP      int = 1
+	ProtocolPostgres  int = 2
+	ProtocolRedis     int = 3
+	ProtocolMemcached int = 4
+	ProtocolMysql     int = 5
+	ProtocolMongo     int = 6
+	ProtocolKafka     int = 7
+	ProtocolCassandra int = 8
+	ProtocolRabbitmq  int = 9
+	ProtocolNats      int = 10
+	ProtocolHTTP2     int = 11
+	ProtocolDubbo2    int = 12
+	ProtocolDNS       int = 13
+	ProtocolDM        int = 14
+)
+
 const (
 	APP_SERVICE_TYPE   = "APPLICATION"
 	SQL_SERVICE_TYPE   = "SQL"
@@ -27,6 +46,7 @@ const (
 const (
 	GO_SERVICE_NAME    = "GO"
 	MYSQL_SERVICE_NAME = "MYSQL"
+	DM_SERVICE_NAME    = "DM"
 	REDIS_SERVICE_NAME = "REDIS"
 	HTTP_SERVICE_NAME  = "HTTPCLIENT"
 )
@@ -161,14 +181,17 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
 			case 10:
 				switch event.ProtocolType {
 				// http
-				case 1:
+				case ProtocolHTTP:
 					buildHttpMapFromEvent(&mNode, event)
 				// mysql
-				case 5:
+				case ProtocolMysql:
 					buildMysqlMapEvent(&mNode, event)
 				// redis
-				case 3:
+				case ProtocolRedis:
 					buildRedisMapEvent(&mNode, event)
+				// dm
+				case ProtocolDM:
+					buildDMMapEvent(&mNode, event)
 				}
 			}
 
@@ -811,6 +834,29 @@ func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	}
 }
 
+func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.Dbn = "TEST"
+	mNode.ServiceName = DM_SERVICE_NAME
+	mNode.ServiceType = SQL_SERVICE_TYPE
+	mNode.MethodName = "database/sql.Query()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		}
+	}
+}
+
 func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 	mNode.ServiceName = REDIS_SERVICE_NAME
 	mNode.ServiceType = NOSQL_SERVICE_TYPE

+ 2 - 0
pkg/go.opentelemetry.io/otel/semconv/v1.18.0/trace.go

@@ -472,6 +472,8 @@ var (
 	DBSystemClickhouse = DBSystemKey.String("clickhouse")
 	// Cloud Spanner
 	DBSystemSpanner = DBSystemKey.String("spanner")
+	// DaMengDB
+	DBSystemDaMengDB = DBSystemKey.String("dameng")
 )
 
 // DBConnectionString returns an attribute KeyValue conforming to the

+ 20 - 1
tracing/apm_tracing.go

@@ -265,6 +265,8 @@ func (t *Trace) GetSpan() trace.Span {
 }
 
 func (t *Trace) createTraceEvent(name string, eventType int, l7Type int, attrs ...attribute.KeyValue) {
+	fmt.Println("t.currenEventCount", &t.currenEventCount)
+
 	t.span.AddEventApm(name, eventType, l7Type, trace.WithAttributes(attrs...))
 	atomic.AddUint32(t.currenEventCount, 1)
 }
@@ -295,7 +297,8 @@ func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration
 }
 
 func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
-	if t == nil || query == "" {
+	fmt.Println("query", query)
+	if t == nil {
 		return
 	}
 
@@ -310,6 +313,22 @@ func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destinatio
 	t.createTraceEvent(l7.ProtocolMysql.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
 }
 
+func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
+	if t == nil || query == "" {
+		return
+	}
+	l7Type := int(l7.ProtocolDM)
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemDaMengDB,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), l7Type, attr...)
+}
+
 func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
 	if t == nil || cmd == "" {
 		return