Browse Source

Feature #TASK_QT-18250 适配GRPC
Feature #TASK_QT-18250 端到端功能完成
Feature #TASK_QT-18250 端到端数据已经打通
Feature #TASK_QT-18250 grpc-server数据结构拼装完成,待验证其是否可以与redis等数据库关联起来。
Feature #TASK_QT-18250 grpc-clien端到端数据trace_id与server数据采集到的trace_id相关联。
Feature #TASK_QT-18250 grpc-clien完成数据拼装,部分数据待替换为真实数据。
Feature #TASK_QT-18250 grpc-client完成端到端数据的注入
Feature #TASK_QT-18250 提交代码,修复注入header头的问题,下一步解决注入正确header数据
Feature #TASK_QT-18250 grpc-client监控代码编写完成,但是在塞header头时出现问题。
Feature #TASK_QT-18250 grpc-server临时完成了数据拼接,基础功能验证是无问题的。
Feature #TASK_QT-18250 提交代码
Feature #TASK_QT-18250 现在uprobe点能正常挂载,解决handle_stream无return的问题。
Feature #TASK_QT-18250 grpc server uprobe点挂载为低版本。
Feature #TASK_QT-18250 grpc server uprobe点挂载成功。
Feature #TASK_QT-18250 测试uprobe是否挂载成功。
Feature #TASK_QT-18250 暂时性注释获取端到端逻辑。
Feature #TASK_QT-18250 恢复代码分割前逻辑。
Feature #TASK_QT-18250 提交代码,解决bug
Feature #TASK_QT-18250 提交代码,解决兼容问题,让AI协助排查定位问题。
Feature #TASK_QT-18250 补充go_net.h文件的定义
Feature #TASK_QT-18250 提交代码,便于后续比较
Feature #TASK_QT-18250 增加grpc监控代码,先提交,方便后续修改比较

Tom 7 months ago
parent
commit
9120928160

+ 29 - 8
containers/container_apm.go

