Browse Source

Feature #TASK_QT-12870 EBPF 达梦数据库协议适配-抓取基础SQL、耗时、报错

Tom.Li 1 year ago
parent
commit
5bb14326ac

+ 0 - 6
.vscode/settings.json

@@ -1,6 +0,0 @@
-{
-    "files.associations": {
-        "*.toml": "toml",
-        "numbers": "c"
-    }
-}

+ 4 - 3
containers/container_apm.go

@@ -198,18 +198,19 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		}
 
 	case l7.ProtocolDM:
-		fmt.Println("dm dm")
+		fmt.Println("---- onL7RequestApm ProtocolDM  start ---->")
+		fmt.Println("-------dm r.Status :", r.Status)
 		if conn.dmParser == nil {
 			conn.dmParser = l7.NewDmParser()
 		}
 		query := conn.dmParser.Parse(r.Payload, r.StatementId)
 		apmTrace, err := c.getOrInitTrace(r.TraceId)
-		fmt.Println("dm r.TraceId:", r.TraceId)
+		fmt.Println("-------dm r.TraceId:", r.TraceId)
 		if err == nil {
 			apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
 			c.SendEvent(apmTrace, r.TraceId)
 		}
-
+		fmt.Println("---- onL7RequestApm ProtocolDM  end <----")
 	case l7.ProtocolMemcached:
 		//stats.observe(r.Status.String(), "", r.Duration)
 		//cmd, items := l7.ParseMemcached(r.Payload)

+ 4 - 2
containers/registry.go

@@ -286,12 +286,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 				}
 			case ebpftracer.EventTypeL7Request:
-				//fmt.Println("EventTypeL7Request")
-				//fmt.Println("e.L7Request Payload:", string(e.L7Request.Payload))
+				fmt.Println(" ========= EventTypeL7Request Begin===========")
+				fmt.Println(" ===========e.L7Request Payload:", string(e.L7Request.Payload))
 				if e.L7Request == nil {
 					continue
 				}
 				if c := r.containersByPid[e.Pid]; c != nil {
+					fmt.Println(" ========= EventTypeL7Request Doing===========")
 					ip2fqdn := c.onL7RequestApm(e.Pid, e.Fd, e.Timestamp, e.L7Request)
 					r.ip2fqdnLock.Lock()
 					for ip, fqdn := range ip2fqdn {
@@ -299,6 +300,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 					r.ip2fqdnLock.Unlock()
 				}
+				fmt.Println(" ========= EventTypeL7Request End===========")
 			case ebpftracer.EventTypeFunEnt:
 				if e.StackEvent == nil {
 					continue

+ 131 - 86
ebpftracer/ebpf/l7/dm.c

@@ -1,108 +1,153 @@
-#define DM_HEADER_TYPE_0 0x0 // java
-#define DM_HEADER_TYPE_1 0x1 // go
-#define DM_PAYLOAD_OFFSET 0x40 // 64
-
-#define DM_COM_QUERY 0x05
-#define DM_COM_LOGIN 0xa3
-#define DM_COM_VERSION 0xc8
-//#define DM_COM_STMT_CLOSE 0x04
-#define DM_RESPONSE_EOF   0xbb
-
-// TODO EXEC模式 分包
-// TODO DM_COM_LOGIN 可解析库名
+#define DM_HEADER_SIZE 64 // 64
+
+#define DM_PAYLOAD_OFFSET DM_HEADER_SIZE
+//#define DM_SVR_ENCODING_OFFSET 28
+
+//#define DM_VERSION_CLI 0xc8
+//#define DM_DO_LOGIN  0x01
+
+//#define DM_STMT_BEGIN 0x03 //bb
+#define DM_QUERY 0x05 //bb
+//#define DM_STMT_PREPARE DM_QUERY //00
+#define DM_STMT_EXECUTE 0x0d //bb
+//#define DM_STMT_CLOSE 0x04  // 00
+
+#define DM_RESPONSE_OK    0xbb
+#define DM_RESPONSE_EOF   0x00
+
+//#define DM_RESPONSE_ERROR 0xa0 、0x9d ...
+//10-13 4位 响应码 小于0的为错误
+//跳过'-7036'这个错误码, 参考go-dm: EC_RN_EXCEED_ROWSET_SIZE = newDmError(-7036, "")
+#define DM_SVR_ERR_CODE_EC_RN_EXCEED_ROWSET_SIZE -7036
+
 /*
  * DM packet:
- *          |                  |0        3|4     5|6     7|
+ *          |                  |0        3|4     5|6     9|
  *          |        TCP       |  Herder  | type  |  len  |
  * 0x0020:  5018 01f5 6210 0000 0100 0000  0500    5000    P...b.........P.
  *
- * */
+*/
+
 static __always_inline
 int is_dm_query(char *buf, __u64 buf_size, __u8 *request_type) {
-    if (buf_size < 8) {
+//    bpf_printk("===== DM-req Begin ...");
+
+//    bpf_printk("===== DM-req buf_size :%d",buf_size);
+    //是否满足header大小
+    if (buf_size < DM_HEADER_SIZE) {
         return 0;
     }
-    __u8 b[8];
+
+    //读取10个字节
+    __u8 b[10];
     bpf_read(buf, b);
-	bpf_printk("buf0:[0x%x]",b[0]);
-	bpf_printk("buf1:[0x%x]",b[1]);
-	bpf_printk("buf2:[0x%x]",b[2]);
-	bpf_printk("buf3:[0x%x]",b[3]);
-	bpf_printk("buf4:[0x%x]",b[4]);
-	bpf_printk("buf5:[0x%x]",b[5]);
-	bpf_printk("buf6:[0x%x]",b[6]);
-	bpf_printk("buf7:[0x%x]",b[7]);
-
-    int length = (int)b[6] | (int)b[7] << 8 ;
-	bpf_printk("=====data length [%d] ", length);
-	bpf_printk("=====buf_size :%d",buf_size);
-	bpf_printk("payload:<%s>",buf+DM_PAYLOAD_OFFSET);
-
-    if ( (b[0] != DM_HEADER_TYPE_0 && b[0] != DM_HEADER_TYPE_1 )|| length + DM_PAYLOAD_OFFSET != buf_size ) { // type must be 1
+
+//	bpf_printk("buf0:[0x%x]",b[0]);
+//	bpf_printk("buf1:[0x%x]",b[1]);
+//	bpf_printk("buf2:[0x%x]",b[2]);
+//	bpf_printk("buf3:[0x%x]",b[3]);
+//
+//	bpf_printk("buf4:[0x%x]",b[4]);
+//	bpf_printk("buf5:[0x%x]",b[5]);
+//
+//	bpf_printk("buf6:[0x%x]",b[6]);
+//	bpf_printk("buf7:[0x%x]",b[7]);
+//	bpf_printk("buf8:[0x%x]",b[8]);
+//	bpf_printk("buf9:[0x%x]",b[9]);
+
+    __s32 body_size = (__s32)(b[6]& 0xff) | ((__s32)b[7]&0xff) << 8 | ((__s32)b[8]&0xff) << 16 | ((__s32)b[9]&0xff) << 24 ;
+//    bpf_printk("===== DM-req body_size [%d] ", body_size);
+
+    if( DM_HEADER_SIZE + body_size < buf_size ) {
         return 0;
-    }
+     }
+
+//     if(body_size > 0) {
+//      __u8 bb[4];
+//      bpf_read(buf+DM_PAYLOAD_OFFSET, bb);
+//
+//      __s32 tmp_len = (__s32)(bb[0]& 0xff) | ((__s32)bb[1]&0xff) << 8 | ((__s32)bb[2]&0xff) << 16 | ((__s32)bb[3]&0xff) << 24 ;
+//      if(tmp_len>0){
+//        char* bbb[10];
+//        bpf_read(buf+68, bbb);
+//        bpf_printk("===== DM-req payload:%s",bbb);
+//      }
+//     }
 
-	int packet_header_type = (int) b[4] | (int) b[5] << 8;
-	bpf_printk("=====packet_header_type [0x%x] End\n", packet_header_type);
 
-    if (packet_header_type ==  DM_COM_QUERY) {
-        return 1;
+   __s16 pack_type =  (__s16)(b[4]& 0xff) | ((__s16)b[5]&0xff) << 8 ;
+//    bpf_printk("===== DM-req pack_type <0x%x> [%d] ", pack_type,pack_type);
+    if (pack_type == DM_QUERY || pack_type == DM_STMT_EXECUTE) {
+            *request_type = pack_type;
+            return 1;
     }
-//    if (packet_header_type ==  DM_COM_STMT_CLOSE) {
-//        *request_type = DM_COM_STMT_CLOSE;
-//        return 1;
-//    }
-
-//    if (b[4] == DM_COM_STMT_PREPARE) {
-//        *request_type = DM_COM_STMT_PREPARE;
-//        return 1;
-//    }
+
+//    bpf_printk("===== DM-req end not dm-req ...\n");
     return 0;
 }
 
-// TODO 响应部分
+
 static __always_inline
-int is_dm_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id, __u32 *status) {
-	__u8 b[8];
+int is_dm_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *status) {
+
+//    bpf_printk("===== DM-resp  Begin ...");
+
+//	bpf_printk("===== DM-resp buf_size :%d",buf_size);
+     if (buf_size < DM_HEADER_SIZE) {
+           return 0;
+     }
+	__u8 b[14];
     bpf_read(buf, b);
-//    if (b[3] < 1) { // sequence must be > 0
-//        return 0;
-//    }
-
-	bpf_printk("buf0:[0x%x]",b[0]);
-	bpf_printk("buf1:[0x%x]",b[1]);
-	bpf_printk("buf2:[0x%x]",b[2]);
-	bpf_printk("buf3:[0x%x]",b[3]);
-	bpf_printk("buf4:[0x%x]",b[4]);
-	bpf_printk("buf5:[0x%x]",b[5]);
-	bpf_printk("buf6:[0x%x]",b[6]);
-	bpf_printk("buf7:[0x%x]",b[7]);
-	int length = (int)b[6] | (int)b[7] << 8 ;
-	bpf_printk("=====data length [%d] ", length);
-	bpf_printk("=====buf_size :%d",buf_size);
-	bpf_printk("=====payload:<%s>",buf+0x60);
-
-	if (b[0] != DM_HEADER_TYPE_0 && b[0] != DM_HEADER_TYPE_1) { // type must be 1
-		return 0;
-	}
-
-	int packet_header_type = (int) b[4] | (int) b[5] << 8;
-	bpf_printk("=====[dm]resp packet_header_type [0x%x] End\n", packet_header_type);
-
-    if (packet_header_type == DM_RESPONSE_EOF) {
-        *status = STATUS_OK;
-        return 1;
+
+//	bpf_printk("buf0:[0x%x]",b[0]);
+//	bpf_printk("buf1:[0x%x]",b[1]);
+//	bpf_printk("buf2:[0x%x]",b[2]);
+//	bpf_printk("buf3:[0x%x]",b[3]);
+//
+//	bpf_printk("buf4:[0x%x]",b[4]);
+//	bpf_printk("buf5:[0x%x]",b[5]);
+//
+//	bpf_printk("buf6:[0x%x]",b[6]);
+//	bpf_printk("buf7:[0x%x]",b[7]);
+//	bpf_printk("buf8:[0x%x]",b[8]);
+//	bpf_printk("buf9:[0x%x]",b[9]);
+//
+//    bpf_printk("buf10:[0x%x]",b[10]);
+//    bpf_printk("buf11:[0x%x]",b[11]);
+//    bpf_printk("buf12:[0x%x]",b[12]);
+//    bpf_printk("buf13:[0x%x]",b[13]);
+
+  __s32 body_size = (__s32)(b[6]& 0xff) | ((__s32)b[7]&0xff) << 8 | ((__s32)b[8]&0xff) << 16 | ((__s32)b[9]&0xff) << 24 ;
+//	bpf_printk("===== DM-resp body_size [%d] ", body_size);
+
+	//0xbb响应时要考虑数据分段传输的情况(DM_HEADER_SIZE + body_size > buf_size)
+	if( DM_HEADER_SIZE + body_size  < buf_size ) {
+            return 0;
     }
-//    if (b[4] == DM_RESPONSE_OK ) {
-//        if (request_type == DM_COM_STMT_PREPARE) {
-//            bpf_read(buf+5, *statement_id);
-//        }
-//        *status = STATUS_OK;
-//        return 1;
-//    }
-//    if (b[4] == DM_RESPONSE_ERROR) {
-//        *status = STATUS_FAILED;
-//        return 1;
-//    }
+
+    __s16 pack_type =  (__s16)(b[4]& 0xff) | ((__s16)b[5]&0xff) << 8 ;
+//    bpf_printk("===== DM-resp pack_type <0x%x> [%d] ", pack_type,pack_type);
+//    bpf_printk("===== DM-resp request_type <0x%x> [%d] ", request_type,request_type);
+
+
+    if (pack_type == DM_RESPONSE_OK) {
+        //收到响应后查询请求流程结束
+        if(request_type == DM_QUERY || request_type == DM_STMT_EXECUTE ) {
+           *status = STATUS_OK;
+           return 1;
+        }
+        return 0;
+    }
+
+    //判断是否报错 10-13 4位 响应码 小于0的 为错误
+    __s32 svr_msg_code= (__s32)(b[10]& 0xff) | ((__s32)b[11]&0xff) << 8 | ((__s32)b[12]&0xff) << 16 | ((__s32)b[13]&0xff) << 24 ;
+    if( svr_msg_code < 0 &&  svr_msg_code != DM_SVR_ERR_CODE_EC_RN_EXCEED_ROWSET_SIZE ){
+       //检测到错误后,请求流程结束
+//       bpf_printk("===== DM-resp svr_msg_code request_type <0x%x>, svr_msg_code [%d]",request_type,svr_msg_code);
+       *status = STATUS_FAILED;
+       return 1;
+     }
+
+//    bpf_printk("===== DM-resp End  not dm-resp...");
     return 0;
 }

+ 207 - 7
ebpftracer/ebpf/l7/l7.c

@@ -145,7 +145,6 @@ struct l7_request {
     __u64 payload_size;
     char payload[MAX_PAYLOAD_SIZE];
 };
-
 struct {
      __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
      __type(key, int);
@@ -160,6 +159,37 @@ struct {
     __uint(max_entries, 32768);
 } active_l7_requests SEC(".maps");
 
+struct l7_request_dm_ctx {
+//    __u8 request_type;
+    __u64 req_start_at_ns;
+
+//    char db_name[128] ;
+//    __s32 srv_encoding ;
+
+    __u64 query_sql_payload_size;
+    char query_sql_payload[MAX_PAYLOAD_SIZE];
+
+    __u32 status;
+//    __s32 svr_err_code;
+//    __u64 svr_err_msg_size;
+//    char svr_err_msg[MAX_PAYLOAD_SIZE];
+};
+
+struct {
+     __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+     __type(key, int);
+     __type(value, struct l7_request_dm_ctx);
+     __uint(max_entries, 1);
+} l7_request_dm_ctx_heap SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
+    __uint(key_size, sizeof(struct l7_request_key));
+    __uint(value_size, sizeof(struct l7_request_dm_ctx));
+    __uint(max_entries, 32768);
+} active_l7_requests_dm_ctx SEC(".maps");
+
+
 struct {
      __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
      __type(key, int);
@@ -414,8 +444,38 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         req->protocol = PROTOCOL_MYSQL;
     } else if (is_dm_query(payload, size,&req->request_type)) {
-	    bpf_printk("[Enter][DM]:req->request_type:%d\n",req->request_type);
-	    req->protocol = PROTOCOL_DM;
+        bpf_printk("[Request][DM] start -------->");
+        req->protocol = PROTOCOL_DM;
+
+        struct l7_request_dm_ctx *dm_ctx ;
+        dm_ctx = bpf_map_lookup_elem(&active_l7_requests_dm_ctx, &k);
+        if (!dm_ctx) {
+           dm_ctx = bpf_map_lookup_elem(&l7_request_dm_ctx_heap, &zero);
+           if (!dm_ctx) {
+               return 0;
+           }
+           dm_ctx->req_start_at_ns = 0 ;
+           dm_ctx->status = 0;
+           bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_NOEXIST);
+           bpf_printk("[Request][DM]  init active_l7_requests_dm_ctx,request_type <0x%x> [%d]",req->request_type,req->request_type);
+       }
+
+       if (req->request_type == DM_QUERY) {
+
+           dm_ctx->req_start_at_ns=bpf_ktime_get_ns();
+           req->ns = dm_ctx->req_start_at_ns ;
+
+           dm_ctx->query_sql_payload_size = size;
+           COPY_PAYLOAD(dm_ctx->query_sql_payload, size, payload);
+           bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
+        }
+
+        if(req->request_type == DM_STMT_EXECUTE) {
+            bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
+            return 0 ;
+        }
+        bpf_printk("[Request][DM] is request ,request_type <0x%x> [%d]",req->request_type,req->request_type);
+        bpf_printk("[Request][DM] end <--------");
     } else if (is_mongo_query(payload, size)) {
         req->protocol = PROTOCOL_MONGO;
     } else if (is_rabbitmq_produce(payload, size)) {
@@ -729,12 +789,152 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->method = METHOD_STATEMENT_PREPARE;
         }
     } else if (e->protocol == PROTOCOL_DM) {
+
+        bpf_printk("[Response][DM] start -------->");
+
+        struct l7_request_dm_ctx *dm_ctx = bpf_map_lookup_elem(&active_l7_requests_dm_ctx, &k);
+        if (!dm_ctx) {
+            return 0;
+        }
+        //获取server端的字符集类型(用来编码数据库名称,暂时不用)
+//        if(req->request_type == DM_VERSION_CLI) {
+//            __u8 srv_encode_buf[4];
+//            bpf_read(payload+DM_SVR_ENCODING_OFFSET, srv_encode_buf);
+//
+//            bpf_printk(" srv_encode_buf:[0x%x]",srv_encode_buf[0]);
+//            bpf_printk(" srv_encode_buf:[0x%x]",srv_encode_buf[1]);
+//            bpf_printk(" srv_encode_buf:[0x%x]",srv_encode_buf[2]);
+//            bpf_printk(" srv_encode_buf:[0x%x]",srv_encode_buf[3]);
+//            dm_ctx->srv_encoding = (__s32)(srv_encode_buf[0]& 0xff) | ((__s32)srv_encode_buf[1]&0xff) << 8 | ((__s32)srv_encode_buf[2]&0xff) << 16 | ((__s32)srv_encode_buf[3]&0xff) << 24 ;
+//            bpf_printk(" srv_encode_buf:[%d]",dm_ctx->srv_encoding);
+//            bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
+//           return 0;
+//        }
+            //获取数据库名称(暂时不用)
+//        if(req->request_type == DM_DO_LOGIN ) {
+////            //todo 拷贝payload 不进行处理,留给用户空间处理
+////            //pack_type == DM_LOGIN_ACK
+//            if (ret == DM_HEADER_SIZE) {
+//                 bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
+//                 return 0;
+//            }
+////            //获取数据库名称
+//            // bpf_read(buf+DM_PAYLOAD_OFFSET, svr_encode_buf);
+//            __u8 tmp_1[4];
+//    //        __u32 tmp_offset = DM_PAYLOAD_OFFSET ;
+//            __u32 tmp_offset = 0 ;
+//            //1
+//            bpf_read(payload+tmp_offset, tmp_1);
+//            __s32 tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====    DM_LOGIN_ACK ----1 tmp_len:%d",tmp_len);
+//
+//            //2
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//              bpf_printk("=====    DM_LOGIN_ACK ----2 tmp_len:%d",tmp_len);
+//            //3
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====    DM_LOGIN_ACK ----3 tmp_len:%d",tmp_len);
+//            //4
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====   DM_LOGIN_ACK ----4 tmp_len:%d",tmp_len);
+//            //5
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====    DM_LOGIN_ACK ----5 tmp_len:%d",tmp_len);
+//            //6
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====   DM_LOGIN_ACK ----6 tmp_len:%d",tmp_len);
+//            //7
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====  DM_LOGIN_ACK ----7 tmp_len:%d",tmp_len);
+//            //8
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====  DM_LOGIN_ACK ----8 tmp_len:%d",tmp_len);
+//
+//            //9
+//            tmp_offset = tmp_offset + 4 ;
+//            //10
+//            tmp_offset = tmp_offset + 4 ;
+//            //11
+//            tmp_offset = tmp_offset + 4 ;
+//            //12
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_offset = tmp_offset+tmp_len ;
+//            bpf_printk("=====    DM_LOGIN_ACK ----12 tmp_len:%d",tmp_len);
+//            //13
+//            bpf_read(payload+tmp_offset,tmp_1);
+//            tmp_offset = tmp_offset + 4 ;
+//            tmp_len = (__s32)(tmp_1[0]& 0xff) | ((__s32)tmp_1[1]&0xff) << 8 | ((__s32)tmp_1[2]&0xff) << 16 | ((__s32)tmp_1[3]&0xff) << 24 ;
+//            bpf_printk("=====    DM_LOGIN_ACK ----13 tmp_len:%d",tmp_len);
+//
+//            bpf_printk("=====   DM_LOGIN_ACK  dbName tmp_offset:%d",tmp_offset);
+//            if(tmp_len>0){
+////                  char dbNameBuff[7] ;
+//                  char dbNameBuff[128] ;
+//                  if(tmp_len > 128){
+//                    tmp_len = 128;
+//                  }
+//                  bpf_read(payload+tmp_offset,dbNameBuff);
+//                  dbNameBuff[tmp_len-1]='\0';
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[0]:%c",dbNameBuff[0]);
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[1]:%c",dbNameBuff[1]);
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[2]:%c",dbNameBuff[2]);
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[3]:%c",dbNameBuff[3]);
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[4]:%c",dbNameBuff[4]);
+////                      bpf_printk("=====  DM-resp  DM_LOGIN_ACK dbName[5]:%c",dbNameBuff[5]);
+//                  bpf_printk("===== DM_LOGIN_ACK dbName:%s",dbNameBuff);
+//
+//                __u32 dn_name_size = (__u32)tmp_len + 1 ;
+//                COPY_PAYLOAD(dm_ctx->db_name,dn_name_size, dbNameBuff);
+//                bpf_printk("===== DM_LOGIN_ACK strcpy(db_name,dbNameBuff):%s",dm_ctx->db_name);
+//                bpf_map_update_elem(&active_l7_requests_dm_ctx, &k, dm_ctx, BPF_ANY);
+//            }
+//          return 0;
+//        }
+
 	    __u64 trace_id = get_apm_trace_id(pid, tid);
 	    e->trace_id = trace_id;
-	    bpf_printk("[DM] trace_id:%llu", trace_id);
-	    bpf_printk("[Response][DM] ---------start");
-	    response = is_dm_response(payload, ret, req->request_type, &e->statement_id, &e->status);
-	    bpf_printk("[Response][DM]end ---------- %d\n",e->status);
+//	    bpf_printk("[Response][DM] trace_id:%llu", trace_id);
+
+	    response = is_dm_response(payload, ret, req->request_type, &dm_ctx->status);
+//	    bpf_printk("[Response][DM]  is_dm_response status ---------- %d",dm_ctx->status);
+
+	    if(response) {
+	         req->ns = dm_ctx->req_start_at_ns;
+	         e->status = dm_ctx->status;
+	         e->payload_size = dm_ctx->query_sql_payload_size;
+	         COPY_PAYLOAD(e->payload, dm_ctx->query_sql_payload_size, dm_ctx->query_sql_payload);
+             bpf_map_delete_elem(&active_l7_requests_dm_ctx, &k);
+             bpf_printk("[Response][DM]  is response ,delete  active_l7_requests_dm_ctx -- req->request_type<0x%x> , e->payload_size:[%d]",req->request_type,e->payload_size);
+        } else {
+             bpf_printk("[Response][DM]  not response req->request_type <0x%x>",req->request_type);
+        }
+
+        bpf_printk("[Response][DM] end <---------\n");
     } else if (e->protocol == PROTOCOL_MONGO) {
         response = is_mongo_response(payload, ret, req->partial);
         if (response == 2) { // partial

+ 50 - 52
ebpftracer/l7/dm.go

@@ -3,71 +3,69 @@ package l7
 import "fmt"
 
 const (
-	minHeaderSize   = 8
-	dmMsgHeaderSize = 64
+	dmMsgHeaderSize     = 64
+	dmMsgOffsetPackType = 4
+	dmMsgOffsetBodyLen  = 6
+	dmMsgPackTypeQuery  = 0x05
 )
 
 type DmParser struct {
-	preparedStatements map[string]string
+	//preparedStatements map[string]string
 }
 
 func NewDmParser() *DmParser {
-	return &DmParser{preparedStatements: map[string]string{}}
+	//return &DmParser{preparedStatements: map[string]string{}}
+	return &DmParser{}
 }
 
 func (p *DmParser) Parse(payload []byte, statementId uint32) string {
-	payloadSize := len(payload)
-	fmt.Println("payloadSize", payloadSize)
-	if payloadSize < minHeaderSize {
+
+	payloadSize := uint32(len(payload))
+	fmt.Println("-------dm payloadSize", payloadSize)
+
+	if payloadSize <= dmMsgHeaderSize {
 		return ""
 	}
-	msgSize := int(payload[6]) | int(payload[7])<<8
-	fmt.Println("msgSize", msgSize)
-	if payloadSize < dmMsgHeaderSize+msgSize {
+
+	//msgHeader := payload[:dmMsgHeaderSize]
+
+	packType := p.getUint32By2ByteFromPayload(payload, dmMsgOffsetPackType)
+	if packType != dmMsgPackTypeQuery {
+		fmt.Println("-------dm packType != dmMsgPackTypeQuery ", packType)
 		return ""
 	}
-	to := dmMsgHeaderSize + msgSize - 1
-	query := string(payload[dmMsgHeaderSize:to])
-	fmt.Printf("[%s]\n", query)
+
+	bodySize := p.getUint32By4ByteFromPayload(payload, dmMsgOffsetBodyLen)
+	if dmMsgHeaderSize+bodySize < payloadSize {
+		return ""
+	}
+	queryTail := ""
+	if dmMsgHeaderSize+bodySize > payloadSize {
+		//partial
+		queryTail = "..."
+	}
+	query := string(payload[dmMsgHeaderSize:]) + queryTail
+	fmt.Printf("-------dm query is [%s] \n", query)
 	return query
-	//msgSize := int(payload[0]) | int(payload[1])<<8 | int(payload[2])<<16
+}
+
+func (p *DmParser) getUint32By2ByteFromPayload(payload []byte, offset int) uint32 {
+	var result uint32
+	result = uint32(payload[offset] & 0xff)
+	offset++
+	result |= uint32(payload[offset]&0xff) << 8
+	offset++
+	return result
+}
 
-	//cmd := payload[4]
-	//readQuery := func() (query string) {
-	//	to := mysqlMsgHeaderSize + msgSize
-	//	partial := false
-	//	if to > payloadSize {
-	//		to = payloadSize
-	//		partial = true
-	//	}
-	//	query = string(payload[mysqlMsgHeaderSize+1 : to])
-	//	if partial {
-	//		query += "..."
-	//	}
-	//	return query
-	//}
-	//readStatementId := func() string {
-	//	return strconv.FormatUint(uint64(binary.LittleEndian.Uint32(payload[mysqlMsgHeaderSize+1:])), 10)
-	//}
-	//
-	//switch cmd {
-	//case DmComQuery:
-	//	return readQuery()
-	//case DmComStmtExecute:
-	//	statementIdStr := readStatementId()
-	//	statement, ok := p.preparedStatements[statementIdStr]
-	//	if !ok {
-	//		statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, statementIdStr)
-	//	}
-	//	return statement
-	//case DmComStmtPrepare:
-	//	query := readQuery()
-	//	statementIdStr := strconv.FormatUint(uint64(statementId), 10)
-	//	p.preparedStatements[statementIdStr] = query
-	//	return fmt.Sprintf("PREPARE %s FROM %s", statementIdStr, query)
-	//case MysqlComStmtClose:
-	//	statementIdStr := readStatementId()
-	//	delete(p.preparedStatements, statementIdStr)
-	//}
-	return ""
+func (p *DmParser) getUint32By4ByteFromPayload(payload []byte, offset int) uint32 {
+	var result uint32
+	result = uint32(payload[offset] & 0xff)
+	offset++
+	result |= uint32(payload[offset]&0xff) << 8
+	offset++
+	result |= uint32(payload[offset]&0xff) << 16
+	offset++
+	result |= uint32(payload[offset]&0xff) << 24
+	return result
 }

+ 2 - 0
ebpftracer/l7/l7.go

@@ -56,6 +56,8 @@ func (p Protocol) String() string {
 		return "Dubbo2"
 	case ProtocolDNS:
 		return "DNS"
+	case ProtocolDM:
+		return "DM"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

+ 6 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -850,6 +850,12 @@ func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			if len(words) > 0 {
 				mNode.OperType = strings.ToUpper(words[0])
 			}
+		case "sql.exception":
+			if attr.Value.AsBool() {
+				mNode.Exception = 1
+			} else {
+				mNode.Exception = 0
+			}
 		}
 	}
 }

+ 4 - 0
tracing/apm_tracing.go

@@ -326,6 +326,10 @@ func (t *Trace) DmTraceQueryEvent(query string, r *l7.RequestData, destination n
 		semconv.NetPeerPort(int(destination.Port())),
 	)
 	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+
+	attr = append(attr,
+		attribute.Bool("sql.exception", r.Status.Error()),
+	)
 	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), l7Type, attr...)
 }