Răsfoiți Sursa

Feature #TASK_QT-18250 grpc-server数据结构拼装完成,待验证其是否可以与redis等数据库关联起来。

rock 7 luni în urmă
părinte
comite
49edeb4ea1

+ 17 - 5
containers/container_apm.go

@@ -119,11 +119,22 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
 			}
 			if err == nil {
-				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
-				ip, _ := netaddr.ParseIP(sn)
-				//codeType := c.GetCodeTypeFromCache(pid)
-				trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo())
-				c.SendEvent(trace, r.TraceId)
+				if r.TraceType == 0 {
+					method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
+					ip, _ := netaddr.ParseIP(sn)
+					//codeType := c.GetCodeTypeFromCache(pid)
+					trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo())
+					c.SendEvent(trace, r.TraceId)
+				} else if r.TraceType == 1 {
+					trace.GrpcServerTraceStartEvent(r,c.GetAppInfo())
+					c.SendEvent(trace, r.TraceId)
+
+					apmTrace, err := c.getOrInitTrace(r.TraceId)
+					if err == nil {
+						apmTrace.GrpcServerTraceQueryEvent(r)
+						c.SendEvent(apmTrace, r.TraceId)
+					}
+				}
 			}
 
 			return nil
@@ -281,6 +292,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		}
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
 	case l7.ProtocolGrpc:
+		klog.Debugln("enter the l7.ProtocolGrpc")
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 			apmTrace, err := c.getOrInitTrace(r.TraceId)

+ 3 - 0
ebpftracer/ebpf/l7/l7.c

