فهرست منبع

Feature #TASK_QT-18250 解决go-grpc多版本问题,并删除部分无用日志。

rock 7 ماه پیش
والد
کامیت
d144171df7

+ 5 - 0
ebpftracer/ebpf/include/socket_trace_common.h

@@ -203,6 +203,11 @@ struct ebpf_proc_info {
 	__u64 ctx_ptr_pos;
 	__u64 headers_ptr_pos;
 	__u64 buckets_ptr_pos;
+	//gRPC
+	__u64 httpclient_nextid_pos;
+	__u64 stream_method_ptr_pos;
+	__u64 stream_ctx_pos;
+	__u8 is_new_frame_pos;
 } __attribute__((packed));
 
 enum {

+ 20 - 12
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -72,23 +72,23 @@ struct {
 
 // Injected in init
 // volatile const u64 clientconn_target_ptr_pos;
-u64 clientconn_target_ptr_pos = 24;
+u64 clientconn_target_ptr_pos = 24; //使用固定值24即可,不再处理多版本场景。
 // volatile const u64 httpclient_nextid_pos;
-u64 httpclient_nextid_pos = 404;
+// u64 httpclient_nextid_pos = 404;    // version-dependent: now read per process from proc_info_map (info->httpclient_nextid_pos)
 // volatile const u64 headerFrame_streamid_pos;
-u64 headerFrame_streamid_pos = 0;
+u64 headerFrame_streamid_pos = 0;   //使用固定值0即可,不再处理多版本场景。
 // volatile const u64 headerFrame_hf_pos;
-u64 headerFrame_hf_pos = 8;
+u64 headerFrame_hf_pos = 8;          //使用固定值8即可,不再处理多版本场景。
 // volatile const u64 error_status_pos;
-u64 error_status_pos = 0;
+u64 error_status_pos = 0;           //使用固定值0即可,不再处理多版本场景。
 // volatile const u64 status_s_pos;
 // static u64 status_s_pos = 0;
 // volatile const u64 status_message_pos;
-u64 status_message_pos = 48;
+u64 status_message_pos = 48;        //使用固定值48即可,不再处理多版本场景。
 // volatile const u64 status_code_pos;
 // static u64 status_code_pos = 40;
 
-volatile const bool write_status_supported;
+// volatile const bool write_status_supported;
 
 // This instrumentation attaches uprobe to the following function:
 // func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
@@ -186,9 +186,9 @@ int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
         return 0;
     }
 
-    if (!write_status_supported) {
-        goto done;
-    }
+    // if (!write_status_supported) {
+    //     goto done;
+    // }
     // Getting the returned response (error)
     // The status code is embedded 3 layers deep:
     // Invoke() error
@@ -265,7 +265,7 @@ done:
     e->statement_id = 0;
     e->payload_size = grpc_span->method_size;
     if (trace_info) {
-        bpf_printk("trace_info->trace_id is %llu\n", trace_info->trace_id);
+        // bpf_printk("trace_info->trace_id is %llu\n", trace_info->trace_id);
         e->trace_id = trace_info->trace_id;
     }
     if(e->trace_id == 0){
@@ -505,11 +505,19 @@ SEC("uprobe/http2Client_NewStream")
 // func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
 int uprobe_http2Client_NewStream(struct pt_regs *ctx) {
     // bpf_printk("enter the uprobe_http2Client_NewStream \n");
+    __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
+	struct ebpf_proc_info *info =
+		bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!info) {
+		return -1;
+	}
+    // bpf_printk("info->httpclient_nextid_pos is %d\n", info->httpclient_nextid_pos);
+
     struct go_iface go_context = {0};
     get_Go_context(ctx, 2, 0, true);
     void *httpclient_ptr = get_argument(ctx, 1);
     u32 nextid = 0;
-    bpf_probe_read(&nextid, sizeof(nextid), (void *)(httpclient_ptr + (httpclient_nextid_pos)));
+    bpf_probe_read(&nextid, sizeof(nextid), (void *)(httpclient_ptr + (info->httpclient_nextid_pos)));
     // Get the span context from go context. The mapping is created in the Invoke probe,
     // the context here is derived from the Invoke context.
     // struct span_context *current_span_context = get_parent_span_context(&go_context);

+ 93 - 65
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -57,29 +57,29 @@ struct hpack_header_field {
 
 // Injected in init
 // volatile const u64 stream_method_ptr_pos;
-u64 stream_method_ptr_pos = 80;
+// u64 stream_method_ptr_pos = 80; //需要处理多版本场景
 // volatile const u64 frame_fields_pos;
-u64 frame_fields_pos = 8;
+u64 frame_fields_pos = 8;       //使用固定值8即可,不再处理多版本场景。
 // volatile const u64 frame_stream_id_pod;
-u64 frame_stream_id_pod = 8;
+u64 frame_stream_id_pod = 8;    //使用固定值8即可,不再处理多版本场景。
 // volatile const u64 stream_id_pos;
-u64 stream_id_pos = 0;
+u64 stream_id_pos = 0;          //使用固定值0即可,不再处理多版本场景。
 // volatile const u64 stream_ctx_pos;
-u64 stream_ctx_pos = 32;
+// u64 stream_ctx_pos = 32;        //需要做多版本处理  
 // volatile const u64 server_stream_stream_pos;
-u64 server_stream_stream_pos;
+u64 server_stream_stream_pos = 0;   //1.69之前版本用不到,1.69之后版本用这个
 // volatile const bool is_new_frame_pos;
-bool is_new_frame_pos;
+// bool is_new_frame_pos;          // < 1.60 为false,>= 1.60 为true,直接在 用户态赋值即可,不再做处理。
 // volatile const u64 status_s_pos;
-static u64 status_s_pos = 0;
+static u64 status_s_pos = 0;    //使用固定值即可,不再处理多版本场景。
 // volatile const u64 status_code_pos;
-static u64 status_code_pos = 40;
+static u64 status_code_pos = 40;    //使用固定值即可,不再做多版本处理
 // volatile const u64 http2server_peer_pos;
-u64 http2server_peer_pos;
+// u64 http2server_peer_pos;
 // volatile const u64 peer_local_addr_pos;
-u64 peer_local_addr_pos;
+// u64 peer_local_addr_pos;
 
-volatile const bool server_addr_supported;
+// volatile const bool server_addr_supported;
 
 static __always_inline long
 dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
@@ -96,11 +96,17 @@ dummy_extract_span_context_from_headers(void *stream_id, struct span_context *pa
 // Returns 0 on success, otherwise a negative error value in case of failure.
 static __always_inline int
 handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context) {
+    __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
+	struct ebpf_proc_info *info =
+		bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!info) {
+		return -1;
+	}
     if (go_context == NULL) {
         bpf_printk("grpc:server:handleStream: NULL go_context");
         return -1;
     }
-
+    // bpf_printk("info->stream_method_ptr_pos is %d\n", info->stream_method_ptr_pos);
     if (stream_ptr == NULL) {
         bpf_printk("grpc:server:handleStream: NULL stream_ptr");
         return -1;
@@ -149,7 +155,7 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
     // start_span(&start_span_params);
 
     // Set attributes
-    void *method_ptr = stream_ptr + stream_method_ptr_pos;
+    void *method_ptr = stream_ptr + info->stream_method_ptr_pos;
     bool parsed_method =
         get_go_string_from_user_ptr(method_ptr, grpcReq->method, sizeof(grpcReq->method));
     if (!parsed_method) {
@@ -158,20 +164,20 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
         return -3;
     }
 
-    bpf_printk("grpc:server:handleStream: get the method is %s\n", grpcReq->method);
-
-    if (server_addr_supported) {
-        void *http2server = get_argument(ctx, 3);
-        if (http2server != NULL) {
-            void *local_addr_ptr = 0;
-            void *local_addr_pos = http2server + http2server_peer_pos + peer_local_addr_pos;
-            bpf_probe_read_user(
-                &local_addr_ptr, sizeof(local_addr_ptr), get_go_interface_instance(local_addr_pos));
-            get_tcp_net_addr_from_tcp_addr(ctx, &grpcReq->local_addr, (void *)(local_addr_ptr));
-        } else {
-            bpf_printk("grpc:server:handleStream: failed to get http2server arg");
-        }
-    }
+    // bpf_printk("grpc:server:handleStream: get the method is %s\n", grpcReq->method);
+
+    // if (server_addr_supported) {
+    //     void *http2server = get_argument(ctx, 3);
+    //     if (http2server != NULL) {
+    //         void *local_addr_ptr = 0;
+    //         void *local_addr_pos = http2server + http2server_peer_pos + peer_local_addr_pos;
+    //         bpf_probe_read_user(
+    //             &local_addr_ptr, sizeof(local_addr_ptr), get_go_interface_instance(local_addr_pos));
+    //         get_tcp_net_addr_from_tcp_addr(ctx, &grpcReq->local_addr, (void *)(local_addr_ptr));
+    //     } else {
+    //         bpf_printk("grpc:server:handleStream: failed to get http2server arg");
+    //     }
+    // }
 
     // Write event
     rc = bpf_map_update_elem(&grpc_events, &key, grpcReq, 0);
@@ -237,7 +243,7 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
     e->payload_size = method_len;
     COPY_PAYLOAD(e->payload, method_len, grpcReq->method);
 
-    bpf_printk("grpc:server:handleStream: get the payload size is %d\n", e->payload_size);
+    // bpf_printk("grpc:server:handleStream: get the payload size is %d\n", e->payload_size);
 
     struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
     
@@ -300,7 +306,7 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
 // This is only compatible with versions < 1.69.0 of the Server.
 SEC("uprobe/server_handleStream")
 int uprobe_server_handleStream(struct pt_regs *ctx) {
-    bpf_printk("enter the uprobe_server_handleStream");
+    // bpf_printk("enter the uprobe_server_handleStream");
     u64 stream_pos = 4;
     void *stream_ptr = get_argument(ctx, stream_pos);
     // bpf_printk("enter uprobe_server_handleStream\n");
@@ -323,7 +329,7 @@ int uprobe_server_handleStream(struct pt_regs *ctx) {
 // UPROBE_RETURN(server_handleStream, struct grpc_request_t, grpc_events) 
 SEC("uprobe/server_handleStream")
 int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {  
-    bpf_printk("enter the uprobe_server_handleStream return");     
+    // bpf_printk("enter the uprobe_server_handleStream return");     
     void *key = (void *)GOROUTINE(ctx);
     __u64 id = bpf_get_current_pid_tgid();
 	__u32 zero = 0;
@@ -434,6 +440,14 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
 // This is only compatible with versions > 1.69.0 of the Server.
 SEC("uprobe/server_handleStream2")
 int uprobe_server_handleStream2(struct pt_regs *ctx) {
+
+    __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
+	struct ebpf_proc_info *info =
+		bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!info) {
+		return -1;
+	}
+    bpf_printk("info->stream_ctx_pos is %d\n", info->stream_ctx_pos);
     u64 server_stream_pos = 4;
     // bpf_printk("enter uprobe_server_handleStream2\n");
     void *server_stream_ptr = get_argument(ctx, server_stream_pos);
@@ -452,7 +466,7 @@ int uprobe_server_handleStream2(struct pt_regs *ctx) {
 
     struct go_iface go_context = {0};
     rc = bpf_probe_read_user(
-        &go_context.type, sizeof(go_context.type), (void *)(stream_ptr + stream_ctx_pos));
+        &go_context.type, sizeof(go_context.type), (void *)(stream_ptr + info->stream_ctx_pos));
     if (rc != 0) {
         bpf_printk("grpc:server:uprobe/server_handleStream2: failed to read context type");
         return -3;
@@ -460,7 +474,7 @@ int uprobe_server_handleStream2(struct pt_regs *ctx) {
 
     rc = bpf_probe_read_user(&go_context.data,
                              sizeof(go_context.data),
-                             get_go_interface_instance(stream_ptr + stream_ctx_pos));
+                             get_go_interface_instance(stream_ptr + info->stream_ctx_pos));
     if (rc != 0) {
         bpf_printk("grpc:server:uprobe/server_handleStream2: failed to read context data");
         return -4;
@@ -477,22 +491,6 @@ int uprobe_server_handleStream2(struct pt_regs *ctx) {
 SEC("uprobe/server_handleStream2")
 int uprobe_server_handleStream2_Returns(struct pt_regs *ctx) {
     u64 server_stream_pos = 4;
-    __u64 id = bpf_get_current_pid_tgid();
-	__u32 zero = 0;
-    __u32 fd = 0;
-	__u32 pid, tid;
-	__u32 http_status = 200;
-
-	pid = id >> 32;
-	tid =  (__u32)id;
-    // bpf_printk("enter uprobe_server_handleStream2_Returns\n");
-
-    struct l7_request_key k = {};
-    k.pid = pid;
-    k.fd = fd;
-    k.is_tls = 0;
-    k.stream_id = -1;
-
     void *server_stream_ptr = get_argument(ctx, server_stream_pos);
     void *key = NULL;
     if (server_stream_ptr == NULL) {
@@ -521,11 +519,28 @@ lookup:
     stop_tracking_span(&event->sc, &event->psc);
     bpf_map_delete_elem(&grpc_events, &key);
 
+    __u64 id = bpf_get_current_pid_tgid();
+	__u32 zero = 0;
+    __u32 fd = 0;
+	__u32 pid, tid;
+	__u32 http_status = 200;
+
+	pid = id >> 32;
+	tid =  (__u32)id;
+    // bpf_printk("enter uprobe_server_handleStream2_Returns\n");
 
-    struct apm_trace_info_t * start_trace_info = get_trace_info_by_fd(pid, fd);
-	if (!start_trace_info) {
+    struct l7_request_key k = {};
+    k.pid = pid;
+    k.fd = fd;
+    k.is_tls = 0;
+    k.stream_id = -1;    
+
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	struct apm_trace_info_t * start_trace_info = get_apm_trace_info_by_trace_key(trace_key);
+    if (!start_trace_info) {
 		return -1;
 	}
+
     __u64 trace_id = start_trace_info->trace_id;
 	__u32 event_count = cw_get_event_count(trace_id);
     cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
@@ -548,14 +563,15 @@ lookup:
 		cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
         cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
 	}
-    struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
-    if (!req)
-    {
-	    cw_clear_trace(pid, tid, fd);
-        return 0;
-    }
-	e->start_at = req->ns;
-	cw_bpf_debug("req->ns:%llu",req->ns);
+    // struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+    // if (!req)
+    // {
+	//     cw_clear_trace(pid, tid, fd);
+    //     return 0;
+    // }
+	// e->start_at = req->ns;
+    e->start_at = event->start_time;
+	// cw_bpf_debug("req->ns:%llu",req->ns);
 	e->end_at = bpf_ktime_get_ns();
     e->duration = e->end_at - e->start_at;
     e->protocol = PROTOCOL_TRACE;
@@ -569,8 +585,10 @@ lookup:
     e->trace_id = trace_id;
     e->payload_size = 0;
     e->event_count = event_count;
-    // COPY_PAYLOAD(e->payload, size, payload);
-    bpf_map_delete_elem(&active_l7_requests, &k);
+    
+    e->payload_size = event->method_size;
+    COPY_PAYLOAD(e->payload, event->method_size, event->method);
+    // bpf_map_delete_elem(&active_l7_requests, &k);
 	// 清除事件计数
 	bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
 	// 清除业务层trace信息
@@ -587,6 +605,10 @@ lookup:
     //     __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
     //     __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
     // }
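+    // NOTE(review): the original note said "do not send payload", yet e->payload is filled above and the output size is sizeof(*e) — confirm which is intended.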
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+
+    // bpf_printk("stop get apm data\n");
     return 0;
 }
 
@@ -595,11 +617,17 @@ lookup:
 // func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error
 SEC("uprobe/http2Server_operateHeader")
 int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
-    bpf_printk("enter the uprobe_http2Server_operateHeader");
+    // bpf_printk("enter the uprobe_http2Server_operateHeader");
+    __u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
+	struct ebpf_proc_info *info =
+		bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!info) {
+		return -1;
+	}
     s32 find_w3c = 0;
     void *arg4 = get_argument(ctx, 4);
     void *arg2 = get_argument(ctx, 2);
-    void *frame_ptr = is_new_frame_pos ? arg4 : arg2;
+    void *frame_ptr = info->is_new_frame_pos ? arg4 : arg2;
     struct go_slice header_fields = {};
     bpf_probe_read(&header_fields, sizeof(header_fields), (void *)(frame_ptr + frame_fields_pos));
     char key[CW_HEADER_KEY_LENGTH + 1] = "cwtrace";
@@ -644,10 +672,10 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
         // if (bpf_memcmp(key, current_key, 6) == 0) {
         if (current_key[0] == 'c' && current_key[1] == 'w' && current_key[2] == 't' && current_key[3] == 'r' && current_key[4] == 'a' && current_key[5] == 'c' && current_key[6] == 'e') {
             find_w3c = 1;
-            bpf_printk("found traceparent header");
+            // bpf_printk("found traceparent header");
             // 执行字符串到span context的转换
             cw_string_to_span_context(hf.value.str, cw_parent_span_context);
-            bpf_printk("11111found traceparent header value is %s", hf.value.str);
+            // bpf_printk("11111found traceparent header value is %s", hf.value.str);
             break; // 找到后立即退出
         }
     }

+ 8 - 3
ebpftracer/tls.go

@@ -282,6 +282,11 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			info.InstanceId = instanceID
 			info.AppId = appID
 			info.CodeType = codeType
+			if grpcMajorVersion >= 1 && grpcMinorVersion >= 60 {
+				info.IsNewFramePos = 1
+			} else {
+				info.IsNewFramePos = 0
+			}
 			// go
 			info.BucketsPtrPos = bucketsOff
 			fields := map[*uint64]tracer.ID{
@@ -293,6 +298,9 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				&info.ProtoPos:       tracer.NewID("std", "net/http", "Request", "Proto"),
 				&info.CtxPtrPos:      tracer.NewID("std", "net/http", "Request", "ctx"),
 				&info.HeadersPtrPos:  tracer.NewID("std", "net/http", "Request", "Header"),
+				&info.HttpClientNextidPos: tracer.NewID("google.golang.org/grpc","google.golang.org/grpc/internal/transport","http2Client","nextID"),
+				&info.StreamMethodPtrPos: tracer.NewID("google.golang.org/grpc","google.golang.org/grpc/internal/transport","Stream","method"),
+				&info.StreamCtxPos: tracer.NewID("google.golang.org/grpc","google.golang.org/grpc/internal/transport","Stream","ctx"),
 			}
 
 			for field, id := range fields {
@@ -433,11 +441,9 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			links = append(links, l)
 			sStart := s.Value - textSection.Addr
 			sEnd := sStart + s.Size
-			klog.Infoln("uprobe_ClientConn_Invoke ok----111111")
 			if sEnd > textSectionLen {
 				continue
 			}
-			klog.Infoln("uprobe_ClientConn_Invoke ok----2222")
 			sBytes := textSectionData[sStart:sEnd]
 			returnOffsets := getReturnOffsets(ef.Machine, sBytes)
 			if len(returnOffsets) == 0 {
@@ -451,7 +457,6 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 					klog.WithError(err).Errorln(fmt.Errorf("failed to attach uprobe_ClientConn_Invoke_Returns uprobe"))
 					return nil, err
 				}
-				klog.Infoln("uprobe_ClientConn_Invoke_Returns ok----")
 				links = append(links, l)
 			}
 		case goGrpcClientLoopyHeaderHandler:

+ 10 - 0
utils/modelse/bpf_struct.go

@@ -176,6 +176,10 @@ type EbpfTraceConf struct {
 		__u64 ctx_ptr_pos;
 		__u64 headers_ptr_pos;
 		__u64 buckets_ptr_pos;
+		__u64 httpclient_nextid_pos;
+		__u64 stream_method_ptr_pos;
+		__u64 stream_ctx_pos;
+		__u8 is_new_frame_pos;
 	} __attribute__((packed));
 */
 type EbpfProcInfo struct {
@@ -199,6 +203,12 @@ type EbpfProcInfo struct {
 	CtxPtrPos      uint64
 	HeadersPtrPos  uint64
 	BucketsPtrPos  uint64
+
+	// gRPC
+	HttpClientNextidPos uint64
+	StreamMethodPtrPos  uint64
+	StreamCtxPos        uint64
+	IsNewFramePos       uint8
 }
 
 type allowPortBitmap struct {