@@ -120,15 +120,26 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
 			}
 			if err == nil {
-				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
-				ip, _ := netaddr.ParseIP(sn)
-				//codeType := c.GetCodeTypeFromCache(pid)
-				container_id := ""
-				if c.cgroup != nil {
-					container_id = c.cgroup.ContainerId
+				if r.TraceType == 0 {
+					method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
+					ip, _ := netaddr.ParseIP(sn)
+					//codeType := c.GetCodeTypeFromCache(pid)
+					container_id := ""
+					if c.cgroup != nil {
+						container_id = c.cgroup.ContainerId
+					}
+					trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
+					c.SendEvent(trace, r.TraceId)
+				} else if r.TraceType == 1 {
+					trace.GrpcServerTraceStartEvent(r, c.GetAppInfo())
+					c.SendEvent(trace, r.TraceId)
+
+					apmTrace, err := c.getOrInitTrace(r.TraceId)
+					if err == nil {
+						apmTrace.GrpcServerTraceQueryEvent(r, c.GetAppInfo())
+						c.SendEvent(apmTrace, r.TraceId)
+					}
 				}
-				trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
-				c.SendEvent(trace, r.TraceId)
 			}
 
 			return nil
@@ -291,6 +302,16 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			}
 		}
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
+	case l7.ProtocolGrpc:
+		klog.Debugln("enter the l7.ProtocolGrpc")
+		stats.observe(r.Status.String(), "", r.Duration)
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				apmTrace.GrpcClientTraceQueryEvent(r)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
+		}
 	case l7.ProtocolMongo:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {

+ 1 - 0
ebpftracer/ebpf/config.h

@@ -64,6 +64,7 @@ enum {
 	PROG_DATA_JAVA_FIND_HOST_UP_IDX,
 	PROG_DATA_JAVA_BUILD_HEADER_UP_IDX,
 	PROG_DATA_GO_UPDATE_HEADER_UP_IDX,
+	PROG_GRPC_SERVER_PROCESS_HEADERS_UP_IDX,
 	PROG_UP_NUM
 };
 

+ 2 - 0
ebpftracer/ebpf/ebpf.c

@@ -58,6 +58,8 @@
 #include "utrace/go/net/client.probe.bpf.c"
 #include "utrace/go/net/stack.probe.bpf.c"
 #include "utrace/go/net/jvmstack.probe.bpf.c"
+#include "utrace/go/net/grpc.server.probe.bpf.c"
+#include "utrace/go/net/grpc.client.probe.bpf.c"
 
 #include "utrace/java/net/server.probe.bpf.c"
 #include "utrace/java/net/client.probe.bpf.c"

+ 18 - 13
ebpftracer/ebpf/l7/l7.c

@@ -79,6 +79,7 @@ struct l7_event {
 	__u64 end_at;
     __u32 trace_start;
     __u32 trace_end;
+    __u32 trace_type;           // 0: normal, 1: grpc-server, 2: https
     __u32 event_count;
     __u16 sport;
     __u16 dport;
@@ -97,6 +98,8 @@ struct l7_event {
 	unsigned char app_id_from[APM_APP_ID_SIZE];
 	unsigned char span_id_from[APM_SPAN_ID_SIZE];
     unsigned char type_from[APM_TYPE_FROM_SIZE];
+    unsigned char rpc_target[64];
+
     // 错误消息字段
     unsigned char error_message[ERROR_MSG_PAYLOAD_SIZE];
 //    __u32 test_id;
@@ -460,6 +463,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         e->trace_start = 0;
         e->trace_end = 1;
+        e->trace_type = 0;
         e->trace_id = trace_id;
         e->payload_size = size;
         e->event_count = event_count;
@@ -675,19 +679,19 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
             req->ns = prev_req->ns;
         }
-    } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
-        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
-        if (!e) {
-            return 0;
-        }
-        e->protocol = PROTOCOL_HTTP2;
-        e->method = METHOD_HTTP2_CLIENT_FRAMES;
-        e->duration = bpf_ktime_get_ns();
-        e->payload_size = size;
-        e->trace_id = get_apm_trace_id(pid, tid);
-        COPY_PAYLOAD(e->payload, size, payload);
-        send_event(ctx, e, cid, conn);
-        return 0;
+    // } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
+    //     struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    //     if (!e) {
+    //         return 0;
+    //     }
+    //     e->protocol = PROTOCOL_HTTP2;
+    //     e->method = METHOD_HTTP2_CLIENT_FRAMES;
+    //     e->duration = bpf_ktime_get_ns();
+    //     e->payload_size = size;
+    //     e->trace_id = get_apm_trace_id(pid, tid);
+    //     COPY_PAYLOAD(e->payload, size, payload);
+    //     send_event(ctx, e, cid, conn);
+    //     return 0;
     } else if (is_dubbo2_request(payload, size)) {
         req->protocol = PROTOCOL_DUBBO2;
     } else if (is_dns_request(payload, size, &k.stream_id)) {
@@ -839,6 +843,7 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
 
         e->trace_start = 1;
         e->trace_end = 0;
+        e->trace_type = 0;
         e->protocol = PROTOCOL_TRACE;
         e->trace_id = trace_info.trace_id;
 	    cw_bpf_debug("\n");

+ 58 - 0
ebpftracer/ebpf/utrace/go/include/go_net.h

@@ -0,0 +1,58 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#ifndef GO_NET_H
+#define GO_NET_H
+
+#include "common.h"
+#include "go_types.h"
+
+typedef struct net_addr {
+    u8 ip[16];
+    u32 port;
+} net_addr_t;
+
+/*
+type TCPAddr struct {
+	IP   IP
+	Port int
+	Zone string // IPv6 scoped addressing zone
+}
+*/
+const volatile u64 TCPAddr_IP_offset;
+const volatile u64 TCPAddr_Port_offset;
+
+static __always_inline long
+get_tcp_net_addr_from_tcp_addr(struct pt_regs *ctx, net_addr_t *addr, void *tcpAddr_ptr) {
+    go_slice_ot ip;
+    long res = bpf_probe_read_user(&ip, sizeof(ip), (void *)(tcpAddr_ptr + TCPAddr_IP_offset));
+    if (res != 0) {
+        bpf_printk("failed to read ip slice %d", res);
+        return res;
+    }
+
+    u8 ip_slice_len = 4;
+    if (ip.len != 4 && ip.len != 16) {
+        bpf_printk("invalid ip slice length: %d", ip.len);
+        return -1;
+    }
+
+    if (ip.len == 16) {
+        ip_slice_len = 16;
+    }
+
+    res = bpf_probe_read_user(addr->ip, ip_slice_len, ip.array);
+    if (res != 0) {
+        bpf_printk("failed to read ip array");
+        return res;
+    }
+
+    res = bpf_probe_read_user(
+        &addr->port, sizeof(addr->port), (void *)(tcpAddr_ptr + TCPAddr_Port_offset));
+    if (res != 0) {
+        bpf_printk("failed to read port");
+    }
+    return res;
+}
+
+#endif

+ 3 - 0
ebpftracer/ebpf/utrace/go/include/go_types.h

@@ -140,6 +140,7 @@ static __always_inline void append_item_to_slice(void *new_item, u32 item_size,
         u32 alloc_size = item_size * slice_len;
         if (alloc_size >= MAX_SLICE_ARRAY_SIZE)
         {
+            bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler99999\n");
             return;
         }
     
@@ -148,6 +149,7 @@ static __always_inline void append_item_to_slice(void *new_item, u32 item_size,
         struct slice_array_buff *map_buff = bpf_map_lookup_elem(&slice_array_buff_map, &index);
         if (!map_buff)
         {
+            bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler88888\n");
             return;
         }
     
@@ -157,6 +159,7 @@ static __always_inline void append_item_to_slice(void *new_item, u32 item_size,
         if (alloc_size + item_size > MAX_SLICE_ARRAY_SIZE)
         {
             // No room for new item
+            bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler7777\n");
             return;
         }
         // Append to buffer

+ 520 - 0
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -0,0 +1,520 @@
+
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#include "arguments.h"
+#include "span_context.h"
+#include "go_types.h"
+#include "go_context.h"
+#include "uprobe.h"
+
+
+// #define MAX_SIZE 50
+#define MAX_CONCURRENT 50
+#define MAX_ERROR_LEN 128
+
+// #define PROTOCOL_GRPC 15
+
+struct grpc_client_request_t {
+    BASE_SPAN_PROPERTIES
+    char err_msg[MAX_ERROR_LEN];
+    char method[MAX_SIZE];
+    char target[MAX_SIZE];
+    u32 status_code;
+    u64 method_size;
+    u64 target_size;
+
+    struct apm_span_context apm_sc;
+    struct apm_span_context apm_psc;
+};
+
+// struct hpack_header_field {
+//     struct go_string_ot name;
+//     struct go_string_ot value;
+//     bool sensitive;
+// };
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __type(key, void *);
+    __type(value, struct grpc_client_request_t);
+    __uint(max_entries, MAX_CONCURRENT);
+} grpc_client_events SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __type(key, u32);
+    __type(value, struct apm_trace_key_t);
+    __uint(max_entries, MAX_CONCURRENT);
+} streamid_to_span_contexts SEC(".maps");
+
+// 用于 ClientConn_Invoke 函数的临时存储,避免栈空间超限
+struct {
+    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+    __uint(key_size, sizeof(u32));
+    __uint(value_size, sizeof(struct grpc_client_request_t));
+    __uint(max_entries, 1);
+} grpc_client_storage_map SEC(".maps");
+
+// 用于 loopyWriter_headerHandler 的临时存储
+struct header_handler_storage {
+    struct span_context sc;
+    char val[CW_HEADER_VAL_LENGTH];
+    struct hpack_header_field hf;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+    __uint(key_size, sizeof(u32));
+    __uint(value_size, sizeof(struct header_handler_storage));
+    __uint(max_entries, 1);
+} header_handler_storage_map SEC(".maps");
+
+// Injected in init
+// volatile const u64 clientconn_target_ptr_pos;
+u64 clientconn_target_ptr_pos = 24;
+// volatile const u64 httpclient_nextid_pos;
+u64 httpclient_nextid_pos = 404;
+// volatile const u64 headerFrame_streamid_pos;
+u64 headerFrame_streamid_pos = 0;
+// volatile const u64 headerFrame_hf_pos;
+u64 headerFrame_hf_pos = 8;
+// volatile const u64 error_status_pos;
+u64 error_status_pos = 0;
+// volatile const u64 status_s_pos;
+// static u64 status_s_pos = 0;
+// volatile const u64 status_message_pos;
+u64 status_message_pos = 48;
+// volatile const u64 status_code_pos;
+// static u64 status_code_pos = 40;
+
+volatile const bool write_status_supported;
+
+// This instrumentation attaches uprobe to the following function:
+// func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
+SEC("uprobe/ClientConn_Invoke")
+int uprobe_ClientConn_Invoke(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_ClientConn_Invoke \n");
+    // positions
+    u64 clientconn_pos = 1;
+    u64 method_ptr_pos = 4;
+    u64 method_len_pos = 5;
+
+    // struct go_iface go_context = {0};
+    // get_Go_context(ctx, 2, 0, true);
+    void *context_ptr_val = get_Go_context(ctx, 2, 0, true);
+    if (context_ptr_val == NULL)
+    {
+        return 0;
+    }
+
+    // Get key
+    void *key = (void *)GOROUTINE(ctx);
+    // bpf_printk("enter the uprobe_ClientConn_Invoke key is 0x%llx\n", (u64)key);
+    void *grpcReq_ptr = bpf_map_lookup_elem(&grpc_client_events, &key);
+    if (grpcReq_ptr != NULL) {
+        bpf_printk("uprobe/ClientConn_Invoke already tracked with the current context");
+        return 0;
+    }
+
+    // 使用 per-cpu array map 存储大变量,避免栈空间超限
+    u32 zero = 0;
+    struct grpc_client_request_t *grpcReq = bpf_map_lookup_elem(&grpc_client_storage_map, &zero);
+    if (grpcReq == NULL) {
+        bpf_printk("grpc:client:ClientConn_Invoke: failed to get storage");
+        return -1;
+    }
+    
+    // 清零并初始化
+    __builtin_memset(grpcReq, 0, sizeof(struct grpc_client_request_t));
+    grpcReq->start_time = bpf_ktime_get_ns();
+
+    // Read Method
+    void *method_ptr = get_argument(ctx, method_ptr_pos);
+    u64 method_len = (u64)get_argument(ctx, method_len_pos);
+    u64 method_size = sizeof(grpcReq->method);
+    method_size = method_size < method_len ? method_size : method_len;
+    bpf_probe_read(&grpcReq->method, method_size, method_ptr);
+    grpcReq->method_size = method_size;
+
+    // Read ClientConn.Target
+    void *clientconn_ptr = get_argument(ctx, clientconn_pos);
+    if (!get_go_string_from_user_ptr((void *)(clientconn_ptr + clientconn_target_ptr_pos),
+                                     grpcReq->target,
+                                     sizeof(grpcReq->target))) {
+        bpf_printk("target write failed, aborting ebpf probe");
+        return 0;
+    }
+
+
+    grpcReq->target_size = sizeof(grpcReq->target);
+
+    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+	if(cw_psc){
+        set_assumed_app_id_arrays(grpcReq->target, cw_psc->assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+        cw_save_parent_tracking_span(cw_psc);
+	}
+
+    // bpf_printk("grpcReq->target is %s\n", grpcReq->target);
+
+    // start_span_params_t start_span_params = {
+    //     .ctx = ctx,
+    //     .go_context = &go_context,
+    //     .psc = &grpcReq->psc,
+    //     .sc = &grpcReq->sc,
+    //     .get_parent_span_context_fn = NULL,
+    //     .get_parent_span_context_arg = NULL,
+    // };
+    // start_span(&start_span_params);
+
+    // Write event
+    bpf_map_update_elem(&grpc_client_events, &key, grpcReq, 0);
+    start_tracking_span(context_ptr_val, &grpcReq->sc);
+    // bpf_printk("enter the uprobe_ClientConn_Invoke start_tracking_span\n");
+    return 0;
+}
+
+// This instrumentation attaches uprobe to the following function:
+// func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error
+SEC("uprobe/ClientConn_Invoke")
+int uprobe_ClientConn_Invoke_Returns(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_ClientConn_Invoke_Returns \n");
+    void *key = (void *)GOROUTINE(ctx);
+    struct grpc_client_request_t *grpc_span = bpf_map_lookup_elem(&grpc_client_events, &key);
+    if (grpc_span == NULL) {
+        bpf_printk("event is NULL in ret probe");
+        return 0;
+    }
+
+    if (!write_status_supported) {
+        goto done;
+    }
+    // Getting the returned response (error)
+    // The status code is embedded 3 layers deep:
+    // Invoke() error
+    // the `error` interface concrete type here is a gRPC `internal.Error` struct
+    // type Error struct {
+    //   s *Status
+    // }
+    // The `Error` struct embeds a `Status` proto object
+    // type Status struct {
+    //   s *Status
+    // }
+    // The `Status` proto object contains a `Code` int32 field, which is what we want
+    // type Status struct {
+    //     Code int32
+    //     Message string
+    //     Details []*anypb.Any
+    // }
+    void *resp_ptr = get_argument(ctx, 2);
+    if (resp_ptr == 0) {
+        // err == nil
+        goto done;
+    }
+    void *status_ptr = 0;
+    // get `s` (Status pointer field) from Error struct
+    bpf_probe_read_user(&status_ptr, sizeof(status_ptr), (void *)(resp_ptr + error_status_pos));
+    // get `s` field from Status object pointer
+    void *s_ptr = 0;
+    bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
+    // Get status code from Status.s pointer
+    bpf_probe_read_user(
+        &grpc_span->status_code, sizeof(grpc_span->status_code), (void *)(s_ptr + status_code_pos));
+    get_go_string_from_user_ptr(
+        (void *)(s_ptr + status_message_pos), grpc_span->err_msg, sizeof(grpc_span->err_msg));
+
+done:
+    // bpf_printk("switch to the done position\n");
+    grpc_span->end_time = bpf_ktime_get_ns();
+    // output_span_event(ctx, grpc_span, sizeof(*grpc_span), &grpc_span->sc);
+    stop_tracking_span(&grpc_span->sc, &grpc_span->psc);
+    bpf_map_delete_elem(&grpc_client_events, &key);
+
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u64 pid = pid_tgid >> 32;
+
+	struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &pid);
+	if (!info) {
+		return 0;
+	}
+
+	cw_bpf_debug("[Go] [uprobe/setNodeEnter]: proc_info_map::%ld, %d, %d\n", info->code_type);
+
+
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+    struct apm_trace_info_t * trace_info = get_apm_trace_info_by_trace_key(trace_key);
+
+    if (trace_info == NULL) {
+        bpf_printk("enter the trace_info is NULL\n");
+        trace_info = get_apm_trace_info_v3(trace_key,pid_tgid, pid, pid_tgid);
+    }
+    
+    u32 zero = 0;
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        return 0;
+    }
+
+    // e->fd = 0;
+    // e->pid = pid;
+    e->protocol = PROTOCOL_GRPC;
+    e->trace_end = 0;
+    e->trace_start = 0;
+    // e->status = STATUS_UNKNOWN;
+    // e->method = METHOD_GRPC;
+    e->statement_id = 0;
+    e->payload_size = grpc_span->method_size;
+    if (trace_info) {
+        bpf_printk("trace_info->trace_id is %llu\n", trace_info->trace_id);
+        e->trace_id = trace_info->trace_id;
+    }
+    if(e->trace_id == 0){
+        e->trace_id = get_apm_trace_id(pid,pid_tgid);
+        bpf_printk("e->trace_id is %llu\n", e->trace_id);
+    }
+    // 使用固定长度读取 target 信息,避免验证器复杂性
+    if (grpc_span->target_size > 0) {
+        // 使用固定的小长度来避免验证器问题
+        u32 fixed_size = 32; // 固定32字节,足够大多数target
+        if (grpc_span->target_size < fixed_size) {
+            fixed_size = (u32)grpc_span->target_size;
+        }
+        bpf_probe_read(&e->rpc_target, fixed_size, grpc_span->target);
+    }
+    COPY_PAYLOAD(e->payload, grpc_span->method_size, grpc_span->method);
+
+    // bpf_printk("e->payload is %s\n", e->payload);
+    // bpf_printk("grpc_span->target is %s\n", grpc_span->target);
+    // COPY_PAYLOAD(e->payload + grpc_span->method_size, grpc_span->target_size, grpc_span->target);
+    // bpf_printk("e->payload is %s\n", e->payload);
+    // e->payload_size += grpc_span->target_size;
+    
+    struct  apm_span_context * sc = cw_get_current_tracking_span(trace_info);
+    if (sc) {
+        cw_copy_byte_arrays(sc->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+        cw_copy_byte_arrays(sc->span_id, e->span_id, APM_SPAN_ID_SIZE);
+    }
+    
+    e->end_at = bpf_ktime_get_ns();
+    e->start_at = grpc_span->start_time;
+    e->duration = e->end_at - e->start_at;
+    // bpf_printk("send_event trace_id is444 %llu\n", e->trace_id);
+    long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	if (error ==0){
+	    cw_add_event_count(e->trace_id);
+	}
+
+
+    // bpf_printk("enter the uprobe_ClientConn_Invoke_Returns done\n");
+    return 0;
+}
+
+
+static __always_inline void
+cw_append_item_to_slice(void *new_item, u32 item_size, void *slice_user_ptr) {
+    // read the slice descriptor
+    struct go_slice_ot slice = {0};
+    long res = bpf_probe_read_user(&slice, sizeof(slice), slice_user_ptr);
+    if (res != 0) {
+        bpf_printk("cw_append_item_to_slice: failed to read slice descriptor, res=%ld\n", res);
+        return;
+    }
+
+    // bpf_printk("cw_append_item_to_slice len is %d\n", slice.len);
+    // bpf_printk("cw_append_item_to_slice cap is %d\n", slice.cap);
+    // bpf_printk("cw_append_item_to_slice array is %p\n", slice.array);
+
+    u64 slice_len = slice.len;
+    u64 slice_cap = slice.cap;
+    if (slice_len < slice_cap && slice.array != NULL) {
+        // Room available on current array, append to the underlying array
+        // bpf_printk("enter the cw_append_item_to_slice11111\n");
+        res = bpf_probe_write_user(slice.array + (item_size * slice_len), new_item, item_size);
+    } else {
+        // No room on current array - try to copy new one of size item_size * (len + 1)
+        u32 alloc_size = item_size * slice_len;
+        if (alloc_size >= MAX_SLICE_ARRAY_SIZE) {
+            return;
+        }
+
+        // Get temporary buffer
+        u32 index = 0;
+        struct slice_array_buff *map_buff = bpf_map_lookup_elem(&slice_array_buff_map, &index);
+        if (!map_buff) {
+            return;
+        }
+
+        unsigned char *new_slice_array = map_buff->buff;
+        // help the verifier
+        alloc_size &= (MAX_SLICE_ARRAY_SIZE - 1);
+        if (alloc_size + item_size > MAX_SLICE_ARRAY_SIZE) {
+            // No room for new item
+            return;
+        }
+        // Append to buffer
+        if (slice.array != NULL) {
+            bpf_probe_read_user(new_slice_array, alloc_size, slice.array);
+            // bpf_printk("append_item_to_slice: copying %d bytes to new array from address 0x%llx",
+                    //    alloc_size,
+                    //    slice.array);
+        }
+        copy_byte_arrays(new_item, new_slice_array + alloc_size, item_size);
+
+        // Copy buffer to userspace
+        u32 new_array_size = alloc_size + item_size;
+
+        void *new_array = write_target_data(new_slice_array, new_array_size);
+        if (new_array == NULL) {
+            bpf_printk("append_item_to_slice: failed to copy new array to userspace");
+            return;
+        }
+
+        // Update array pointer of slice
+        slice.array = new_array;
+        slice.cap++;
+
+        // bpf_printk("enter the cw_append_item_to_slice222222\n");
+    }
+
+    // Update len
+    slice.len++;
+    // bpf_printk("after cw_append_item_to_slice len is %d\n", slice.len);
+    long success = bpf_probe_write_user(slice_user_ptr, &slice, sizeof(slice));
+    if (success != 0) {
+        bpf_printk("append_item_to_slice: failed to update slice in userspace");
+        return;
+    }
+}
+
+SEC("uprobe/loopyWriter_headerHandler")
+int uprobe_LoopyWriter_HeaderHandler(struct pt_regs *ctx) {
+    void *headerFrame_ptr = get_argument(ctx, 2);
+    // bpf_printk("enter the get header handler storage\n");
+
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info =
+			bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if(!proc_info)
+    {
+        return 0;
+    }
+
+    u32 stream_id = 0;
+    bpf_probe_read(
+        &stream_id, sizeof(stream_id), (void *)(headerFrame_ptr + (headerFrame_streamid_pos)));
+    struct apm_trace_key_t *sc_ptr = bpf_map_lookup_elem(&streamid_to_span_contexts, &stream_id);
+    if (sc_ptr == NULL) {
+        return 0;
+    }
+    
+    // Get storage from per-cpu map
+    u32 zero = 0;
+    struct header_handler_storage *storage = bpf_map_lookup_elem(&header_handler_storage_map, &zero);
+    if (!storage) {
+        bpf_printk("Failed to get header handler storage\n");
+        return 0;
+    }
+    
+    // Generate span context
+    generate_random_bytes(storage->sc.TraceID, TRACE_ID_SIZE);
+    generate_random_bytes(storage->sc.SpanID, SPAN_ID_SIZE);
+
+
+    u32 map_id = 0;
+    struct grpc_client_request_t *grpcClientReq = bpf_map_lookup_elem(&grpc_client_storage_map, &map_id);
+    if (grpcClientReq == NULL)
+    {
+        cw_bpf_debug("uprobe_LoopyWriter_HeaderHandler: grpcClientReq is NULL");
+        return 0;
+    }
+
+    __builtin_memset(grpcClientReq, 0, sizeof(struct grpc_client_request_t));
+    grpcClientReq->start_time = bpf_ktime_get_ns();
+
+    // bpf_printk("enter the uprobe_LoopyWriter_HeaderHandler444444\n");
+	// struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+    struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(*sc_ptr);
+	if(cw_psc){
+		bpf_probe_read(&grpcClientReq->apm_psc, sizeof(grpcClientReq->apm_psc), cw_psc);
+		copy_byte_arrays(grpcClientReq->apm_psc.trace_id, grpcClientReq->apm_sc.trace_id, APM_TRACE_ID_SIZE);
+		generate_random_bytes(grpcClientReq->apm_sc.span_id, APM_SPAN_ID_SIZE);
+        copy_byte_arrays(grpcClientReq->apm_psc.assumed_app_id, grpcClientReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+	}
+
+    u32 k0 = 0;
+	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+	if (trace_conf) {
+		copy_byte_arrays(trace_conf->host_id, grpcClientReq->apm_sc.host_id, APM_HOST_ID_SIZE);
+	}
+
+    copy_byte_arrays(proc_info->instance_id, grpcClientReq->apm_sc.instance_id, APM_APP_ID_SIZE);
+	copy_byte_arrays(proc_info->app_id, grpcClientReq->apm_sc.app_id, APM_APP_ID_SIZE);
+    // bpf_printk("grpcClientReq->apm_sc.app_id is %s\n", grpcClientReq->apm_sc.app_id);
+
+    bpf_map_update_elem(&apm_current_span_context_map, sc_ptr, &grpcClientReq->apm_sc, BPF_ANY);
+	// cw_save_current_tracking_span(&grpcClientReq->apm_sc);
+
+    // Strategy: Write key and value separately with manual offset
+    // First write the key
+    char tp_key[CW_HEADER_KEY_LENGTH] = CW_HEADER_KEY_VAL;
+    char *key_data_addr = write_target_data((void *)tp_key, sizeof(tp_key));
+    if (key_data_addr == NULL) {
+        bpf_printk("Key data write failed\n");
+        return 0;
+    }
+    // bpf_printk("key_data_addr=%p\n", key_data_addr);
+    
+    // Manually advance the pointer for value (workaround for alloc_map bug)
+    // Allocate enough space: align to 8 bytes
+    u64 key_size_aligned = ((sizeof(tp_key) + 7) / 8) * 8;
+    
+    // Write value at offset from key
+    span_context_to_cw_string(&grpcClientReq->apm_sc, storage->val);
+    // span_context_to_w3c_string(&storage->sc, storage->val);
+    
+    // Try to write value with manual offset
+    char *val_data_addr = key_data_addr + key_size_aligned;
+    long val_write_result = bpf_probe_write_user(val_data_addr, storage->val, sizeof(storage->val));
+    // bpf_printk("val_data_addr=%p, write_result=%ld\n", val_data_addr, val_write_result);
+    
+    if (val_write_result != 0) {
+        bpf_printk("Val direct write failed\n");
+        return 0;
+    }
+    
+    struct go_string_ot key_str = {.str = key_data_addr, .len = sizeof(tp_key)};
+    struct go_string_ot val_str = {.str = val_data_addr, .len = sizeof(storage->val)};
+    
+    // bpf_printk("key_str: len=%d, ptr=%p\n", key_str.len, key_str.str);
+    // bpf_printk("val_str: len=%d, ptr=%p\n", val_str.len, val_str.str);
+    
+    // Build header field
+    storage->hf.name = key_str;
+    storage->hf.value = val_str;
+    storage->hf.sensitive = false;
+    
+    // Append to slice
+    void *slice_ptr = (void *)(headerFrame_ptr + headerFrame_hf_pos);
+    cw_append_item_to_slice(&storage->hf, sizeof(storage->hf), slice_ptr);
+
+    return 0;
+}
+
+SEC("uprobe/http2Client_NewStream")
+// func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
+int uprobe_http2Client_NewStream(struct pt_regs *ctx) {
+    // bpf_printk("enter the uprobe_http2Client_NewStream \n");
+    struct go_iface go_context = {0};
+    get_Go_context(ctx, 2, 0, true);
+    void *httpclient_ptr = get_argument(ctx, 1);
+    u32 nextid = 0;
+    bpf_probe_read(&nextid, sizeof(nextid), (void *)(httpclient_ptr + (httpclient_nextid_pos)));
+    // Get the span context from go context. The mapping is created in the Invoke probe,
+    // the context here is derived from the Invoke context.
+    // struct span_context *current_span_context = get_parent_span_context(&go_context);
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+    bpf_map_update_elem(&streamid_to_span_contexts, &nextid, &trace_key, 0);
+
+    return 0;
+}

+ 698 - 0
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -0,0 +1,698 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#include "arguments.h"
+#include "go_types.h"
+#include "go_net.h"
+#include "span_context.h"
+#include "go_context.h"
+#include "uprobe.h"
+// #include "trace/start_span.h"
+
+// char __license[] SEC("license") = "Dual MIT/GPL";
+
+// #define MAX_SIZE 100
+#define MAX_CONCURRENT 50
+#define MAX_HEADERS 20
+#define MAX_HEADER_STRING 50
+
+#define PROTOCOL_GRPC 16
+
+struct grpc_request_t {
+    BASE_SPAN_PROPERTIES
+    char method[MAX_SIZE];
+    u32 status_code;
+    net_addr_t local_addr;
+    u8 has_status;
+    u32 stream_id;
+    u64 method_size;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __type(key, void *);
+    __type(value, struct grpc_request_t);
+    __uint(max_entries, MAX_CONCURRENT);
+} grpc_events SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __type(key, u32);
+    __type(value, struct grpc_request_t);
+    __uint(max_entries, MAX_CONCURRENT);
+} streamid_to_grpc_events SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+    __uint(key_size, sizeof(u32));
+    __uint(value_size, sizeof(struct grpc_request_t));
+    __uint(max_entries, 1);
+} grpc_storage_map SEC(".maps");
+
+struct hpack_header_field {
+    struct go_string_ot name;
+    struct go_string_ot value;
+    bool sensitive;
+};
+
+// Injected in init
+// volatile const u64 stream_method_ptr_pos;
+u64 stream_method_ptr_pos = 80;
+// volatile const u64 frame_fields_pos;
+u64 frame_fields_pos = 8;
+// volatile const u64 frame_stream_id_pod;
+u64 frame_stream_id_pod = 8;
+// volatile const u64 stream_id_pos;
+u64 stream_id_pos = 0;
+// volatile const u64 stream_ctx_pos;
+u64 stream_ctx_pos = 32;
+// volatile const u64 server_stream_stream_pos;
+u64 server_stream_stream_pos;
+// volatile const bool is_new_frame_pos;
+bool is_new_frame_pos;
+// volatile const u64 status_s_pos;
+static u64 status_s_pos = 0;
+// volatile const u64 status_code_pos;
+static u64 status_code_pos = 40;
+// volatile const u64 http2server_peer_pos;
+u64 http2server_peer_pos;
+// volatile const u64 peer_local_addr_pos;
+u64 peer_local_addr_pos;
+
+volatile const bool server_addr_supported;
+
+static __always_inline long
+dummy_extract_span_context_from_headers(void *stream_id, struct span_context *parent_span_context) {
+    return 0;
+}
+
+// handleStream handles gRPC stream telemetry.
+//
+// Arguments:
+//   - ctx: the pt_regs passed to the uprobe function
+//   - stream_ptr: pointer to the transport.Stream tracking the stream
+//   - go_context: the parsed Go context.Context
+//
+// Returns 0 on success, otherwise a negative error value in case of failure.
+static __always_inline int
+handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context) {
+    if (go_context == NULL) {
+        bpf_printk("grpc:server:handleStream: NULL go_context");
+        return -1;
+    }
+
+    if (stream_ptr == NULL) {
+        bpf_printk("grpc:server:handleStream: NULL stream_ptr");
+        return -1;
+    }
+
+    void *key = (void *)GOROUTINE(ctx);
+    void *grpcReq_event_ptr = bpf_map_lookup_elem(&grpc_events, &key);
+    if (grpcReq_event_ptr != NULL) {
+        bpf_printk("grpc:server:handleStream: event already tracked");
+        return 0;
+    }
+
+    // Get parent context if exists
+    u32 stream_id = 0;
+    __u32 zero = 0;
+    long rc =
+        bpf_probe_read_user(&stream_id, sizeof(stream_id), (void *)(stream_ptr + stream_id_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:handleStream: failed to read stream ID");
+        return -2;
+    }
+
+    struct grpc_request_t *grpcReq = bpf_map_lookup_elem(&streamid_to_grpc_events, &stream_id);
+    if (grpcReq == NULL) {
+        // No parent span context, generate new span context
+        u32 zero = 0;
+        grpcReq = bpf_map_lookup_elem(&grpc_storage_map, &zero);
+        if (grpcReq == NULL) {
+            bpf_printk("grpc:server:handleStream: failed to get grpcReq");
+            return 0;
+        }
+    }
+
+    grpcReq->start_time = bpf_ktime_get_ns();
+    grpcReq->stream_id = stream_id;
+
+    // start_span_params_t start_span_params = {
+    //     .ctx = ctx,
+    //     .sc = &grpcReq->sc,
+    //     .psc = &grpcReq->psc,
+    //     .go_context = go_context,
+    //     // The parent span context is set by operateHeader probe
+    //     .get_parent_span_context_fn = dummy_extract_span_context_from_headers,
+    //     .get_parent_span_context_arg = NULL,
+    // };
+    // start_span(&start_span_params);
+
+    // Set attributes
+    void *method_ptr = stream_ptr + stream_method_ptr_pos;
+    bool parsed_method =
+        get_go_string_from_user_ptr(method_ptr, grpcReq->method, sizeof(grpcReq->method));
+    if (!parsed_method) {
+        bpf_printk("grpc:server:handleStream: failed to read gRPC method from stream");
+        bpf_map_delete_elem(&streamid_to_grpc_events, &stream_id);
+        return -3;
+    }
+
+    bpf_printk("grpc:server:handleStream: get the method is %s\n", grpcReq->method);
+
+    if (server_addr_supported) {
+        void *http2server = get_argument(ctx, 3);
+        if (http2server != NULL) {
+            void *local_addr_ptr = 0;
+            void *local_addr_pos = http2server + http2server_peer_pos + peer_local_addr_pos;
+            bpf_probe_read_user(
+                &local_addr_ptr, sizeof(local_addr_ptr), get_go_interface_instance(local_addr_pos));
+            get_tcp_net_addr_from_tcp_addr(ctx, &grpcReq->local_addr, (void *)(local_addr_ptr));
+        } else {
+            bpf_printk("grpc:server:handleStream: failed to get http2server arg");
+        }
+    }
+
+    // Write event
+    rc = bpf_map_update_elem(&grpc_events, &key, grpcReq, 0);
+    if (rc != 0) {
+        bpf_printk("grpc:server:handleStream: failed to update event");
+        return -4;
+    }
+    start_tracking_span(go_context->data, &grpcReq->sc);
+
+
+    //处理http请求之前,确认进程信息是否存在
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 pid = id >> 32;
+    struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &pid);
+    if (!proc_info) {
+        cw_bpf_debug("[Trace End in l7][Response][HTTP]:no proc info. pid:%d \n",pid);
+        return 0;
+    }
+
+    // bpf_printk("start get apm data\n");
+
+
+    // struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
+    // if (cw_parent_span_context == NULL) {
+    //     return -1;
+    // }
+    // __builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
+    // generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
+    // cw_save_parent_tracking_span(cw_parent_span_context);
+
+    struct l7_request_key k = {};
+    k.pid = pid;
+    k.fd = 0;
+    k.is_tls = 0;
+    k.stream_id = stream_id;
+
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        return 0;
+    }
+
+    e->fd = k.fd;
+    e->pid = k.pid;
+    e->status = STATUS_UNKNOWN;
+    e->method = METHOD_UNKNOWN;
+    e->statement_id = 0;
+    
+    // 拷贝 grpcReq->method 到 payload 并设置 payload_size
+    // 手动计算字符串长度(在 eBPF 中不能使用 strlen)
+    u32 method_len = 0;
+    for (int i = 0; i < MAX_SIZE; i++) {
+        if (grpcReq->method[i] == '\0') {
+            method_len = i;
+            break;
+        }
+    }
+    // 如果没有找到 '\0',使用最大长度
+    if (method_len == 0) {
+        method_len = MAX_SIZE - 1;
+    }
+    
+    grpcReq->method_size = method_len;
+    e->payload_size = method_len;
+    COPY_PAYLOAD(e->payload, method_len, grpcReq->method);
+
+    bpf_printk("grpc:server:handleStream: get the payload size is %d\n", e->payload_size);
+
+    struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
+    
+    e->trace_start = 1;
+    e->trace_end = 0;
+    e->trace_type = 1;
+    e->protocol = PROTOCOL_TRACE;
+    e->trace_id = trace_info.trace_id;
+
+
+    //不发送payload
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+
+    return 0;
+}
+
+// writeStatus writes the OTel status to any active spans.
+//
+// Arguments:
+//   - ctx: the pt_regs passed to the uprobe function
+//   - status_ptr: pointer to the status.Stream holding the status info
+//
+// Returns 0 on success, otherwise a negative error value in case of failure.
+static __always_inline int writeStatus(struct pt_regs *ctx, void *status_ptr) {
+    if (status_ptr == NULL) {
+        bpf_printk("grpc:server:writeStatus: NULL status_ptr");
+        return -1;
+    }
+
+    void *key = (void *)GOROUTINE(ctx);
+
+    struct grpc_request_t *req_ptr = bpf_map_lookup_elem(&grpc_events, &key);
+    if (req_ptr == NULL) {
+        bpf_printk("grpc:server:handleStream: failed to lookup grpc request");
+        return -2;
+    }
+
+    void *s_ptr = 0;
+    long rc = bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:handleStream: failed to read Status.s");
+        return -3;
+    }
+
+    // Get status code from Status.s pointer
+    rc = bpf_probe_read_user(
+        &req_ptr->status_code, sizeof(req_ptr->status_code), (void *)(s_ptr + status_code_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:handleStream: failed to read status code");
+        return -4;
+    }
+    req_ptr->has_status = true;
+
+    return 0;
+}
+
+// This instrumentation attaches uprobe to the following function:
+// func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo)
+//
+// This is only compatible with versions < 1.69.0 of the Server.
+SEC("uprobe/server_handleStream")
+int uprobe_server_handleStream(struct pt_regs *ctx) {
+    bpf_printk("enter the uprobe_server_handleStream");
+    u64 stream_pos = 4;
+    void *stream_ptr = get_argument(ctx, stream_pos);
+    // bpf_printk("enter uprobe_server_handleStream\n");
+    // Get key
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info =
+			bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if(!proc_info)
+    {
+	    bpf_printk("[uprobe_HandlerFunc_ServeHTTP] no proc info");
+        return 0;
+    }
+    struct go_iface go_context = {0};
+    get_Go_context(ctx, stream_pos, proc_info->ctx_ptr_pos, false);
+
+    return handleStream(ctx, stream_ptr, &go_context);
+}
+
+// UPROBE_RETURN(server_handleStream, struct grpc_request_t, grpc_events) 
+SEC("uprobe/server_handleStream")
+int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {  
+    bpf_printk("enter the uprobe_server_handleStream return");     
+    void *key = (void *)GOROUTINE(ctx);
+    __u64 id = bpf_get_current_pid_tgid();
+	__u32 zero = 0;
+    __u32 fd = 0;
+	__u32 pid, tid;
+	__u32 http_status = 200;
+
+	pid = id >> 32;
+	tid =  (__u32)id;
+    // bpf_printk("enter uprobe_server_handleStream_Returns\n");
+
+    struct l7_request_key k = {};
+    k.pid = pid;
+    k.fd = fd;
+    k.is_tls = 0;
+    k.stream_id = -1;    
+    struct grpc_request_t *event = bpf_map_lookup_elem(&grpc_events, &key);
+    if (event == NULL) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2Return: event is NULL");
+        return -5;
+    }
+    event->end_time = bpf_ktime_get_ns();
+    // output_span_event(ctx, event, sizeof(struct grpc_request_t), &event->sc);
+    stop_tracking_span(&event->sc, &event->psc);
+    bpf_map_delete_elem(&grpc_events, &key);
+	
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	struct apm_trace_info_t * start_trace_info = get_apm_trace_info_by_trace_key(trace_key);
+    if (!start_trace_info) {
+		return -1;
+	}
+
+    __u64 trace_id = start_trace_info->trace_id;
+	__u32 event_count = cw_get_event_count(trace_id);
+    cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
+	cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
+	cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
+
+    // 发送事件到用户空间 start
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+	    cw_clear_trace(pid, tid, fd);
+        return -1;
+    }
+	// parent sc
+	struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
+	if(cw_psc){
+		cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
+        cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
+	}
+    // struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+    // if (!req)
+    // {
+	//     cw_clear_trace(pid, tid, fd);
+    //     return 0;
+    // }
+	// e->start_at = req->ns;
+    e->start_at = event->start_time;
+	// cw_bpf_debug("req->ns:%llu",req->ns);
+	e->end_at = bpf_ktime_get_ns();
+    e->duration = e->end_at - e->start_at;
+    e->protocol = PROTOCOL_TRACE;
+    e->status = http_status;
+    e->pid = k.pid;
+    e->fd = k.fd;
+    // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+    e->trace_start = 0;
+    e->trace_end = 1;
+    e->trace_type = 1;
+    e->trace_id = trace_id;
+    e->payload_size = 0;
+    e->event_count = event_count;
+    
+    e->payload_size = event->method_size;
+    COPY_PAYLOAD(e->payload, event->method_size, event->method);
+    // bpf_map_delete_elem(&active_l7_requests, &k);
+	// 清除事件计数
+	bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
+	// 清除业务层trace信息
+	clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
+	// 清除trace信息
+	cw_clear_trace(pid, tid, fd);
+    // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
+    // if (accept_conn) {
+    //     cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    //     cw_bpf_debug("rock enter the  accept_conn function cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    //     e->sport = accept_conn->sport;
+    //     e->dport = accept_conn->dport;
+    //     __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
+    //     __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
+    // }
+    //不发送payload
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+
+    // bpf_printk("stop get apm data\n");
+    return 0;                                                                                  
+}
+
+// This instrumentation attaches uprobe to the following function:
+// func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream)
+// https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/server.go#L1735
+//
+// This is only compatible with versions > 1.69.0 of the Server.
+SEC("uprobe/server_handleStream2")
+int uprobe_server_handleStream2(struct pt_regs *ctx) {
+    u64 server_stream_pos = 4;
+    // bpf_printk("enter uprobe_server_handleStream2\n");
+    void *server_stream_ptr = get_argument(ctx, server_stream_pos);
+    if (server_stream_ptr == NULL) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2: failed to get ServerStream arg");
+        return -1;
+    }
+
+    void *stream_ptr;
+    long rc = bpf_probe_read_user(
+        &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2: failed to read stream_ptr");
+        return -2;
+    }
+
+    struct go_iface go_context = {0};
+    rc = bpf_probe_read_user(
+        &go_context.type, sizeof(go_context.type), (void *)(stream_ptr + stream_ctx_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2: failed to read context type");
+        return -3;
+    }
+
+    rc = bpf_probe_read_user(&go_context.data,
+                             sizeof(go_context.data),
+                             get_go_interface_instance(stream_ptr + stream_ctx_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2: failed to read context data");
+        return -4;
+    }
+
+    return handleStream(ctx, stream_ptr, &go_context);
+}
+
+// This instrumentation attaches a return uprobe to the following function:
+// func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream)
+// https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/server.go#L1735
+//
+// This is only compatible with versions > 1.69.0 of the Server.
+SEC("uprobe/server_handleStream2")
+int uprobe_server_handleStream2_Returns(struct pt_regs *ctx) {
+    u64 server_stream_pos = 4;
+    __u64 id = bpf_get_current_pid_tgid();
+	__u32 zero = 0;
+    __u32 fd = 0;
+	__u32 pid, tid;
+	__u32 http_status = 200;
+
+	pid = id >> 32;
+	tid =  (__u32)id;
+    // bpf_printk("enter uprobe_server_handleStream2_Returns\n");
+
+    struct l7_request_key k = {};
+    k.pid = pid;
+    k.fd = fd;
+    k.is_tls = 0;
+    k.stream_id = -1;
+
+    void *server_stream_ptr = get_argument(ctx, server_stream_pos);
+    void *key = NULL;
+    if (server_stream_ptr == NULL) {
+        // We might fail to get the pointer for versions of Go which use register ABI, as this function does not return anything.
+        // This is not an error in that case so we can just go to the lookup which will happen by goroutine.
+        goto lookup;
+    }
+
+    void *stream_ptr;
+    long rc = bpf_probe_read_user(
+        &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2Return: failed to read stream_ptr");
+        return -2;
+    }
+
+lookup:
+    key = (void *)GOROUTINE(ctx);
+    struct grpc_request_t *event = bpf_map_lookup_elem(&grpc_events, &key);
+    if (event == NULL) {
+        bpf_printk("grpc:server:uprobe/server_handleStream2Return: event is NULL");
+        return -5;
+    }
+    event->end_time = bpf_ktime_get_ns();
+    // output_span_event(ctx, event, sizeof(struct grpc_request_t), &event->sc);
+    stop_tracking_span(&event->sc, &event->psc);
+    bpf_map_delete_elem(&grpc_events, &key);
+
+
+    struct apm_trace_info_t * start_trace_info = get_trace_info_by_fd(pid, fd);
+	if (!start_trace_info) {
+		return -1;
+	}
+    __u64 trace_id = start_trace_info->trace_id;
+	__u32 event_count = cw_get_event_count(trace_id);
+    cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
+	cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
+	cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
+
+    // 发送事件到用户空间 start
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+	    cw_clear_trace(pid, tid, fd);
+        return -1;
+    }
+	// parent sc
+	struct apm_span_context *cw_psc = cw_get_parent_tracking_span_by_trace_key(start_trace_info->trace_key);
+	if(cw_psc){
+		cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
+		cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
+        cw_copy_byte_arrays(cw_psc->type_from, e->type_from, APM_TYPE_FROM_SIZE);
+	}
+    struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+    if (!req)
+    {
+	    cw_clear_trace(pid, tid, fd);
+        return 0;
+    }
+	e->start_at = req->ns;
+	cw_bpf_debug("req->ns:%llu",req->ns);
+	e->end_at = bpf_ktime_get_ns();
+    e->duration = e->end_at - e->start_at;
+    e->protocol = PROTOCOL_TRACE;
+    e->status = http_status;
+    e->pid = k.pid;
+    e->fd = k.fd;
+    // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+    e->trace_start = 0;
+    e->trace_end = 1;
+    e->trace_type = 1;
+    e->trace_id = trace_id;
+    e->payload_size = 0;
+    e->event_count = event_count;
+    // COPY_PAYLOAD(e->payload, size, payload);
+    bpf_map_delete_elem(&active_l7_requests, &k);
+	// 清除事件计数
+	bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
+	// 清除业务层trace信息
+	clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
+	// 清除trace信息
+	cw_clear_trace(pid, tid, fd);
+    // cw_bpf_debug("socket accept bytes_sent cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    // struct accept_connection *accept_conn = bpf_map_lookup_elem(&active_accepts, &cid);
+    // if (accept_conn) {
+    //     cw_bpf_debug("socket accept bytes_sent after cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    //     cw_bpf_debug("rock enter the  accept_conn function cid.pid=%d, cid.fd=%d\n", cid.pid, cid.fd);
+    //     e->sport = accept_conn->sport;
+    //     e->dport = accept_conn->dport;
+    //     __builtin_memcpy(&e->saddr, &accept_conn->saddr, sizeof(e->saddr));
+    //     __builtin_memcpy(&e->daddr, &accept_conn->daddr, sizeof(e->daddr));
+    // }
+    return 0;
+}
+
+// func (d *http2Server) operateHeader(frame *http2.MetaHeadersFrame) error
+// for version 1.60 and above:
+// func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error
+SEC("uprobe/http2Server_operateHeader")
+int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
+    bpf_printk("enter the uprobe_http2Server_operateHeader");
+    s32 find_w3c = 0;
+    void *arg4 = get_argument(ctx, 4);
+    void *arg2 = get_argument(ctx, 2);
+    void *frame_ptr = is_new_frame_pos ? arg4 : arg2;
+    struct go_slice header_fields = {};
+    bpf_probe_read(&header_fields, sizeof(header_fields), (void *)(frame_ptr + frame_fields_pos));
+    char key[CW_HEADER_KEY_LENGTH + 1] = "cwtrace";
+    // 确保字符串以 null 结尾
+    key[CW_HEADER_KEY_LENGTH] = '\0';
+    // bpf_printk("enter the uprobe_http2Server_operateHeader\n");
+    
+    __u32 zero = 0;
+    struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&apm_span_context_heap3, &zero);
+    if (cw_parent_span_context == NULL) {
+        return -1;
+    }
+    __builtin_memset(cw_parent_span_context, 0, sizeof(struct apm_span_context));
+    
+    // 优化循环:减少复杂度,提前退出
+    for (s32 i = 0; i < MAX_HEADERS && i < header_fields.len; i++) {
+        struct hpack_header_field hf = {};
+        long res = bpf_probe_read(&hf, sizeof(hf), (void *)(header_fields.ptr + (i * sizeof(hf))));
+        if (res != 0) {
+            continue; // 读取失败,跳过
+        }
+        
+        // 简化条件判断
+        if (hf.name.len != CW_HEADER_KEY_LENGTH || hf.value.len != CW_HEADER_VAL_LENGTH) {
+            continue;
+        }
+        // bpf_printk("found traceparent header name is %s", hf.name.str);
+        // bpf_printk("found traceparent header value is %s", hf.value.str);
+        
+        char current_key[CW_HEADER_KEY_LENGTH + 1];
+        if (bpf_probe_read(current_key, sizeof(current_key), hf.name.str) != 0) {
+            continue;
+        }
+        // bpf_printk("---found traceparent header name is %s", hf.name.str);
+        // bpf_printk("+++found traceparent header value is %s", hf.value.str);
+
+        current_key[CW_HEADER_KEY_LENGTH] = '\0';
+
+        // bpf_printk("---11111found cwtrace key is %s", key);
+        // bpf_printk("+++11111found cwtrace current_key is %s", current_key);
+        // 简化字符串比较
+        // if (bpf_memcmp(key, current_key, 6) == 0) {
+        if (current_key[0] == 'c' && current_key[1] == 'w' && current_key[2] == 't' && current_key[3] == 'r' && current_key[4] == 'a' && current_key[5] == 'c' && current_key[6] == 'e') {
+            find_w3c = 1;
+            bpf_printk("found traceparent header");
+            // 执行字符串到span context的转换
+            cw_string_to_span_context(hf.value.str, cw_parent_span_context);
+            bpf_printk("11111found traceparent header value is %s", hf.value.str);
+            break; // 找到后立即退出
+        }
+    }
+    if (find_w3c == 0)
+    {
+        generate_random_bytes(cw_parent_span_context->trace_id, TRACE_ID_SIZE);
+        bpf_printk("enter uprobe_http2Server_operateHeader, generate the traceid\n");
+    }
+    cw_save_parent_tracking_span(cw_parent_span_context);
+    return 0;
+}
+
+// func (ht *http2Server) WriteStatus(s *Stream, st *status.Status)
+// https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
+//
+// This is only compatible with versions > 1.40 and < 1.69.0 of the Server.
+SEC("uprobe/http2Server_WriteStatus")
+int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
+    // bpf_printk("enter uprobe_http2Server_WriteStatus\n");
+    void *status_ptr = get_argument(ctx, 3);
+    return writeStatus(ctx, status_ptr);
+}
+
+// func (ht *http2Server) writeStatus(s *Stream, st *status.Status)
+// https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/internal/transport/http2_server.go#L1045
+//
+// This is only compatible with versions > 1.69.0 of the Server.
+SEC("uprobe/http2Server_WriteStatus2")
+int uprobe_http2Server_WriteStatus2(struct pt_regs *ctx) {
+    // bpf_printk("enter uprobe_http2Server_WriteStatus2\n");
+    u64 server_stream_pos = 2;
+    void *server_stream_ptr = get_argument(ctx, server_stream_pos);
+    if (server_stream_ptr == NULL) {
+        bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to get ServerStream arg");
+        return -1;
+    }
+
+    void *stream_ptr;
+    long rc = bpf_probe_read_user(
+        &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
+    if (rc != 0) {
+        bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to read stream_ptr");
+        return -2;
+    }
+
+    void *status_ptr = get_argument(ctx, 3);
+    return writeStatus(ctx, status_ptr);
+}

+ 7 - 0
ebpftracer/l7/l7.go

@@ -26,6 +26,7 @@ const (
 	ProtocolDNS       Protocol = 13
 	ProtocolDM        Protocol = 14
 	ProtocolMariaDB   Protocol = 15
+	ProtocolGrpc      Protocol = 16
 )
 
 func (p Protocol) Int() int {
@@ -66,6 +67,8 @@ func (p Protocol) String() string {
 		return "DM"
 	case ProtocolMariaDB:
 		return "Mariadb"
+	case ProtocolGrpc:
+		return "GRPC"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
@@ -104,6 +107,8 @@ func (p Protocol) ServiceNameString() string {
 		return "DM"
 	case ProtocolMariaDB:
 		return "MARIA"
+	case ProtocolGrpc:
+		return "GRPC"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
@@ -196,6 +201,7 @@ type RequestData struct {
 	TraceId           uint64
 	TraceStart        uint32
 	TraceEnd          uint32
+	TraceType         uint32
 	EventCount        uint32
 	AssumedAppId      string
 	SpanId            string
@@ -205,6 +211,7 @@ type RequestData struct {
 	DAddr             netaddr.IPPort
 	ComponentSAddr    netaddr.IPPort
 	ComponentDAddr    netaddr.IPPort
+	RPCTarget         string
 	ParentSpanContext struct {
 		TraceIdFrom    string
 		CalledId       string

+ 101 - 1
ebpftracer/tls.go

@@ -31,6 +31,12 @@ const (
 	goRunqget             = "runtime.runqget"
 	goServeHTTP           = "net/http.serverHandler.ServeHTTP"
 	goTransport           = "net/http.(*Transport).roundTrip"
+	goGrpcServerHandleStream = "google.golang.org/grpc.(*Server).handleStream"
+	goGrpcHttp2OperateHeader = "google.golang.org/grpc/internal/transport.(*http2Server).operateHeaders"
+	goGrpcServerWritestatus = "google.golang.org/grpc/internal/transport.(*http2Server).WriteStatus"
+	goGrpcClientConnInvoke = "google.golang.org/grpc.(*ClientConn).Invoke"
+	goGrpcClientLoopyHeaderHandler = "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler"
+	goGrpcHttp2ClientNewStream = "google.golang.org/grpc/internal/transport.(*http2Client).NewStream"
 )
 
 var (
@@ -294,7 +300,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		}
 		switch s.Name {
 		case goTlsWriteSymbol, goTlsReadSymbol:
-		case goExecute, goNewproc1, goRunqget, goServeHTTP, goTransport:
+		case goExecute, goNewproc1, goRunqget, goServeHTTP, goTransport, goGrpcHttp2OperateHeader,goGrpcServerHandleStream,goGrpcServerWritestatus,goGrpcClientConnInvoke,goGrpcClientLoopyHeaderHandler,goGrpcHttp2ClientNewStream:
 		default:
 			continue
 		}
@@ -376,6 +382,100 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			//	}
 			//	links = append(links, l)
 			//}
+		case goGrpcClientConnInvoke:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_ClientConn_Invoke"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_ClientConn_Invoke uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_ClientConn_Invoke ok")
+			links = append(links, l)
+			sStart := s.Value - textSection.Addr
+			sEnd := sStart + s.Size
+			klog.Infoln("uprobe_ClientConn_Invoke ok----111111")
+			if sEnd > textSectionLen {
+				continue
+			}
+			klog.Infoln("uprobe_ClientConn_Invoke ok----2222")
+			sBytes := textSectionData[sStart:sEnd]
+			returnOffsets := getReturnOffsets(ef.Machine, sBytes)
+			if len(returnOffsets) == 0 {
+				err = fmt.Errorf("failed to attach uprobe_ClientConn_Invoke no return offsets found")
+				klog.Errorln(err)
+				return nil, err
+			}
+			for _, offset := range returnOffsets {
+				l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_ClientConn_Invoke_Returns"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
+				if err != nil {
+					klog.WithError(err).Errorln(fmt.Errorf("failed to attach uprobe_ClientConn_Invoke_Returns uprobe"))
+					return nil, err
+				}
+				klog.Infoln("uprobe_ClientConn_Invoke_Returns ok----")
+				links = append(links, l)
+			}
+		case goGrpcClientLoopyHeaderHandler:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_LoopyWriter_HeaderHandler"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_LoopyWriter_HeaderHandler uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_LoopyWriter_HeaderHandler ok")
+			links = append(links, l)
+		case goGrpcHttp2ClientNewStream:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Client_NewStream"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_http2Client_NewStream uprobe")
+				continue
+			}
+			klog.Infoln("uprobe_http2Client_NewStream ok")
+			links = append(links, l)
+		case goGrpcHttp2OperateHeader:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_operateHeader"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_http2Server_operateHeader uprobe")
+				continue
+			}
+			klog.Infoln("goGrpcHttp2OperateHeader ok")
+			links = append(links, l)
+		case goGrpcServerWritestatus:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_WriteStatus"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_http2Server_WriteStatus uprobe")
+				continue
+			}
+			klog.Infoln("google.golang.org/grpc/internal/transport.(*http2Server).writeStatus ok")
+			links = append(links, l)
+		case goGrpcServerHandleStream:
+			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_server_handleStream"], &link.UprobeOptions{Address: address})
+			if err != nil {
+				klog.WithError(err).Errorln("failed to attach uprobe_server_handleStream uprobe")
+				continue
+			}
+			klog.Infoln("google.golang.org/grpc.(*Server).handleStream ok")
+			links = append(links, l)
+			sStart := s.Value - textSection.Addr
+			sEnd := sStart + s.Size
+			klog.Infoln("google.golang.org/grpc.(*Server).handleStream ok----111111")
+			if sEnd > textSectionLen {
+				continue
+			}
+			klog.Infoln("google.golang.org/grpc.(*Server).handleStream ok----2222")
+			sBytes := textSectionData[sStart:sEnd]
+			returnOffsets := getReturnOffsets(ef.Machine, sBytes)
+			if len(returnOffsets) == 0 {
+				err = fmt.Errorf("failed to attach uprobe_server_handleStream2 no return offsets found")
+				klog.Errorln(err)
+				return nil, err
+			}
+			for _, offset := range returnOffsets {
+				l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_server_handleStream_Returns"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
+				if err != nil {
+					klog.WithError(err).Errorln(fmt.Errorf("failed to attach exit_runtime_newproc1 uprobe"))
+					return nil, err
+				}
+				klog.Infoln("google.golang.org/grpc.(*Server).handleStream ok----")
+				links = append(links, l)
+			}
 		case goServeHTTP:
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_HandlerFunc_ServeHTTP"], &link.UprobeOptions{Address: address})
 			if err != nil {

+ 4 - 0
ebpftracer/tracer.go

@@ -534,6 +534,7 @@ type l7Event struct {
 	EndtAt              uint64 // ns
 	TraceStart          uint32
 	TraceEnd            uint32
+	TraceType           uint32
 	EventCount          uint32
 	Sport               uint16
 	Dport               uint16
@@ -552,6 +553,7 @@ type l7Event struct {
 	AppIdFrom           HashByte
 	SpanIdFrom          HashByte
 	TypeFrom            [1]byte
+	RPCTarget           [64]byte
 	ErrorMsg            HashByte128
 }
 
@@ -774,6 +776,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				TraceId:        v.TraceId,
 				TraceStart:     v.TraceStart,
 				TraceEnd:       v.TraceEnd,
+				TraceType:      v.TraceType,
 				EventCount:     v.EventCount,
 				AssumedAppId:   hex.EncodeToString(v.AssumedAppId[:]),
 				SpanId:         hex.EncodeToString(v.SpanId[:]),
@@ -781,6 +784,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				EndAt:          v.EndtAt,
 				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
 				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
+				RPCTarget:      string(v.RPCTarget[:]),
 				ErrorMsg:       strings.TrimRight(string(v.ErrorMsg[:]), "\x00"),
 				IsTls:          v.IsTls > 0,
 			}

+ 1 - 1
main.go

@@ -322,7 +322,7 @@ func main() {
 		if err != nil {
 			return
 		}
-		log.Debugln("netdata is:", string(jsonData))
+		// log.Debugln("netdata is:", string(jsonData))
 		// 创建请求
 		urlRoute := "/api/v2/ebpf/receive"
 

+ 51 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -26,6 +26,7 @@ const (
 	NOSQL_SERVICE_TYPE = "NOSQL"
 	HTTP_SERVICE_TYPE  = "HTTP"
 	NET_SERVICE_TYPE   = "L7_NET"
+	RPC_SERVICE_TYPE   = "RPC"
 )
 
 const (
@@ -36,6 +37,7 @@ const (
 	MONGO_SERVICE_NAME      = "MONGODB"
 	HTTP_SERVICE_NAME       = "HTTPCLIENT"
 	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
+	GRPC_SERVICE_NAME       = "GRPC"
 )
 
 type apmTraceSpan tracesdk.ReadOnlySpan
@@ -193,6 +195,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildRedisMapEvent(&mNode, event)
 				case l7.ProtocolMongo:
 					buildMongoMapEvent(&mNode, event)
+				case l7.ProtocolGrpc:
+					buildGrpcMapEvent(&mNode, event)
 				}
 			}
 
@@ -724,6 +728,8 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 		switch attr.Key {
 		case "http.uri":
 			traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
+		case "rpc.uri":
+			traceRoot.Uri = attr.Value.AsString()
 		case "http.method":
 			traceRoot.HttpMethod = attr.Value.AsString()
 		case "http.status_code":
@@ -763,7 +769,10 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 			traceRoot.AppName = attr.Value.AsString()
 		case "server.service_name":
 			traceRoot.ServiceName = attr.Value.AsString()
-			mNode.ServiceName = attr.Value.AsString()
+		case "rpc.service_type":
+			traceRoot.ServiceType = RPC_SERVICE_TYPE
+		case "rpc.oper_type":
+			traceRoot.OperType = "PROVIDER"
 		case "server.app_id":
 			traceRoot.AppId = attr.Value.AsInt64()
 		case "server.agent_id":
@@ -810,6 +819,47 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 //	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 //}
 
+func buildGrpcMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = GRPC_SERVICE_NAME
+	mNode.ServiceType = RPC_SERVICE_TYPE
+	mNode.Schema = "grpc"
+	//mNode.MethodName = "HTTP"
+	//var descAddr string
+	// var method string
+	for _, attr := range event.Attributes {
+		switch attr.Key {
+		case "rpc.ip":
+			mNode.Ip = attr.Value.AsString()
+			//descAddr += mNode.Ip
+		case "rpc.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "rpc.method":
+			//mNode.MethodName += " " + attr.Value.AsString()
+			// method = attr.Value.AsString()
+			mNode.MethodName = attr.Value.AsString()
+			//descAddr += ":" + attr.Value.AsString()
+		case "rpc.uri":
+			mNode.Uri = attr.Value.AsString()
+			//mNode.MethodName += " " + attr.Value.AsString()
+		case "rpc.oper_type":
+			mNode.OperType = attr.Value.AsString()
+		case "rpc.assumed_app_id":
+			mNode.AssumedAppId = attr.Value.AsInt64()
+		case "rpc.span_id":
+			mNode.SpanId = attr.Value.AsString()
+		case "time.start_at":
+			mNode.StartTime = uint64(attr.Value.AsInt64())
+		case "time.end_at":
+			mNode.EndTime = uint64(attr.Value.AsInt64())
+		case "time.duration":
+			//mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
+			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
+		}
+	}
+	// mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
+	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
+}
+
 func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.ServiceName = HTTP_SERVICE_NAME
 	mNode.ServiceType = HTTP_SERVICE_TYPE

+ 114 - 0
tracing/apm_tracing.go

@@ -129,6 +129,23 @@ func (t *Trace) AllEventReady(traceID uint64) bool {
 	return t.startEventReady && t.endEventReady && *t.currenEventCount >= t.needEventCount
 }
 
+func (t *Trace) GrpcServerTraceStartEvent(r *l7.RequestData, appInfo AppInfo) {
+	t.span.SetAttributes(attribute.String("rpc.uri", string(r.Payload)))
+	t.commonAttrs = []attribute.KeyValue{
+		// buildAppMapFromEvent
+		attribute.Int("server.code_type", appInfo.CodeType.Int()),
+		attribute.String("server.app_name", appInfo.AppName),
+		attribute.String("server.service_name", "GRPC"),
+		attribute.String("rpc.service_type", "RPC"),
+		attribute.String("rpc.oper_type", "PROVIDER"),
+		attribute.Int64("server.app_id", appInfo.AppIdHash.IntVal),
+		attribute.Int64("server.agent_id", appInfo.AgentId),
+		attribute.Int64("server.instance_id", appInfo.InstanceIdHash.IntVal),
+	}
+	t.span.SetAttributes(t.commonAttrs...)
+	t.startReady()
+}
+
 func (t *Trace) TraceStartEvent(method, path, sn string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo, container_id string) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s:%d%s", sn, sport, path)),
 		semconv.HTTPMethod(method),
@@ -550,6 +567,103 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
+func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData, appInfo AppInfo) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+	// if method == "" {
+	// 	return
+	// }
+
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.RPCSystemGRPC,
+		attribute.String("rpc.uri", string(r.Payload)),
+		attribute.String("rpc.schema", "grpc"),
+		attribute.Int64("rpc.assumed_app_id", assumedAppID),
+		attribute.String("rpc.span_id", r.SpanId),
+		attribute.Int("rpc.port", appInfo.Sport),
+		attribute.String("rpc.ip", appInfo.Sn),
+		attribute.String("rpc.oper_type", "PROVIDER"),
+		attribute.String("rpc.method", "test_method"),
+	)
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolGrpc.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolGrpc.Int(), attr...)
+}
+
+func (t *Trace) GrpcClientTraceQueryEvent(r *l7.RequestData) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+	// if method == "" {
+	// 	return
+	// }
+
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+
+	// 解析 RPCTarget 字符串,格式为 "ip:port"
+	klog.Infof("r.RPCTarget is %s", r.RPCTarget)
+	var rpcIP, rpcPort string
+	if r.RPCTarget != "" {
+		// 清理空字节,只保留有效字符
+		cleanTarget := strings.TrimRight(r.RPCTarget, "\x00")
+		cleanTarget = strings.TrimSpace(cleanTarget)
+		klog.Infof("cleanTarget is %s", cleanTarget)
+
+		if colonIndex := strings.LastIndex(cleanTarget, ":"); colonIndex != -1 {
+			rpcIP = cleanTarget[:colonIndex]
+			rpcPort = cleanTarget[colonIndex+1:]
+			klog.Infof("rpcIP is %s, rpcPort is %s", rpcIP, rpcPort)
+		} else {
+			// 如果没有找到冒号,可能是只有IP或只有端口
+			rpcIP = cleanTarget
+		}
+	}
+
+	// 设置默认值
+	if rpcIP == "" {
+		rpcIP = "127.0.0.1"
+	}
+	if rpcPort == "" {
+		rpcPort = "80"
+	}
+
+	// 转换端口为整数
+	portInt, err := strconv.Atoi(rpcPort)
+	if err != nil {
+		klog.Infof("r.RPCTarget convert port err is %s", err)
+		portInt = 80 // 默认端口
+	} else {
+		klog.Infof("r.RPCTarget convert port success, portInt is %d", portInt)
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.RPCSystemGRPC,
+		attribute.String("rpc.uri", string(r.Payload)),
+		attribute.String("rpc.schema", "grpc"),
+		attribute.Int64("rpc.assumed_app_id", assumedAppID),
+		attribute.String("rpc.span_id", r.SpanId),
+		attribute.Int("rpc.port", portInt),
+		attribute.String("rpc.ip", rpcIP),
+		attribute.String("rpc.oper_type", "CONSUMER"),
+		attribute.String("rpc.method", "test_method"),
+	)
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolGrpc.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolGrpc.Int(), attr...)
+}
 func (t *Trace) MongoTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
 	if t == nil {
 		return