Browse Source

Feature #TASK_QT-18250 端到端功能完成

rock 7 tháng trước cách đây
mục cha
commit
c40d860655

+ 1 - 1
containers/container_apm.go

@@ -131,7 +131,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 
 					apmTrace, err := c.getOrInitTrace(r.TraceId)
 					if err == nil {
-						apmTrace.GrpcServerTraceQueryEvent(r)
+						apmTrace.GrpcServerTraceQueryEvent(r,c.GetAppInfo())
 						c.SendEvent(apmTrace, r.TraceId)
 					}
 				}

+ 1 - 0
ebpftracer/ebpf/l7/l7.c

@@ -96,6 +96,7 @@ struct l7_event {
 	unsigned char app_id_from[APM_APP_ID_SIZE];
 	unsigned char span_id_from[APM_SPAN_ID_SIZE];
     unsigned char type_from[APM_TYPE_FROM_SIZE];
+    unsigned char rpc_target[64];
 
 //    __u32 test_id;
     char payload[MAX_PAYLOAD_SIZE];

+ 21 - 7
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -145,8 +145,16 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
         bpf_printk("target write failed, aborting ebpf probe");
         return 0;
     }
+
+
     grpcReq->target_size = sizeof(grpcReq->target);
 
+    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+	if(cw_psc){
+        set_assumed_app_id_arrays(grpcReq->target, cw_psc->assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+        cw_save_parent_tracking_span(cw_psc);
+	}
+
     // bpf_printk("grpcReq->target is %s\n", grpcReq->target);
 
     // start_span_params_t start_span_params = {
@@ -264,6 +272,15 @@ done:
         e->trace_id = get_apm_trace_id(pid,pid_tgid);
         bpf_printk("e->trace_id is %llu\n", e->trace_id);
     }
+    // 使用固定长度读取 target 信息,避免验证器复杂性
+    if (grpc_span->target_size > 0) {
+        // 使用固定的小长度来避免验证器问题
+        u32 fixed_size = 32; // 固定32字节,足够大多数target
+        if (grpc_span->target_size < fixed_size) {
+            fixed_size = (u32)grpc_span->target_size;
+        }
+        bpf_probe_read(&e->rpc_target, fixed_size, grpc_span->target);
+    }
     COPY_PAYLOAD(e->payload, grpc_span->method_size, grpc_span->method);
 
     // bpf_printk("e->payload is %s\n", e->payload);
@@ -371,10 +388,6 @@ cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
 
 SEC("uprobe/loopyWriter_headerHandler")
 int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
-
-    void *key = (void *)GOROUTINE(ctx);
-    // bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler key is 0x%llx\n", (u64)key);
-
     void *headerFrame_ptr = get_argument(ctx, 2);
     // bpf_printk("enter the get header handler storage\n");
 
@@ -426,6 +439,7 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
 		bpf_probe_read(&grpcClientReq->apm_psc, sizeof(grpcClientReq->apm_psc), cw_psc);
 		copy_byte_arrays(grpcClientReq->apm_psc.trace_id, grpcClientReq->apm_sc.trace_id, APM_TRACE_ID_SIZE);
 		generate_random_bytes(grpcClientReq->apm_sc.span_id, APM_SPAN_ID_SIZE);
+        copy_byte_arrays(grpcClientReq->apm_psc.assumed_app_id, grpcClientReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
 	}
 
     u32 k0 = 0;
@@ -436,10 +450,10 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
 
     copy_byte_arrays(proc_info->instance_id, grpcClientReq->apm_sc.instance_id, APM_APP_ID_SIZE);
 	copy_byte_arrays(proc_info->app_id, grpcClientReq->apm_sc.app_id, APM_APP_ID_SIZE);
+    // bpf_printk("grpcClientReq->apm_sc.app_id is %s\n", grpcClientReq->apm_sc.app_id);
 
-    // set assumed_app_id
-	// set_assumed_app_id_arrays(httpReq->host, httpReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
-	cw_save_current_tracking_span(&grpcClientReq->apm_sc);
+    bpf_map_update_elem(&apm_current_span_context_map, sc_ptr, &grpcClientReq->apm_sc, BPF_ANY);
+	// cw_save_current_tracking_span(&grpcClientReq->apm_sc);
 
     // Strategy: Write key and value separately with manual offset
     // First write the key

