Browse Source

Feature #TASK_QT-18250 grpc-clien完成数据拼装,部分数据待替换为真实数据。

rock 7 months ago
parent
commit
4759c1033a

+ 9 - 0
containers/container_apm.go

@@ -280,6 +280,15 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			}
 		}
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
+	case l7.ProtocolGrpc:
+		stats.observe(r.Status.String(), "", r.Duration)
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				apmTrace.GrpcClientTraceQueryEvent(r)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
+		}
 	case l7.ProtocolMongo:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {

+ 13 - 13
ebpftracer/ebpf/l7/l7.c

@@ -670,19 +670,19 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
             req->ns = prev_req->ns;
         }
-    } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
-        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
-        if (!e) {
-            return 0;
-        }
-        e->protocol = PROTOCOL_HTTP2;
-        e->method = METHOD_HTTP2_CLIENT_FRAMES;
-        e->duration = bpf_ktime_get_ns();
-        e->payload_size = size;
-        e->trace_id = get_apm_trace_id(pid, tid);
-        COPY_PAYLOAD(e->payload, size, payload);
-        send_event(ctx, e, cid, conn);
-        return 0;
+    // } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
+    //     struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    //     if (!e) {
+    //         return 0;
+    //     }
+    //     e->protocol = PROTOCOL_HTTP2;
+    //     e->method = METHOD_HTTP2_CLIENT_FRAMES;
+    //     e->duration = bpf_ktime_get_ns();
+    //     e->payload_size = size;
+    //     e->trace_id = get_apm_trace_id(pid, tid);
+    //     COPY_PAYLOAD(e->payload, size, payload);
+    //     send_event(ctx, e, cid, conn);
+    //     return 0;
     } else if (is_dubbo2_request(payload, size)) {
         req->protocol = PROTOCOL_DUBBO2;
     } else if (is_dns_request(payload, size, &k.stream_id)) {

+ 93 - 16
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -13,12 +13,16 @@
 #define MAX_CONCURRENT 50
 #define MAX_ERROR_LEN 128
 
+#define PROTOCOL_GRPC 15
+
 struct grpc_client_request_t {
     BASE_SPAN_PROPERTIES
     char err_msg[MAX_ERROR_LEN];
     char method[MAX_SIZE];
     char target[MAX_SIZE];
     u32 status_code;
+    u64 method_size;
+    u64 target_size;
 
     struct apm_span_context apm_sc;
     struct apm_span_context apm_psc;
@@ -130,6 +134,7 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
     u64 method_size = sizeof(grpcReq->method);
     method_size = method_size < method_len ? method_size : method_len;
     bpf_probe_read(&grpcReq->method, method_size, method_ptr);
+    grpcReq->method_size = method_size;
 
     // Read ClientConn.Target
     void *clientconn_ptr = get_argument(ctx, clientconn_pos);
@@ -139,6 +144,9 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
         bpf_printk("target write failed, aborting ebpf probe");
         return 0;
     }
+    grpcReq->target_size = sizeof(grpcReq->target);
+
+    bpf_printk("grpcReq->target is %s\n", grpcReq->target);
 
     // start_span_params_t start_span_params = {
     //     .ctx = ctx,
@@ -153,6 +161,7 @@ int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
     // Write event
     bpf_map_update_elem(&grpc_client_events, &key, grpcReq, 0);
     start_tracking_span(context_ptr_val, &grpcReq->sc);
+    bpf_printk("enter the uprobe_ClientConn_Invoke start_tracking_span\n");
     return 0;
 }
 
@@ -206,11 +215,79 @@ int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
         (void *)(s_ptr + status_message_pos), grpc_span->err_msg, sizeof(grpc_span->err_msg));
 
 done:
-    bpf_printk("switch to the done position\n");
+    // bpf_printk("switch to the done position\n");
     grpc_span->end_time = bpf_ktime_get_ns();
     // output_span_event(ctx, grpc_span, sizeof(*grpc_span), &grpc_span->sc);
     stop_tracking_span(&grpc_span->sc, &grpc_span->psc);
     bpf_map_delete_elem(&grpc_client_events, &key);