@@ -78,6 +78,7 @@ struct l7_event {
 	__u64 end_at;
     __u32 trace_start;
     __u32 trace_end;
+    __u32 trace_type;           // 0: normal, 1: grpc-server, 2: https
     __u32 event_count;
     __u16 sport;
     __u16 dport;
@@ -457,6 +458,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         e->trace_start = 0;
         e->trace_end = 1;
+        e->trace_type = 0;
         e->trace_id = trace_id;
         e->payload_size = size;
         e->event_count = event_count;
@@ -833,6 +835,7 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
 
         e->trace_start = 1;
         e->trace_end = 0;
+        e->trace_type = 0;
         e->protocol = PROTOCOL_TRACE;
         e->trace_id = trace_info.trace_id;
 	    cw_bpf_debug("\n");

+ 3 - 3
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -13,7 +13,7 @@
 #define MAX_CONCURRENT 50
 #define MAX_ERROR_LEN 128
 
-#define PROTOCOL_GRPC 15
+// #define PROTOCOL_GRPC 15
 
 struct grpc_client_request_t {
     BASE_SPAN_PROPERTIES
@@ -373,10 +373,10 @@ SEC("uprobe/loopyWriter_headerHandler")
 int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
 
     void *key = (void *)GOROUTINE(ctx);
-    bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler key is 0x%llx\n", (u64)key);
+    // bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler key is 0x%llx\n", (u64)key);
 
     void *headerFrame_ptr = get_argument(ctx, 2);
-    bpf_printk("enter the get header handler storage\n");
+    // bpf_printk("enter the get header handler storage\n");
 
     __u64 pid_tgid = bpf_get_current_pid_tgid();
 	__u32 tgid = pid_tgid >> 32;

+ 40 - 15
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -16,6 +16,8 @@
 #define MAX_HEADERS 20
 #define MAX_HEADER_STRING 50
 
+#define PROTOCOL_GRPC 15
+
 struct grpc_request_t {
     BASE_SPAN_PROPERTIES
     char method[MAX_SIZE];
@@ -23,6 +25,7 @@ struct grpc_request_t {
     net_addr_t local_addr;
     u8 has_status;
     u32 stream_id;
+    u64 method_size;
 };
 
 struct {
@@ -196,7 +199,7 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
         return 0;
     }
 
-    bpf_printk("start get apm data\n");
+    // bpf_printk("start get apm data\n");
 
 
     struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
@@ -220,23 +223,41 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
 
     e->fd = k.fd;
     e->pid = k.pid;
-    e->protocol = PROTOCOL_UNKNOWN;
     e->status = STATUS_UNKNOWN;
     e->method = METHOD_UNKNOWN;
     e->statement_id = 0;
-    e->payload_size = 0;
-    e->trace_id = 0;
+    
+    // 拷贝 grpcReq->method 到 payload 并设置 payload_size
+    // 手动计算字符串长度(在 eBPF 中不能使用 strlen)
+    u32 method_len = 0;
+    for (int i = 0; i < MAX_SIZE; i++) {
+        if (grpcReq->method[i] == '\0') {
+            method_len = i;
+            break;
+        }
+    }
+    // 如果没有找到 '\0',使用最大长度
+    if (method_len == 0) {
+        method_len = MAX_SIZE - 1;
+    }
+    
+    grpcReq->method_size = method_len;
+    e->payload_size = method_len;
+    COPY_PAYLOAD(e->payload, method_len, grpcReq->method);
+
+    bpf_printk("grpc:server:handleStream: get the payload size is %d\n", e->payload_size);
 
     struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
     
     e->trace_start = 1;
     e->trace_end = 0;
+    e->trace_type = 1;
     e->protocol = PROTOCOL_TRACE;
     e->trace_id = trace_info.trace_id;
 
 
     //不发送payload
-    // bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
     return 0;
 }
@@ -289,7 +310,7 @@ SEC("uprobe/server_handleStream")
 int uprobe_server_handleStream(struct pt_regs *ctx) {
     u64 stream_pos = 4;
     void *stream_ptr = get_argument(ctx, stream_pos);
-    bpf_printk("enter uprobe_server_handleStream\n");
+    // bpf_printk("enter uprobe_server_handleStream\n");
     // Get key
     __u64 pid_tgid = bpf_get_current_pid_tgid();
 	__u32 tgid = pid_tgid >> 32;
@@ -318,7 +339,7 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
 
 	pid = id >> 32;
 	tid =  (__u32)id;
-    bpf_printk("enter uprobe_server_handleStream_Returns\n");
+    // bpf_printk("enter uprobe_server_handleStream_Returns\n");
 
     struct l7_request_key k = {};
     k.pid = pid;
@@ -381,10 +402,13 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
     // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
     e->trace_start = 0;
     e->trace_end = 1;
+    e->trace_type = 1;
     e->trace_id = trace_id;
     e->payload_size = 0;
     e->event_count = event_count;
-    // COPY_PAYLOAD(e->payload, size, payload);
+    
+    e->payload_size = event->method_size;
+    COPY_PAYLOAD(e->payload, event->method_size, event->method);
     // bpf_map_delete_elem(&active_l7_requests, &k);
 	// 清除事件计数
 	bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
@@ -403,9 +427,9 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
     //     __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
     // }
     //不发送payload
-    // bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
-    bpf_printk("stop get apm data\n");
+    // bpf_printk("stop get apm data\n");
     return 0;                                                                                  
 }
 
@@ -417,7 +441,7 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
 SEC("uprobe/server_handleStream2")
 int uprobe_server_handleStream2(struct pt_regs *ctx) {
     u64 server_stream_pos = 4;
-    bpf_printk("enter uprobe_server_handleStream2\n");
+    // bpf_printk("enter uprobe_server_handleStream2\n");
     void *server_stream_ptr = get_argument(ctx, server_stream_pos);
     if (server_stream_ptr == NULL) {
         bpf_printk("grpc:server:uprobe/server_handleStream2: failed to get ServerStream arg");
@@ -467,7 +491,7 @@ int uprobe_server_handleStream2_Returns(struct pt_regs *ctx) {
 
 	pid = id >> 32;
 	tid =  (__u32)id;
-    bpf_printk("enter uprobe_server_handleStream2_Returns\n");
+    // bpf_printk("enter uprobe_server_handleStream2_Returns\n");
 
     struct l7_request_key k = {};
     k.pid = pid;
@@ -547,6 +571,7 @@ lookup:
     // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
     e->trace_start = 0;
     e->trace_end = 1;
+    e->trace_type = 1;
     e->trace_id = trace_id;
     e->payload_size = 0;
     e->event_count = event_count;
@@ -583,7 +608,7 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
     struct go_slice header_fields = {};
     bpf_probe_read(&header_fields, sizeof(header_fields), (void *)(frame_ptr + frame_fields_pos));
     char key[W3C_KEY_LENGTH] = "traceparent";
-    bpf_printk("enter the uprobe_http2Server_operateHeader\n");
+    // bpf_printk("enter the uprobe_http2Server_operateHeader\n");
     
     __u32 zero = 0;
     struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
@@ -648,7 +673,7 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
 // This is only compatible with versions > 1.40 and < 1.69.0 of the Server.
 SEC("uprobe/http2Server_WriteStatus")
 int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
-    bpf_printk("enter uprobe_http2Server_WriteStatus\n");
+    // bpf_printk("enter uprobe_http2Server_WriteStatus\n");
     void *status_ptr = get_argument(ctx, 3);
     return writeStatus(ctx, status_ptr);
 }
@@ -659,7 +684,7 @@ int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
 // This is only compatible with versions > 1.69.0 of the Server.
 SEC("uprobe/http2Server_WriteStatus2")
 int uprobe_http2Server_WriteStatus2(struct pt_regs *ctx) {
-    bpf_printk("enter uprobe_http2Server_WriteStatus2\n");
+    // bpf_printk("enter uprobe_http2Server_WriteStatus2\n");
     u64 server_stream_pos = 2;
     void *server_stream_ptr = get_argument(ctx, server_stream_pos);
     if (server_stream_ptr == NULL) {

+ 1 - 0
ebpftracer/l7/l7.go

@@ -196,6 +196,7 @@ type RequestData struct {
 	TraceId           uint64
 	TraceStart        uint32
 	TraceEnd          uint32
+	TraceType         uint32
 	EventCount        uint32
 	AssumedAppId      string
 	SpanId            string

+ 5 - 0
ebpftracer/tracer.go

@@ -534,6 +534,7 @@ type l7Event struct {
 	EndtAt              uint64 // ns
 	TraceStart          uint32
 	TraceEnd            uint32
+	TraceType           uint32
 	EventCount          uint32
 	Sport               uint16
 	Dport               uint16
@@ -772,6 +773,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				TraceId:        v.TraceId,
 				TraceStart:     v.TraceStart,
 				TraceEnd:       v.TraceEnd,
+				TraceType:      v.TraceType,
 				EventCount:     v.EventCount,
 				AssumedAppId:   hex.EncodeToString(v.AssumedAppId[:]),
 				SpanId:         hex.EncodeToString(v.SpanId[:]),
@@ -784,6 +786,9 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
 				klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
 			}
+			if req.Protocol == l7.ProtocolGrpc {
+				klog.Debugln("receive the l7.ProtocolGrpc data")
+			}
 			if v.TraceEnd == 1 {
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
 				req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])

+ 8 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -723,6 +723,8 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 		switch attr.Key {
 		case "http.uri":
 			traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
+		case "rpc.uri":
+			traceRoot.Uri = attr.Value.AsString()
 		case "http.method":
 			traceRoot.HttpMethod = attr.Value.AsString()
 		case "http.status_code":
@@ -762,7 +764,10 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 			traceRoot.AppName = attr.Value.AsString()
 		case "server.service_name":
 			traceRoot.ServiceName = attr.Value.AsString()
-			mNode.ServiceName = attr.Value.AsString()
+		case "rpc.service_type":
+			traceRoot.ServiceType = RPC_SERVICE_TYPE
+		case "rpc.oper_type":	
+			traceRoot.OperType = "PROVIDER"
 		case "server.app_id":
 			traceRoot.AppId = attr.Value.AsInt64()
 		case "server.agent_id":
@@ -827,6 +832,8 @@ func buildGrpcMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 		case "rpc.uri":
 			mNode.Uri = attr.Value.AsString()
 			//mNode.MethodName += " " + attr.Value.AsString()
+		case "rpc.oper_type":
+			mNode.OperType = attr.Value.AsString()
 		case "rpc.assumed_app_id":
 			mNode.AssumedAppId = attr.Value.AsInt64()
 		case "rpc.span_id":

+ 48 - 0
tracing/apm_tracing.go

@@ -129,6 +129,23 @@ func (t *Trace) AllEventReady(traceID uint64) bool {
 	return t.startEventReady && t.endEventReady && *t.currenEventCount >= t.needEventCount
 }
 
+func (t *Trace) GrpcServerTraceStartEvent(r *l7.RequestData, appInfo AppInfo) {
+	t.span.SetAttributes(attribute.String("rpc.uri", string(r.Payload)))
+	t.commonAttrs = []attribute.KeyValue{
+		// buildAppMapFromEvent
+		attribute.Int("server.code_type", appInfo.CodeType.Int()),
+		attribute.String("server.app_name", appInfo.AppName),
+		attribute.String("server.service_name", "GRPC"),
+		attribute.String("rpc.service_type", "RPC"),
+		attribute.String("rpc.oper_type", "PROVIDER"),
+		attribute.Int64("server.app_id", appInfo.AppIdHash.IntVal),
+		attribute.Int64("server.agent_id", appInfo.AgentId),
+		attribute.Int64("server.instance_id", appInfo.InstanceIdHash.IntVal),
+	}
+	t.span.SetAttributes(t.commonAttrs...)
+	t.startReady()
+}
+
 func (t *Trace) TraceStartEvent(method, path, sn string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s:%d%s", sn, sport, path)),
 		semconv.HTTPMethod(method),
@@ -540,6 +557,37 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
+func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+	// if method == "" {
+	// 	return
+	// }
+
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+	
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.RPCSystemGRPC,
+		attribute.String("rpc.uri", string(r.Payload)),
+		attribute.String("rpc.schema", "grpc"),
+		attribute.Int64("rpc.assumed_app_id", assumedAppID),
+		attribute.String("rpc.span_id", r.SpanId),
+		attribute.Int("rpc.port", int(1023)),
+		attribute.String("rpc.ip", "127.0.0.1"),
+		attribute.String("rpc.oper_type", "PROVIDER"),
+		attribute.String("rpc.method", "test_method"),
+	)
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolGrpc.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolGrpc.Int(), attr...)
+}
+
 func (t *Trace) GrpcClientTraceQueryEvent(r *l7.RequestData) {
 	if t == nil {
 		return