Browse Source

Feature #TASK_QT-18250 grpc-client监控代码编写完成,但是在塞header头时出现问题。

rock 7 months ago
parent
commit
a9de0830c2
2 changed files with 116 additions and 13 deletions
  1. 65 12
      ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c
  2. 51 1
      ebpftracer/tls.go

+ 65 - 12
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -19,6 +19,9 @@ struct grpc_client_request_t {
     char method[MAX_SIZE];
     char target[MAX_SIZE];
     u32 status_code;
+
+    struct apm_span_context apm_sc;
+    struct apm_span_context apm_psc;
 };
 
 // struct hpack_header_field {
@@ -53,7 +56,8 @@ struct {
 volatile const u64 clientconn_target_ptr_pos;
 volatile const u64 httpclient_nextid_pos;
 volatile const u64 headerFrame_streamid_pos;
-volatile const u64 headerFrame_hf_pos;
+// volatile const u64 headerFrame_hf_pos;
+u64 headerFrame_hf_pos = 8;
 volatile const u64 error_status_pos;
 volatile const u64 status_s_pos;
 volatile const u64 status_message_pos;
@@ -65,6 +69,7 @@ volatile const bool write_status_supported;
 // func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
 SEC("uprobe/ClientConn_Invoke")
 int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_ClientConn_Invoke \n");
     // positions
     u64 clientconn_pos = 1;
     u64 method_ptr_pos = 4;
@@ -134,6 +139,7 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
 // func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
 SEC("uprobe/ClientConn_Invoke")
 int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_ClientConn_Invoke_Returns \n");
     void *key = (void *)GOROUTINE(ctx);
     struct grpc_client_request_t *grpc_span = bpf_map_lookup_elem(&grpc_client_events, &key);
     if (grpc_span == NULL) {
@@ -189,39 +195,85 @@ done:
 // func (l *loopyWriter) headerHandler(h *headerFrame) error
 SEC("uprobe/loopyWriter_headerHandler")
 int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler \n");
     void *headerFrame_ptr = get_argument(ctx, 2);
-    u32 stream_id = 0;
-    bpf_probe_read(
-        &stream_id, sizeof(stream_id), (void *)(headerFrame_ptr + (headerFrame_streamid_pos)));
-    void *sc_ptr = bpf_map_lookup_elem(&streamid_to_span_contexts, &stream_id);
-    if (sc_ptr == NULL) {
+    // u32 stream_id = 0;
+    // bpf_probe_read(
+    //     &stream_id, sizeof(stream_id), (void *)(headerFrame_ptr + (headerFrame_streamid_pos)));
+    // void *sc_ptr = bpf_map_lookup_elem(&streamid_to_span_contexts, &stream_id);
+    // if (sc_ptr == NULL) {
+    //     return 0;
+    // }
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler1111 \n");
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info =
+			bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if(!proc_info)
+    {
         return 0;
     }
 
-    struct span_context current_span_context = {};
-    bpf_probe_read(&current_span_context, sizeof(current_span_context), sc_ptr);
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler2222222\n");
+    // struct span_context current_span_context = {};
+    // bpf_probe_read(&current_span_context, sizeof(current_span_context), sc_ptr);
 
-    char tp_key[11] = "traceparent";
+    char tp_key[CW_HEADER_KEY_LENGTH] = CW_HEADER_KEY_VAL;
     struct go_string_ot key_str = write_user_go_string(tp_key, sizeof(tp_key));
     if (key_str.len == 0) {
         bpf_printk("key write failed, aborting ebpf probe");
         goto done;
     }
 
+
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler333333\n");
+    u32 map_id = 0;
+    struct grpc_client_request_t *grpcClientReq = bpf_map_lookup_elem(&grpc_client_storage_map, &map_id);
+    if (grpcClientReq == NULL)
+    {
+        cw_bpf_debug("uprobe_LoopyWriter_HeaderHandler: grpcClientReq is NULL");
+        return 0;
+    }
+
+    __builtin_memset(grpcClientReq, 0, sizeof(struct grpc_client_request_t));
+    grpcClientReq->start_time = bpf_ktime_get_ns();
+
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler444444\n");
+	struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+	if(cw_psc){
+		bpf_probe_read(&grpcClientReq->apm_psc, sizeof(grpcClientReq->apm_psc), cw_psc);
+		copy_byte_arrays(grpcClientReq->apm_psc.trace_id, grpcClientReq->apm_sc.trace_id, APM_TRACE_ID_SIZE);
+		generate_random_bytes(grpcClientReq->apm_sc.span_id, APM_SPAN_ID_SIZE);
+	}
+
+    u32 k0 = 0;
+	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+	if (trace_conf) {
+		copy_byte_arrays(trace_conf->host_id, grpcClientReq->apm_sc.host_id, APM_HOST_ID_SIZE);
+	}
+
+    copy_byte_arrays(proc_info->instance_id, grpcClientReq->apm_sc.instance_id, APM_APP_ID_SIZE);
+	copy_byte_arrays(proc_info->app_id, grpcClientReq->apm_sc.app_id, APM_APP_ID_SIZE);
+
+    // set assumed_app_id
+	// set_assumed_app_id_arrays(httpReq->host, httpReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+	cw_save_current_tracking_span(&grpcClientReq->apm_sc);
+
     // Write headers
-    char val[SPAN_CONTEXT_STRING_SIZE];
-    span_context_to_w3c_string(&current_span_context, val);
+    char val[CW_HEADER_VAL_LENGTH];
+	span_context_to_cw_string(&grpcClientReq->apm_sc, val);
     struct go_string_ot val_str = write_user_go_string(val, sizeof(val));
     if (val_str.len == 0) {
         bpf_printk("val write failed, aborting ebpf probe");
         goto done;
     }
+    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler6666\n");
     struct hpack_header_field hf = {};
     hf.name = key_str;
     hf.value = val_str;
     append_item_to_slice(&hf, sizeof(hf), (void *)(headerFrame_ptr + (headerFrame_hf_pos)));
 done:
-    bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
+    // bpf_map_delete_elem(&streamid_to_span_contexts, &stream_id);
 
     return 0;
 }
@@ -229,6 +281,7 @@ done:
 SEC("uprobe/http2Client_NewStream")
 // func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
 int uprobe_http2Client_NewStream(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_http2Client_NewStream \n");
     // struct go_iface go_context = {0};
     // get_Go_context(ctx, 2, 0, true);
     // void *httpclient_ptr = get_argument(ctx, 1);

+ 51 - 1
ebpftracer/tls.go

@@ -34,6 +34,9 @@ const (
 	goGrpcServerHandleStream = "google.golang.org/grpc.(*Server).handleStream"
 	goGrpcHttp2OperateHeader = "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders"
 	goGrpcServerWritestatus = "google.golang.org/grpc/internal/transport.(*http2Server).WriteStatus"
+	goGrpcClientConnInvoke = "google.golang.org/grpc.(*ClientConn).Invoke"
+	goGrpcClientLoopyHeaderHandler = "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler"
+	goGrpcHttp2ClientNewStream = "google.golang.org/grpc/internal/transport.(*http2Client).NewStream"
 )
 
 var (
@@ -297,7 +300,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		}
 		switch s.Name {
 		case goTlsWriteSymbol, goTlsReadSymbol:
-		case goExecute, goNewproc1, goRunqget, goServeHTTP, goTransport, goGrpcHttp2OperateHeader,goGrpcServerHandleStream,goGrpcServerWritestatus:
+		case goExecute, goNewproc1, goRunqget, goServeHTTP, goTransport, goGrpcHttp2OperateHeader,goGrpcServerHandleStream,goGrpcServerWritestatus,goGrpcClientConnInvoke,goGrpcClientLoopyHeaderHandler,goGrpcHttp2ClientNewStream:
 		default:
 			continue
 		}
@@ -379,6 +382,53 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			//	}
 			//	links = append(links, l)
 			//}
+		case goGrpcClientConnInvoke:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_ClientConn_Invoke"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_ClientConn_Invoke uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_ClientConn_Invoke ok")
+			links = append(links, l)
+			sStart := s.Value - textSection.Addr
+			sEnd := sStart + s.Size
+			klog.Infoln("uprobe_ClientConn_Invoke ok----111111")
+			if sEnd > textSectionLen {
+				continue
+			}
+			klog.Infoln("uprobe_ClientConn_Invoke ok----2222")
+			sBytes := textSectionData[sStart:sEnd]
+			returnOffsets := getReturnOffsets(ef.Machine, sBytes)
+			if len(returnOffsets) == 0 {
+				err = fmt.Errorf("failed to attach uprobe_ClientConn_Invoke no return offsets found")
+				klog.Errorln(err)
+				return nil, err
+			}
+			for _, offset := range returnOffsets {
+				l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_ClientConn_Invoke_Returns"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
+				if err != nil {
+					klog.WithError(err).Errorln(fmt.Errorf("failed to attach uprobe_ClientConn_Invoke_Returns uprobe"))
+					return nil, err
+				}
+				klog.Infoln("uprobe_ClientConn_Invoke_Returns ok----")
+				links = append(links, l)
+			}
+		case goGrpcClientLoopyHeaderHandler:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_LoopyWriter_HeaderHandler"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_LoopyWriter_HeaderHandler uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_LoopyWriter_HeaderHandler ok")
+			links = append(links, l)
+		case goGrpcHttp2ClientNewStream:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Client_NewStream"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_http2Client_NewStream uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_http2Client_NewStream ok")
+			links = append(links, l)
 		case goGrpcHttp2OperateHeader:
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_operateHeader"], &link.UprobeOptions{Address: address})
 			if err != nil {