+
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u64 pid = pid_tgid >> 32;
+
+	struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &pid);
+	if (!info) {
+		return 0;
+	}
+
+	cw_bpf_debug("[Go] [uprobe/setNodeEnter]: proc_info_map::%ld, %d, %d\n", info->code_type);
+
+
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+    struct apm_trace_info_t * trace_info = get_apm_trace_info_by_trace_key(trace_key);
+
+    if (trace_info == NULL) {
+        bpf_printk("enter the trace_info is NULL\n");
+        trace_info = get_apm_trace_info_v3(trace_key,pid_tgid, pid, pid_tgid);
+    }
+    
+    u32 zero = 0;
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        return 0;
+    }
+
+    // e->fd = 0;
+    // e->pid = pid;
+    e->protocol = PROTOCOL_GRPC;
+    e->trace_end = 0;
+    e->trace_start = 0;
+    // e->status = STATUS_UNKNOWN;
+    // e->method = METHOD_GRPC;
+    e->statement_id = 0;
+    e->payload_size = grpc_span->method_size;
+    if (trace_info) {
+        bpf_printk("trace_info->trace_id is %llu\n", trace_info->trace_id);
+        e->trace_id = trace_info->trace_id;
+    }
+    if(e->trace_id == 0){
+        e->trace_id = get_apm_trace_id(pid,pid_tgid);
+        bpf_printk("e->trace_id is %llu\n", e->trace_id);
+    }
+    COPY_PAYLOAD(e->payload, grpc_span->method_size, grpc_span->method);
+
+    // bpf_printk("e->payload is %s\n", e->payload);
+    // bpf_printk("grpc_span->target is %s\n", grpc_span->target);
+    // COPY_PAYLOAD(e->payload + grpc_span->method_size, grpc_span->target_size, grpc_span->target);
+    // bpf_printk("e->payload is %s\n", e->payload);
+    // e->payload_size += grpc_span->target_size;
+    
+    struct  apm_span_context * sc = cw_get_current_tracking_span(trace_info);
+    if (sc) {
+        cw_copy_byte_arrays(sc->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+        cw_copy_byte_arrays(sc->span_id, e->span_id, APM_SPAN_ID_SIZE);
+    }
+    
+    e->end_at = bpf_ktime_get_ns();
+    e->start_at = grpc_span->start_time;
+    e->duration = e->end_at - e->start_at;
+    bpf_printk("send_event trace_id is444 %llu\n", e->trace_id);
+    long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	if (error ==0){
+	    cw_add_event_count(e->trace_id);
+	}
+
+
+    bpf_printk("enter the uprobe_ClientConn_Invoke_Returns done\n");
     return 0;
 }
 
@@ -225,15 +302,15 @@ cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
         return;
     }
 
-    bpf_printk("cw_append_item_to_slice len is %d\n", slice.len);
-    bpf_printk("cw_append_item_to_slice cap is %d\n", slice.cap);
-    bpf_printk("cw_append_item_to_slice array is %p\n", slice.array);
+    // bpf_printk("cw_append_item_to_slice len is %d\n", slice.len);
+    // bpf_printk("cw_append_item_to_slice cap is %d\n", slice.cap);
+    // bpf_printk("cw_append_item_to_slice array is %p\n", slice.array);
 
     u64 slice_len = slice.len;
     u64 slice_cap = slice.cap;
     if (slice_len < slice_cap && slice.array != NULL) {
         // Room available on current array, append to the underlying array
-        bpf_printk("enter the cw_append_item_to_slice11111\n");
+        // bpf_printk("enter the cw_append_item_to_slice11111\n");
         res = bpf_probe_write_user(slice.array + (item_size * slice_len), new_item, item_size);
     } else {
         // No room on current array - try to copy new one of size item_size * (len + 1)
@@ -259,9 +336,9 @@ cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
         // Append to buffer
         if (slice.array != NULL) {
             bpf_probe_read_user(new_slice_array, alloc_size, slice.array);
-            bpf_printk("append_item_to_slice: copying %d bytes to new array from address 0x%llx",
-                       alloc_size,
-                       slice.array);
+            // bpf_printk("append_item_to_slice: copying %d bytes to new array from address 0x%llx",
+                    //    alloc_size,
+                    //    slice.array);
         }
         copy_byte_arrays(new_item, new_slice_array + alloc_size, item_size);
 
@@ -278,12 +355,12 @@ cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
         slice.array = new_array;
         slice.cap++;
 
-        bpf_printk("enter the cw_append_item_to_slice222222\n");
+        // bpf_printk("enter the cw_append_item_to_slice222222\n");
     }
 
     // Update len
     slice.len++;