+ 1 - 0
ebpftracer/l7/l7.go

@@ -206,6 +206,7 @@ type RequestData struct {
 	DAddr             netaddr.IPPort
 	ComponentSAddr    netaddr.IPPort
 	ComponentDAddr    netaddr.IPPort
+	RPCTarget         string
 	ParentSpanContext struct {
 		TraceIdFrom    string
 		CalledId       string

+ 2 - 0
ebpftracer/tracer.go

@@ -552,6 +552,7 @@ type l7Event struct {
 	AppIdFrom           HashByte
 	SpanIdFrom          HashByte
 	TypeFrom          	[1]byte
+	RPCTarget           [64]byte
 }
 
 type SocketDataBufferddd struct {
@@ -781,6 +782,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				EndAt:          v.EndtAt,
 				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
 				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
+				RPCTarget:      string(v.RPCTarget[:]),
 			}
 			if req.Protocol == l7.ProtocolHTTP {
 				klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())

+ 43 - 7
tracing/apm_tracing.go

@@ -557,7 +557,7 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
-func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData) {
+func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData, appInfo AppInfo) {
 	if t == nil {
 		return
 	}
@@ -578,8 +578,8 @@ func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData) {
 		attribute.String("rpc.schema", "grpc"),
 		attribute.Int64("rpc.assumed_app_id", assumedAppID),
 		attribute.String("rpc.span_id", r.SpanId),
-		attribute.Int("rpc.port", int(1023)),
-		attribute.String("rpc.ip", "127.0.0.1"),
+		attribute.Int("rpc.port", appInfo.Sport),
+		attribute.String("rpc.ip", appInfo.Sn),
 		attribute.String("rpc.oper_type", "PROVIDER"),
 		attribute.String("rpc.method", "test_method"),
 	)
@@ -602,16 +602,52 @@ func (t *Trace) GrpcClientTraceQueryEvent(r *l7.RequestData) {
 		assumedAppID = 0
 	}
 	
+	// 解析 RPCTarget 字符串,格式为 "ip:port"
+	klog.Infof("r.RPCTarget is %s", r.RPCTarget)
+	var rpcIP, rpcPort string
+	if r.RPCTarget != "" {
+		// 清理空字节,只保留有效字符
+		cleanTarget := strings.TrimRight(r.RPCTarget, "\x00")
+		cleanTarget = strings.TrimSpace(cleanTarget)
+		klog.Infof("cleanTarget is %s", cleanTarget)
+		
+		if colonIndex := strings.LastIndex(cleanTarget, ":"); colonIndex != -1 {
+			rpcIP = cleanTarget[:colonIndex]
+			rpcPort = cleanTarget[colonIndex+1:]
+			klog.Infof("rpcIP is %s, rpcPort is %s", rpcIP, rpcPort)
+		} else {
+			// 如果没有找到冒号,可能是只有IP或只有端口
+			rpcIP = cleanTarget
+		}
+	}
+	
+	// 设置默认值
+	if rpcIP == "" {
+		rpcIP = "127.0.0.1"
+	}
+	if rpcPort == "" {
+		rpcPort = "80"
+	}
+	
+	// 转换端口为整数
+	portInt, err := strconv.Atoi(rpcPort)
+	if err != nil {
+		klog.Infof("r.RPCTarget convert port err is %s", err)
+		portInt = 80 // 默认端口
+	} else {
+		klog.Infof("r.RPCTarget convert port success, portInt is %d", portInt)
+	}
+	
 	var attr []attribute.KeyValue
 	attr = append(attr,
 		semconv.RPCSystemGRPC,
-		attribute.String("rpc.uri", "selectone"),
+		attribute.String("rpc.uri", string(r.Payload)),
 		attribute.String("rpc.schema", "grpc"),
 		attribute.Int64("rpc.assumed_app_id", assumedAppID),
 		attribute.String("rpc.span_id", r.SpanId),
-		attribute.Int("rpc.port", int(1023)),
-		attribute.String("rpc.ip", "127.0.0.1"),
-		attribute.String("rpc.oper_type", "consumer"),
+		attribute.Int("rpc.port", portInt),
+		attribute.String("rpc.ip", rpcIP),
+		attribute.String("rpc.oper_type", "CONSUMER"),
 		attribute.String("rpc.method", "test_method"),
 	)