-    bpf_printk("after cw_append_item_to_slice len is %d\n", slice.len);
+    // bpf_printk("after cw_append_item_to_slice len is %d\n", slice.len);
     long success = bpf_probe_write_user(slice_user_ptr, &slice, sizeof(slice));
     if (success != 0) {
         bpf_printk("append_item_to_slice: failed to update slice in userspace");
@@ -294,7 +371,7 @@ cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
 SEC("uprobe/loopyWriter_headerHandler")
 int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
     void *headerFrame_ptr = get_argument(ctx, 2);
-    bpf_printk("enter the get header handler storage\n");
+    // bpf_printk("enter the get header handler storage\n");
 
     __u64 pid_tgid = bpf_get_current_pid_tgid();
 	__u32 tgid = pid_tgid >> 32;
@@ -329,7 +406,7 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
     __builtin_memset(grpcClientReq, 0, sizeof(struct grpc_client_request_t));
     grpcClientReq->start_time = bpf_ktime_get_ns();
 
-    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler444444\n");
+    // bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler444444\n");
 	struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
 	if(cw_psc){
 		bpf_probe_read(&grpcClientReq->apm_psc, sizeof(grpcClientReq->apm_psc), cw_psc);
@@ -358,7 +435,7 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
         bpf_printk("Key data write failed\n");
         return 0;
     }
-    bpf_printk("key_data_addr=%p\n", key_data_addr);
+    // bpf_printk("key_data_addr=%p\n", key_data_addr);
     
     // Manually advance the pointer for value (workaround for alloc_map bug)
     // Allocate enough space: align to 8 bytes
@@ -371,7 +448,7 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
     // Try to write value with manual offset
     char *val_data_addr = key_data_addr + key_size_aligned;
     long val_write_result = bpf_probe_write_user(val_data_addr, storage->val, sizeof(storage->val));
-    bpf_printk("val_data_addr=%p, write_result=%ld\n", val_data_addr, val_write_result);
+    // bpf_printk("val_data_addr=%p, write_result=%ld\n", val_data_addr, val_write_result);
     
     if (val_write_result != 0) {
         bpf_printk("Val direct write failed\n");
@@ -381,8 +458,8 @@ int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
     struct go_string_ot key_str = {.str = key_data_addr, .len = sizeof(tp_key)};
     struct go_string_ot val_str = {.str = val_data_addr, .len = sizeof(storage->val)};
     
-    bpf_printk("key_str: len=%d, ptr=%p\n", key_str.len, key_str.str);
-    bpf_printk("val_str: len=%d, ptr=%p\n", val_str.len, val_str.str);
+    // bpf_printk("key_str: len=%d, ptr=%p\n", key_str.len, key_str.str);
+    // bpf_printk("val_str: len=%d, ptr=%p\n", val_str.len, val_str.str);
     
     // Build header field
     storage->hf.name = key_str;

+ 2 - 2
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -236,7 +236,7 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
 
 
     //不发送payload
-    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    // bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
     return 0;
 }
@@ -403,7 +403,7 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
     //     __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
     // }
     //不发送payload
-    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    // bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
     bpf_printk("stop get apm data\n");
     return 0;                                                                                  

+ 5 - 0
ebpftracer/l7/l7.go

@@ -25,6 +25,7 @@ const (
 	ProtocolDubbo2    Protocol = 12
 	ProtocolDNS       Protocol = 13
 	ProtocolDM        Protocol = 14
+	ProtocolGrpc      Protocol = 15
 )
 
 func (p Protocol) Int() int {
@@ -63,6 +64,8 @@ func (p Protocol) String() string {
 		return "DNS"
 	case ProtocolDM:
 		return "DM"
+	case ProtocolGrpc:
+		return "GRPC"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
@@ -99,6 +102,8 @@ func (p Protocol) ServiceNameString() string {
 		return "DNS"
 	case ProtocolDM:
 		return "DM"
+	case ProtocolGrpc:
+		return "GRPC"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

+ 1 - 1
main.go

@@ -322,7 +322,7 @@ func main() {
 		if err != nil {
 			return
 		}
-		log.Debugln("netdata is:", string(jsonData))
+		// log.Debugln("netdata is:", string(jsonData))
 		// 创建请求
 		urlRoute := "/api/v2/ebpf/receive"
 

+ 43 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -25,6 +25,7 @@ const (
 	NOSQL_SERVICE_TYPE = "NOSQL"
 	HTTP_SERVICE_TYPE  = "HTTP"
 	NET_SERVICE_TYPE   = "L7_NET"
+	RPC_SERVICE_TYPE   = "RPC"
 )
 
 const (
@@ -35,6 +36,7 @@ const (
 	MONGO_SERVICE_NAME      = "MONGODB"
 	HTTP_SERVICE_NAME       = "HTTPCLIENT"
 	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
+	GRPC_SERVICE_NAME       = "GRPC"
 )
 
 type apmTraceSpan tracesdk.ReadOnlySpan
@@ -192,6 +194,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildSQLMapEvent(&mNode, event)
 				case l7.ProtocolPostgres:
 					buildSQLMapEvent(&mNode, event)
+				case l7.ProtocolGrpc:
+					buildGrpcMapEvent(&mNode, event)
 				}
 			}
 
@@ -801,6 +805,45 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 //	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 //}
 
+func buildGrpcMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = GRPC_SERVICE_NAME
+	mNode.ServiceType = RPC_SERVICE_TYPE
+	mNode.Schema = "grpc"
+	//mNode.MethodName = "HTTP"
+	//var descAddr string
+	// var method string
+	for _, attr := range event.Attributes {
+		switch attr.Key {
+		case "rpc.ip":
+			mNode.Ip = attr.Value.AsString()
+			//descAddr += mNode.Ip
+		case "rpc.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "rpc.method":
+			//mNode.MethodName += " " + attr.Value.AsString()
+			// method = attr.Value.AsString()
+			mNode.MethodName = attr.Value.AsString()
+			//descAddr += ":" + attr.Value.AsString()
+		case "rpc.uri":
+			mNode.Uri = attr.Value.AsString()
+			//mNode.MethodName += " " + attr.Value.AsString()
+		case "rpc.assumed_app_id":
+			mNode.AssumedAppId = attr.Value.AsInt64()
+		case "rpc.span_id":
+			mNode.SpanId = attr.Value.AsString()
+		case "time.start_at":
+			mNode.StartTime = uint64(attr.Value.AsInt64())
+		case "time.end_at":
+			mNode.EndTime = uint64(attr.Value.AsInt64())
+		case "time.duration":
+			//mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
+			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
+		}
+	}
+	// mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
+	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
+}
+
 func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.ServiceName = HTTP_SERVICE_NAME
 	mNode.ServiceType = HTTP_SERVICE_TYPE

+ 30 - 0
tracing/apm_tracing.go

@@ -540,6 +540,36 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
+func (t *Trace) GrpcClientTraceQueryEvent(r *l7.RequestData) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+	// if method == "" {
+	// 	return
+	// }
+
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+	
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.RPCSystemGRPC,
+		attribute.String("rpc.uri", "selectone"),
+		attribute.String("rpc.schema", "grpc"),
+		attribute.Int64("rpc.assumed_app_id", assumedAppID),
+		attribute.String("rpc.span_id", r.SpanId),
+		attribute.Int("rpc.port", int(1023)),
+		attribute.String("rpc.ip", "127.0.0.1"),
+		attribute.String("rpc.oper_type", "consumer"),
+		attribute.String("rpc.method", "test_method"),
+	)
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolGrpc.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolGrpc.Int(), attr...)
+}
 func (t *Trace) MongoTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
 	if t == nil {
 		return