Ver Fonte

Feature #TASK_QT-31498 【Q4】eBPF-Go cassandra 适配

Carl há 6 meses atrás
pai
commit
556aa6f0fb

+ 5 - 4
containers/container.go

@@ -103,10 +103,11 @@ type ActiveConnection struct {
 	FirstWriteTime uint64
 	NewReadTime    uint64
 
-	http2Parser    *l7.Http2Parser
-	postgresParser *l7.PostgresParser
-	mysqlParser    *l7.MysqlParser
-	dmParser       *l7.DmParser
+	http2Parser      *l7.Http2Parser
+	postgresParser   *l7.PostgresParser
+	mysqlParser      *l7.MysqlParser
+	dmParser         *l7.DmParser
+	cassandraParser  *l7.CassandraParser
 }
 
 type ActiveAccept struct {

+ 63 - 27
containers/container_apm.go

@@ -198,6 +198,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			return nil
 		}
 	}
+
+	/**
+	 * HTTP
+	 */
 	if r.Protocol == l7.ProtocolHTTP {
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 			method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
@@ -210,35 +214,44 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		}
 		//return nil
 	}
+
+	/**
+	 * gRPC
+	 */
+	if r.Protocol == l7.ProtocolGrpc {
+		klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
+		klog.Infoln("enter the l7.ProtocolGrpc")
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				apmTrace.GrpcClientTraceQueryEvent(r)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
+		}
+	}
 	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
 	//fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
 
 	if conn == nil {
-		if r.Protocol == l7.ProtocolGrpc {
-			klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
-			klog.Infoln("enter the l7.ProtocolGrpc")
-			if c.l7Attach && c.valuableTrace(r.TraceId) {
-				apmTrace, err := c.getOrInitTrace(r.TraceId)
-				if err == nil {
-					apmTrace.GrpcClientTraceQueryEvent(r)
-					c.SendEvent(apmTrace, r.TraceId)
-				}
-			}
+		conn = &ActiveConnection{
+			Dest:       r.ComponentDAddr,
+			ActualDest: r.ComponentDAddr,
+			Timestamp:  timestamp,
 		}
-		return nil
+		//return nil
 	}
 	if timestamp != 0 && conn.Timestamp != timestamp {
-		if r.Protocol == l7.ProtocolGrpc {
-			klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
-			klog.Infoln("enter the l7.ProtocolGrpc")
-			if c.l7Attach && c.valuableTrace(r.TraceId) {
-				apmTrace, err := c.getOrInitTrace(r.TraceId)
-				if err == nil {
-					apmTrace.GrpcClientTraceQueryEvent(r)
-					c.SendEvent(apmTrace, r.TraceId)
-				}
-			}
-		}
+		//if r.Protocol == l7.ProtocolGrpc {
+		//	klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
+		//	klog.Infoln("enter the l7.ProtocolGrpc")
+		//	if c.l7Attach && c.valuableTrace(r.TraceId) {
+		//		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		//		if err == nil {
+		//			apmTrace.GrpcClientTraceQueryEvent(r)
+		//			c.SendEvent(apmTrace, r.TraceId)
+		//		}
+		//	}
+		//}
 		return nil
 	}
 	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
@@ -374,7 +387,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 					joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
 					statement += " " + joined
 				}
-				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.ActualDest)
+				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
 				c.SendEvent(apmTrace, r.TraceId)
 			}
 		}
@@ -398,7 +411,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				if args != "" {
 					statement += " " + args
 				}
-				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.ActualDest)
+				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
 				c.SendEvent(apmTrace, r.TraceId)
 			}
 		}
@@ -430,14 +443,37 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
 			if err == nil {
 				// MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
-				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.ActualDest)
+				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
+		}
+	/**
+	 * Cassandra
+	 */
+	case l7.ProtocolCassandra:
+		stats.observe(r.Status.String(), "", r.Duration)
+		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			if conn.cassandraParser == nil {
+				conn.cassandraParser = l7.NewCassandraParser()
+			}
+			var query string
+			query = string(r.Payload)
+			//query := conn.cassandraParser.Parse(r.Payload)
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
+			}
+
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
 				c.SendEvent(apmTrace, r.TraceId)
 			}
 		}
+
 	/**
-	 * Kafka / Cassandra
+	 * Kafka
 	 */
-	case l7.ProtocolKafka, l7.ProtocolCassandra:
+	case l7.ProtocolKafka:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 		}

+ 3 - 2
containers/process.go

@@ -4,13 +4,14 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"github.com/coroot/coroot-node-agent/ebpftracer/tracer/jattach"
-	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	"os"
 	"path/filepath"
 	"strings"
 	"time"
 
+	"github.com/coroot/coroot-node-agent/ebpftracer/tracer/jattach"
+	. "github.com/coroot/coroot-node-agent/utils/modelse"
+
 	"github.com/cilium/ebpf/link"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/proc"

+ 2 - 0
containers/registry.go

@@ -516,6 +516,8 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 			case ebpftracer.EventTypeL7Request:
 
 				klog.Debugln("e.L7Request Payload:", string(e.L7Request.Payload))
+				klog.Debugln("e.L7Request Payload:", e.L7Request.Protocol)
+				klog.Debugln("e.L7Request Payload:", e.L7Request.TraceId)
 				if e.L7Request == nil {
 					continue
 				}

+ 3 - 0
ebpftracer/ebpf/ebpf.c

@@ -51,15 +51,18 @@
 #include "tcp/state.c"
 #include "tcp/retransmit.c"
 //#include "l7/uprobe_base_bpf.c"
+#include "l7/apm_trace.c"
 #include "l7/l7.c"
 #include "l7/gotls.c"
 //#include "l7/openssl.c"
+//#include "utrace/go/base.probe.bpf.c"
 #include "utrace/go/net/server.probe.bpf.c"
 #include "utrace/go/net/client.probe.bpf.c"
 #include "utrace/go/net/stack.probe.bpf.c"
 #include "utrace/go/net/jvmstack.probe.bpf.c"
 #include "utrace/go/net/grpc.server.probe.bpf.c"
 #include "utrace/go/net/grpc.client.probe.bpf.c"
+#include "utrace/go/db/gocql.probe.bpf.c"
 
 #include "utrace/java/net/server.probe.bpf.c"
 #include "utrace/java/net/client.probe.bpf.c"

+ 18 - 0
ebpftracer/ebpf/include/apm_trace.h

@@ -60,6 +60,10 @@ struct goid_trace_key_t {
 	__u64 goid;
 };
 
+struct fd_trace_poll_times_t {
+	__u32 start_count;
+	__u32 end_count;
+};
 
 //struct apm_span_context {
 //	unsigned char TraceID[APM_TRACE_ID_SIZE];
@@ -99,6 +103,20 @@ struct apm_trace_info_t {
 //	struct apm_span_context sc;
 };
 
+// ---------- 业务上下文(thread_ctx) ----------
+struct thread_ctx_t {
+	__u64 token;         // 追踪 token / trace_id(入口处写入)
+	__u64 ts_ns;         // 最近一次刷新时间
+	__u64 exp_ns;        // 过期时间(now + TTL)
+	__u32 root_thread;   // 主线程
+	__u32 tgid;          // 所属进程
+	__u32 parent_tid;    // 父线程(用于调试/回溯)
+	__u8  is_main_thread;// 主线程
+	__u16 level;         // 继承层数
+	struct apm_trace_info_t *trace_info;
+	struct apm_trace_key_t trace_key;
+};
+
 static __always_inline void cw_copy_byte_arrays(unsigned char *src, unsigned char *dst, __u32 size) {
 	for (int i = 0; i < size; i++) {
 		dst[i] = src[i];

+ 15 - 0
ebpftracer/ebpf/l7/apm_trace.c

@@ -2,6 +2,21 @@
 // Created by Carl.Guo on 2024/4/1.
 //
 
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__type(key, __u32);
+	__type(value, struct thread_ctx_t);
+	__uint(max_entries, 65535);
+} thread_ctx_map SEC(".maps");
+
+
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__type(key, struct fd_trace_key_t);
+	__type(value, struct fd_trace_poll_times_t);
+	__uint(max_entries, 65535);
+} l7_request_fd_pool_map SEC(".maps");
+
 struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(key_size, sizeof(struct apm_trace_key_t));

+ 4 - 4
ebpftracer/ebpf/l7/cassandra.c

@@ -6,6 +6,7 @@
 #define CASSANDRA_OPCODE_ERROR      0x00
 #define CASSANDRA_OPCODE_QUERY      0x07
 #define CASSANDRA_OPCODE_RESULT     0x08
+#define CASSANDRA_OPCODE_PREPARE    0x09
 #define CASSANDRA_OPCODE_EXECUTE    0x0A
 #define CASSANDRA_OPCODE_BATCH      0x0D
 
@@ -27,7 +28,7 @@ int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id) {
     if (h.version != CASSANDRA_REQUEST_FRAME) {
         return 0;
     }
-    if (h.opcode == CASSANDRA_OPCODE_QUERY || h.opcode == CASSANDRA_OPCODE_EXECUTE || h.opcode == CASSANDRA_OPCODE_BATCH) {
+    if (h.opcode == CASSANDRA_OPCODE_QUERY || h.opcode == CASSANDRA_OPCODE_EXECUTE || h.opcode == CASSANDRA_OPCODE_BATCH || h.opcode == CASSANDRA_OPCODE_PREPARE) {
         *stream_id = h.stream_id;
         return 1;
     }
@@ -37,7 +38,7 @@ int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id) {
 static __always_inline
 int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *status) {
 	return 0;
-    struct cassandra_header h = {};
+	struct cassandra_header h = {};
     if (buf_size < sizeof(h)) {
         return 0;
     }
@@ -56,5 +57,4 @@ int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *st
         return 1;
     }
     return 0;
-}
-
+}

+ 22 - 4
ebpftracer/ebpf/l7/l7.c

@@ -60,7 +60,6 @@
 #include "dubbo2.c"
 #include "dns.c"
 #include "dm.c"
-#include "apm_trace.c"
 
 // go type l7Event struct && type RequestData struct
 struct l7_event {
@@ -372,6 +371,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     if (load_filter_pid() != 0 && pid != load_filter_pid()) {
         return 0;
     }
+//	bpf_printk("trace_enter_write fd(%d) ,size(%d)", fd, size);
+
     char* payload = buf;
     if (iovlen) {
         payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
@@ -421,6 +422,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 		}
         __u64 trace_id = start_trace_info->trace_id;
 	    __u32 event_count = cw_get_event_count(trace_id);
+//	    bpf_printk("[Trace End in l7] count(%d) %llu ", event_count, trace_id);
         cw_bpf_debug("[uprobeThread/pidpidpidpid][Trace End in l7][HTTP]pid:[%d]--[%lld]", tid, bpf_ktime_get_ns());
 	    cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
 	    cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
@@ -673,7 +675,14 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         send_event(ctx, e, cid, conn);
         return 0;
     } else if (is_cassandra_request(payload, size, &k.stream_id)) {
-        req->protocol = PROTOCOL_CASSANDRA;
+//	    bpf_printk("[cassandra] [start] fd(%d) stream_id(%d) goid(%d)", k.fd, k.stream_id, get_current_goroutine());
+	    __u32 ctx_id =get_current_goroutine();
+	    struct thread_ctx_t *current_ctx = bpf_map_lookup_elem(&thread_ctx_map, &ctx_id);
+	    if (current_ctx) {
+		    req->trace_id  =  current_ctx->token;
+			bpf_map_delete_elem(&thread_ctx_map, &ctx_id);
+		}
+	    req->protocol = PROTOCOL_CASSANDRA;
     } else if (is_kafka_request(payload, size, &req->request_id)) {
         req->protocol = PROTOCOL_KAFKA;
         struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
@@ -848,8 +857,8 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
         e->protocol = PROTOCOL_TRACE;
         e->trace_id = trace_info.trace_id;
 	    cw_bpf_debug("\n");
-		cw_bpf_debug("[Trace Start in l7][HTTP]pid:[%d]--[%lld]--trace_id:%llu\n", tid, bpf_ktime_get_ns(),trace_info.trace_id);
-	    cw_bpf_debug("[Trace Start in l7][Receive][HTTP]pid:[%d]|GOID:[%d]|FD:%d|Trace:%llu\n", tid, trace_info.trace_key.goid,k.fd);
+		cw_bpf_debug("[Trace Start in l7][HTTP]pid:[%d]--[%lld]--trace_id:%llu\n", pid, bpf_ktime_get_ns(),trace_info.trace_id);
+	    cw_bpf_debug("[Trace Start in l7][Receive][HTTP]tid:[%d]|GOID:[%d]|FD:%d\n", tid, trace_info.trace_key.goid,k.fd);
         e->payload_size = ret;
         COPY_PAYLOAD(e->payload, ret, payload);
 
@@ -934,6 +943,15 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
             send_event(ctx, e, cid, conn);
             bpf_map_delete_elem(&active_l7_requests, &k);
             return 0;
+        } else if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
+//	        bpf_printk("[cassandra end] fd(%d) k.stream_id(%d) goid(%d)", k.fd,k.stream_id,get_current_goroutine());
+
+	        req = bpf_map_lookup_elem(&active_l7_requests, &k);
+	        if (!req) {
+		        return 0;
+	        }
+	        e->trace_id = req->trace_id;
+	        response = 1;
         } else {
 //	        cw_bpf_debug("bb 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
             return 0;

+ 3 - 4
ebpftracer/ebpf/tcp/state.c

@@ -507,9 +507,8 @@ int sys_exit_accept(struct sys_exit_accept_ctx *ctx)
     }
 
     // 从地图中移除项目,避免泄漏  
-    bpf_map_delete_elem(&socket_map, &pid_tgid);  
-
-    return 0;  
+    bpf_map_delete_elem(&socket_map, &pid_tgid);
+    return 0;
 }
 
 // 在系统调用accept返回时挂钩获取文件描述符  
@@ -591,7 +590,7 @@ int tracepoint__sys_exit_accept4(struct sys_exit_accept4_ctx *ctx) {
     }
 
     // 从地图中移除项目,避免泄漏  
-    bpf_map_delete_elem(&socket_map, &pid_tgid);  
+    bpf_map_delete_elem(&socket_map, &pid_tgid);
 
     return 0;  
 }  

+ 10 - 3
ebpftracer/ebpf/uprobe_base_bpf.c

@@ -493,7 +493,7 @@ int enter_runtime_newproc1(struct pt_regs *ctx)
 	if (!goid) {
 		return 0;
 	}
-	// debug("[Go] [runtime.newproc1] goid:%llu",goid);
+//	bpf_printk("[Go] [runtime.newproc1] goid:%llu",goid);
 
 	struct go_newproc_caller caller = {
 			.goid = goid,
@@ -552,11 +552,11 @@ int exit_runtime_newproc1(struct pt_regs *ctx)
 		bpf_map_delete_elem(&pid_tgid_callerid_map, &pid_tgid);
 		return 0;
 	}
-	// debug("[Go] [runtime.newproc1.exit] current->goid:%llu",goid);
+//	 bpf_printk("[Go] [runtime.newproc1.exit] current->goid:%llu",goid);
 	// 生成当前协程key
 	struct go_key key = { .tgid = tgid, .goid = goid };
 	goid = caller->goid;
-	// debug("[Go] [runtime.newproc1.exit] caller->goid:%llu",goid);	goid = caller->goid;
+//	bpf_printk("[Go] [runtime.newproc1.exit] caller->goid:%llu",goid);	goid = caller->goid;
 	bpf_map_update_elem(&go_ancerstor_map, &key, &goid, BPF_ANY);
 
 	bpf_map_delete_elem(&pid_tgid_callerid_map, &pid_tgid);
@@ -641,6 +641,13 @@ int enter_runtime_runqget(struct pt_regs *ctx)
 
 }
 
+// runtime.goready 用于唤醒一个 goroutine
+// 函数签名: func goready(gp *g, traceskip int)
+// 参数:
+//   - gp *g: 第一个参数,要被唤醒的 goroutine
+//   - traceskip int: 第二个参数,用于堆栈跟踪
+// 在这里建立唤醒者和被唤醒者的关系
+
 // /sys/kernel/debug/tracing/events/sched/sched_process_exit/format
 SEC("tracepoint/sched/sched_process_exit")
 int bpf_func_sched_process_exit(struct sched_comm_exit_ctx *ctx)

+ 295 - 0
ebpftracer/ebpf/utrace/go/db/gocql.probe.bpf.c

@@ -0,0 +1,295 @@
+//
+// Created by Carl.Guo on 2025/11/12.
+//
+
+#include "arguments.h"
+#include "span_context.h"
+#include "go_context.h"
+#include "go_types.h"
+#include "uprobe.h"
+#include "go_common.h"
+
+#define MAX_STMT_LEN 128  // 最大语句长度
+
+// Query 结构体中 stmt 字段的偏移量
+// 如果 stmt 是 Query 结构体的第一个字段,偏移量为 0
+// 可以通过 DWARF 信息动态获取,这里使用可配置的偏移量
+volatile const __u64 query_stmt_offset = 0;
+
+// 存储 gocql 查询请求的上下文
+struct gocql_query_context_t {
+    BASE_SPAN_PROPERTIES
+    char stmt[MAX_STMT_LEN];
+    __u64 stmt_size;
+};
+
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct gocql_query_context_t));
+	__uint(max_entries, 1);
+} gocql_query_storage_map SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __type(key, void *);
+    __type(value, struct gocql_query_context_t);
+    __uint(max_entries, MAX_CONCURRENT);
+} gocql_query_events SEC(".maps");
+
+#define MAX_CONCURRENT 50
+
+#define MAX_QUERY_SIZE 256
+
+struct sql_request_t {
+	BASE_SPAN_PROPERTIES
+	char query[MAX_QUERY_SIZE];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, void *);
+	__type(value, struct sql_request_t);
+	__uint(max_entries, MAX_CONCURRENT);
+} sql_events SEC(".maps");
+
+// This instrumentation attaches uprobe to the following function:
+// func (s *Session) Query(stmt string, values ...interface{}) *Query
+/*
+SEC("uprobe/Session_Query")
+int uprobe_Session_Query(struct pt_regs *ctx) {
+	cw_bpf_debug("[Session_Query] 0");
+
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 tgid = pid_tgid >> 32;
+    
+    struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if (!info) {
+        return 0;
+    }
+
+    // 获取 stmt string 参数
+    // 在 Go 的调用约定中,string 参数被拆分成两个独立的参数:指针和长度
+    // 参数顺序:1 = s *Session (receiver), 2 = stmt 指针, 3 = stmt 长度
+    void *stmt_ptr = get_argument(ctx, 2);
+    __u64 stmt_len = (__u64)get_argument(ctx, 3);
+    
+    cw_bpf_debug("[Session_Query] stmt_ptr=%llx", (__u64)stmt_ptr);
+    cw_bpf_debug("[Session_Query] stmt_len=%llu", stmt_len);
+    
+    if (stmt_ptr == 0 || stmt_len == 0) {
+        return 0;
+    }
+    
+    if (stmt_len > MAX_STMT_LEN) {
+        stmt_len = MAX_STMT_LEN;
+    }
+
+    char stmt_buf[MAX_STMT_LEN + 1] = {0};
+
+    // 使用固定大小读取,验证器可以确定读取大小的上界
+    // sizeof(stmt_buf) - 1 是编译时常量,验证器可以验证
+    long res = bpf_probe_read(stmt_buf, sizeof(stmt_buf) - 1, stmt_ptr);
+    if (res != 0) {
+        return 0;
+    }
+
+    // 确保字符串以 null 结尾
+    stmt_buf[MAX_STMT_LEN] = '\0';
+    
+    // 打印 stmt
+    cw_bpf_debug("[Session_Query] stmt: %s", stmt_buf);
+
+    return 0;
+}
+*/
+
+// This instrumentation attaches uprobe to the following function:
+// func (s *Session) executeQuery(qry *Query) (it *Iter)
+SEC("uprobe/Session_executeQuery")
+int uprobe_Session_executeQuery(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 tgid = pid_tgid >> 32;
+    
+//    cw_bpf_debug("[gocql] enter executeQuery");
+//    cw_bpf_debug("[gocql] tgid=%u", tgid);
+    
+    struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if (!info) {
+        cw_bpf_debug("[gocql] no proc_info");
+        return 0;
+    }
+
+    // 获取第二个参数 qry *Query
+    // 参数顺序:1 = s *Session (receiver), 2 = qry *Query
+    void *qry_ptr = get_argument(ctx, 2);
+//    cw_bpf_debug("[gocql] qry_ptr=%llx", (__u64)qry_ptr);
+    if (qry_ptr == 0) {
+        cw_bpf_debug("[gocql] qry_ptr is null");
+        return 0;
+    }
+
+    // 获取 consistent key (使用 goroutine 地址作为 key)
+    void *key = (void *)GOROUTINE(ctx);
+//    cw_bpf_debug("[gocql] key=%llx", (__u64)key);
+    
+    // 检查是否已经存在该请求
+    struct gocql_query_context_t *existing = bpf_map_lookup_elem(&gocql_query_events, &key);
+    if (existing != NULL) {
+        cw_bpf_debug("[gocql] already exists");
+        return 0;
+    }
+
+
+
+	// 从 Query 结构体中读取 stmt 字段
+    // stmt 是一个 string 类型,在 Go 中是 go_string_ot 结构体
+    struct go_string_ot stmt_str = {0};
+    void *stmt_ptr = (void *)((char *)qry_ptr + query_stmt_offset);
+//    cw_bpf_debug("[gocql] stmt_ptr=%llx", (__u64)stmt_ptr);
+
+    // 读取 string 结构体
+    long res = bpf_probe_read(&stmt_str, sizeof(stmt_str), stmt_ptr);
+//    cw_bpf_debug("[gocql] read res=%ld", res);
+//    cw_bpf_debug("[gocql] read ken=%ld", stmt_str.len);
+//    cw_bpf_debug("[gocql] sql : %s", stmt_str.str);
+//	return 0;
+
+    if (res != 0) {
+        return 0;
+    }
+//    cw_bpf_debug("[gocql] stmt len=%lld", stmt_str.len);
+
+    // 分配存储空间
+	u32 zero = 0;
+	struct gocql_query_context_t *query_ctx = bpf_map_lookup_elem(&gocql_query_storage_map, &zero);
+	if (query_ctx == NULL) {
+		cw_bpf_debug("grpc:client:ClientConn_Invoke: failed to get storage");
+		return -1;
+	}
+
+	__builtin_memset(query_ctx, 0, sizeof(struct gocql_query_context_t));
+
+	query_ctx->start_time = bpf_ktime_get_ns();
+	query_ctx->stmt_size = stmt_str.len;
+
+    // 读取 stmt 字符串内容
+//    __u64 stmt_size = stmt_str.len > MAX_STMT_LEN ? MAX_STMT_LEN : stmt_str.len;
+//    cw_bpf_debug("[gocql] stmt_size=%llu", stmt_size);
+    res = bpf_probe_read(query_ctx->stmt, sizeof(query_ctx->stmt), stmt_str.str);
+//    cw_bpf_debug("[gocql] read stmt res=%ld", res);
+    if (res != 0) {
+        cw_bpf_debug("[gocql] read stmt failed");
+        return 0;
+    }
+
+//    // 确保字符串以 null 结尾
+//    if (stmt_size < MAX_STMT_LEN) {
+//        query_ctx->stmt[stmt_size] = '\0';
+//    } else {
+//        query_ctx->stmt[MAX_STMT_LEN - 1] = '\0';
+//    }
+
+    // 获取 context
+//    void *context_ptr_val = get_Go_context(ctx, 2, 0, true);
+//    cw_bpf_debug("[gocql] context_ptr=%llx", (__u64)context_ptr_val);
+//    if (context_ptr_val != 0) {
+//        start_tracking_span(context_ptr_val, &query_ctx.sc);
+//    }
+
+    // 保存到 map
+     bpf_map_update_elem(&gocql_query_events, &key, query_ctx, 0);
+//    cw_bpf_debug("[gocql] save res=%ld", update_res);
+//    cw_bpf_debug("[gocql] stmt_size=%llu", query_ctx->stmt_size);
+//    cw_bpf_debug("[gocql] stmt=%s", query_ctx->stmt);
+
+//    if (query_ctx.stmt_size > 0 && query_ctx.stmt_size <= 64) {
+//        cw_bpf_debug("[gocql] stmt: %s", query_ctx.stmt);
+//    }
+
+    return 0;
+}
+
+// This instrumentation attaches uprobe to the return of executeQuery
+SEC("uprobe/Session_executeQuery")
+int uprobe_Session_executeQuery_Returns(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 tgid = pid_tgid >> 32;
+    __u32 pid = pid_tgid;
+    
+    cw_bpf_debug("[gocql] enter Returns");
+//    cw_bpf_debug("[gocql] tgid=%u", tgid);
+//    return 0;
+    struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if (!info) {
+        cw_bpf_debug("[gocql] Returns no info");
+        return 0;
+    }
+
+    // 获取 consistent key
+    void *key = (void *)GOROUTINE(ctx);
+//    cw_bpf_debug("[gocql] Returns key=%llx", (__u64)key);
+    
+    // 查找对应的请求
+    struct gocql_query_context_t *query_ctx = bpf_map_lookup_elem(&gocql_query_events, &key);
+    if (query_ctx == 0) {
+        cw_bpf_debug("[gocql] Returns no ctx");
+        return 0;
+    }
+//    cw_bpf_debug("[gocql] Returns found ctx");
+
+    // 更新结束时间
+    query_ctx->end_time = bpf_ktime_get_ns();
+//    cw_bpf_debug("[gocql] Returns stmt_size=%llu", query_ctx->stmt_size);
+//    cw_bpf_debug("[gocql] Returns duration=%llu", query_ctx->end_time - query_ctx->start_time);
+//
+//	cw_bpf_debug("[gocql] Returns stmt=%s", query_ctx->stmt);
+
+    // 停止跟踪 span
+//    stop_tracking_span(&query_ctx->sc, &query_ctx->psc);
+
+    __u32 zero = 0;
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        cw_bpf_debug("[gocql] Returns no event");
+        bpf_map_delete_elem(&gocql_query_events, &key);
+        return 0;
+    }
+
+    // 设置事件属性
+    e->protocol = PROTOCOL_CASSANDRA;
+    e->pid = tgid;
+    e->start_at = query_ctx->start_time;
+    e->end_at = query_ctx->end_time;
+    e->duration = e->end_at - e->start_at;
+    e->payload_size = query_ctx->stmt_size;
+//    cw_bpf_debug("[gocql] Returns payload_size=%llu", e->payload_size);
+    
+    COPY_PAYLOAD(e->payload, query_ctx->stmt_size, query_ctx->stmt);
+
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+    struct apm_trace_info_t *trace_info = get_apm_trace_info_by_trace_key(trace_key);
+    
+    if (trace_info == 0) {
+        trace_info = get_apm_trace_info_v3(trace_key, pid_tgid, tgid, pid_tgid);
+    }
+    
+    if (trace_info) {
+        e->trace_id = trace_info->trace_id;
+    }
+    
+    if (e->trace_id == 0) {
+        e->trace_id = get_apm_trace_id(tgid, pid_tgid);
+    }
+    // 发送事件
+    long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    if (error == 0) {
+        cw_add_event_count(e->trace_id);
+        cw_bpf_debug("[gocql] Returns success");
+    }
+    // 清理
+    bpf_map_delete_elem(&gocql_query_events, &key);
+    cw_bpf_debug("[gocql] Returns done %llu",e->trace_id);
+    return 0;
+}

+ 427 - 0
ebpftracer/l7/cassandra.go

@@ -0,0 +1,427 @@
+package l7
+
+import (
+	"encoding/binary"
+	"fmt"
+)
+
+const (
+	CassandraHeaderSize = 9 // version(1) + flags(1) + stream_id(2) + opcode(1) + length(4)
+
+	CassandraOpcodeQuery   = 0x07
+	CassandraOpcodeExecute = 0x0A
+	CassandraOpcodeBatch   = 0x0D
+	CassandraOpcodePrepare = 0x09
+	CassandraOpcodeClose   = 0x04
+)
+
+type CassandraParser struct {
+	preparedStatements map[string]string // statement_id (hex string) -> query
+	pendingPrepares    map[int16]string  // stream_id -> query (用于关联 PREPARE 请求和响应)
+	lastPrepareQuery   string            // 最近一次 PREPARE 的查询语句(备用方案)
+}
+
+func NewCassandraParser() *CassandraParser {
+	return &CassandraParser{
+		preparedStatements: make(map[string]string),
+		pendingPrepares:    make(map[int16]string),
+		lastPrepareQuery:   "",
+	}
+}
+
+// ParseCassandra 解析 Cassandra CQL 查询语句
+// 支持 QUERY、EXECUTE、BATCH、PREPARE 操作
+func ParseCassandra(payload []byte) string {
+	if len(payload) < CassandraHeaderSize {
+		return ""
+	}
+
+	// 读取 header
+	version := payload[0]
+	if version != 0x04 { // REQUEST_FRAME
+		return ""
+	}
+
+	opcode := payload[4]
+	bodyLength := int(binary.BigEndian.Uint32(payload[5:9]))
+
+	if len(payload) < CassandraHeaderSize+bodyLength {
+		// 数据不完整,但尝试解析
+		bodyLength = len(payload) - CassandraHeaderSize
+	}
+
+	body := payload[CassandraHeaderSize:]
+	if len(body) > bodyLength {
+		body = body[:bodyLength]
+	}
+
+	switch opcode {
+	case CassandraOpcodeQuery:
+		return parseQuery(body)
+	case CassandraOpcodeExecute:
+		return parseExecute(body)
+	case CassandraOpcodeBatch:
+		return parseBatch(body)
+	default:
+		return ""
+	}
+}
+
+// parseQuery 解析 QUERY 操作
+// QUERY 格式: query string (long string) + consistency (2 bytes) + flags (1 byte) + ...
+// long string 格式: 4 bytes (length, big-endian) + string bytes (NOT null-terminated)
+func parseQuery(body []byte) string {
+	if len(body) < 4 {
+		return ""
+	}
+
+	// 读取 long string 的长度(4 bytes, big-endian)
+	queryLen := int(binary.BigEndian.Uint32(body[0:4]))
+	if queryLen <= 0 {
+		return ""
+	}
+
+	// 检查是否有足够的数据
+	availableData := len(body) - 4
+	if queryLen > availableData {
+		// 数据被截断,只读取可用的部分
+		if availableData > 0 {
+			queryBytes := body[4 : 4+availableData]
+			// 尝试找到字符串的结束位置(可能是null terminator或有效字符串的末尾)
+			// 但Cassandra协议中long string不是null-terminated,所以直接读取
+			query := string(queryBytes)
+			// 检查是否包含可打印字符
+			if isPrintableString(queryBytes) {
+				return query + "..."
+			}
+			return ""
+		}
+		return ""
+	}
+
+	// 读取完整的查询字符串
+	queryBytes := body[4 : 4+queryLen]
+
+	// Cassandra的long string不是null-terminated,但某些实现可能会添加null terminator
+	// 检查并移除末尾的null terminator(如果存在)
+	if len(queryBytes) > 0 && queryBytes[len(queryBytes)-1] == 0 {
+		queryBytes = queryBytes[:len(queryBytes)-1]
+	}
+
+	// 验证是否为有效的字符串(包含可打印字符)
+	if !isPrintableString(queryBytes) {
+		return ""
+	}
+
+	query := string(queryBytes)
+	return query
+}
+
+// isPrintableString 检查字节数组是否包含可打印字符
+func isPrintableString(b []byte) bool {
+	if len(b) == 0 {
+		return false
+	}
+	printableCount := 0
+	for _, c := range b {
+		// 允许可打印ASCII字符、换行符、制表符等
+		if c >= 32 && c <= 126 || c == '\n' || c == '\r' || c == '\t' {
+			printableCount++
+		}
+	}
+	// 如果至少80%的字符是可打印的,认为是有效字符串
+	return printableCount*100/len(b) >= 80
+}
+
+// parseExecute 解析 EXECUTE 操作
+// EXECUTE 格式: statement id (short bytes) + ...
+func parseExecute(body []byte) string {
+	if len(body) < 2 {
+		return ""
+	}
+
+	// 读取 short bytes (2 bytes length + bytes)
+	stmtIdLen := int(binary.BigEndian.Uint16(body[0:2]))
+	if stmtIdLen < 0 || stmtIdLen > len(body)-2 {
+		return ""
+	}
+
+	if stmtIdLen == 0 {
+		return ""
+	}
+
+	stmtIdBytes := body[2 : 2+stmtIdLen]
+	stmtId := fmt.Sprintf("%x", stmtIdBytes)
+
+	return fmt.Sprintf("EXECUTE %s /* unknown */", stmtId)
+}
+
+// parseBatch 解析 BATCH 操作
+// BATCH 格式: batch type (1 byte) + queries count (2 bytes) + queries...
+func parseBatch(body []byte) string {
+	if len(body) < 3 {
+		return ""
+	}
+
+	batchType := body[0]
+	queriesCount := int(binary.BigEndian.Uint16(body[1:3]))
+
+	if queriesCount == 0 {
+		return "BATCH (empty)"
+	}
+
+	offset := 3
+	var queries []string
+
+	for i := 0; i < queriesCount && offset < len(body); i++ {
+		if offset >= len(body) {
+			break
+		}
+
+		// 每个查询: kind (1 byte) + query string 或 statement id
+		if offset+1 > len(body) {
+			break
+		}
+
+		kind := body[offset]
+		offset++
+
+		switch kind {
+		case 0: // QUERY
+			if offset+4 > len(body) {
+				break
+			}
+			queryLen := int(binary.BigEndian.Uint32(body[offset : offset+4]))
+			offset += 4
+			if queryLen > 0 && offset+queryLen <= len(body) {
+				queryBytes := body[offset : offset+queryLen]
+				// 移除null terminator(如果存在)
+				if len(queryBytes) > 0 && queryBytes[len(queryBytes)-1] == 0 {
+					queryBytes = queryBytes[:len(queryBytes)-1]
+				}
+				// 验证是否为有效字符串
+				if isPrintableString(queryBytes) {
+					queries = append(queries, string(queryBytes))
+				} else {
+					queries = append(queries, "<binary>")
+				}
+				offset += queryLen
+			} else if queryLen > 0 && offset < len(body) {
+				// 数据被截断
+				availableData := len(body) - offset
+				queryBytes := body[offset : offset+availableData]
+				if isPrintableString(queryBytes) {
+					queries = append(queries, string(queryBytes)+"...")
+				}
+				offset = len(body) // 到达末尾
+			}
+		case 1: // EXECUTE (prepared statement)
+			if offset+2 > len(body) {
+				break
+			}
+			stmtIdLen := int(binary.BigEndian.Uint16(body[offset : offset+2]))
+			offset += 2
+			if stmtIdLen > 0 && offset+stmtIdLen <= len(body) {
+				stmtIdBytes := body[offset : offset+stmtIdLen]
+				stmtId := fmt.Sprintf("%x", stmtIdBytes)
+				queries = append(queries, fmt.Sprintf("EXECUTE %s", stmtId))
+				offset += stmtIdLen
+			}
+		default:
+			// 跳过未知类型
+			break
+		}
+	}
+
+	batchTypeStr := "BATCH"
+	if batchType == 0 {
+		batchTypeStr = "BATCH LOGGED"
+	} else if batchType == 1 {
+		batchTypeStr = "BATCH UNLOGGED"
+	} else if batchType == 2 {
+		batchTypeStr = "BATCH COUNTER"
+	}
+
+	if len(queries) == 0 {
+		return batchTypeStr + " (empty)"
+	}
+
+	if len(queries) == 1 {
+		return fmt.Sprintf("%s: %s", batchTypeStr, queries[0])
+	}
+
+	return fmt.Sprintf("%s: [%d queries]", batchTypeStr, len(queries))
+}
+
+// Parse 方法用于支持预编译语句(需要维护状态)
+func (p *CassandraParser) Parse(payload []byte) string {
+	if len(payload) < CassandraHeaderSize {
+		return ""
+	}
+
+	version := payload[0]
+	if version != 0x04 {
+		return ""
+	}
+
+	// 提取 stream_id (在 payload[2:4] 位置,big-endian)
+	streamId := int16(binary.BigEndian.Uint16(payload[2:4]))
+	opcode := payload[4]
+	bodyLength := int(binary.BigEndian.Uint32(payload[5:9]))
+
+	if len(payload) < CassandraHeaderSize+bodyLength {
+		bodyLength = len(payload) - CassandraHeaderSize
+	}
+
+	body := payload[CassandraHeaderSize:]
+	if len(body) > bodyLength {
+		body = body[:bodyLength]
+	}
+
+	switch opcode {
+	case CassandraOpcodeQuery:
+		return parseQuery(body)
+	case CassandraOpcodeExecute:
+		return p.parseExecuteWithStatements(body)
+	case CassandraOpcodeBatch:
+		return parseBatch(body)
+	case CassandraOpcodePrepare:
+		return p.parsePrepare(body, streamId)
+	case CassandraOpcodeClose:
+		return p.parseClose(body)
+	default:
+		return ""
+	}
+}
+
+// parseExecuteWithStatements 解析 EXECUTE 操作,尝试从预编译语句映射中查找
+func (p *CassandraParser) parseExecuteWithStatements(body []byte) string {
+	if len(body) < 2 {
+		return ""
+	}
+
+	stmtIdLen := int(binary.BigEndian.Uint16(body[0:2]))
+	if stmtIdLen < 0 || stmtIdLen > len(body)-2 {
+		return ""
+	}
+
+	if stmtIdLen == 0 {
+		return ""
+	}
+
+	stmtIdBytes := body[2 : 2+stmtIdLen]
+	stmtId := fmt.Sprintf("%x", stmtIdBytes)
+
+	statement, ok := p.preparedStatements[stmtId]
+	if ok {
+		return fmt.Sprintf("EXECUTE: %s", statement)
+	}
+
+	// 如果找不到预编译语句,尝试使用最近一次 PREPARE 的查询语句(备用方案)
+	if p.lastPrepareQuery != "" {
+		// 使用 lastPrepareQuery,即使 statement id 不匹配
+		return fmt.Sprintf("EXECUTE: %s", p.lastPrepareQuery)
+	}
+
+	// 如果还是找不到,显示简化的 statement id
+	if len(stmtId) > 16 {
+		return fmt.Sprintf("EXECUTE [%s...]", stmtId[:16])
+	}
+	return fmt.Sprintf("EXECUTE [%s]", stmtId)
+}
+
+// parsePrepare 解析 PREPARE 操作,保存预编译语句
+func (p *CassandraParser) parsePrepare(body []byte, streamId int16) string {
+	query := parseQuery(body)
+	if query == "" {
+		return ""
+	}
+
+	// 保存查询语句和 stream_id 的映射,等待响应时关联 statement id
+	p.pendingPrepares[streamId] = query
+	// 同时保存为最近一次 PREPARE 的查询语句(备用方案)
+	p.lastPrepareQuery = query
+	return fmt.Sprintf("PREPARE: %s", query)
+}
+
+// parseClose 解析 CLOSE 操作,清除预编译语句
+func (p *CassandraParser) parseClose(body []byte) string {
+	if len(body) < 2 {
+		return ""
+	}
+
+	stmtIdLen := int(binary.BigEndian.Uint16(body[0:2]))
+	if stmtIdLen < 0 || stmtIdLen > len(body)-2 {
+		return ""
+	}
+
+	if stmtIdLen == 0 {
+		return ""
+	}
+
+	stmtIdBytes := body[2 : 2+stmtIdLen]
+	stmtId := fmt.Sprintf("%x", stmtIdBytes)
+
+	delete(p.preparedStatements, stmtId)
+	return fmt.Sprintf("CLOSE %s", stmtId)
+}
+
+// GetLastPrepareQuery 返回最近一次 PREPARE 的查询语句(用于调试)
+func (p *CassandraParser) GetLastPrepareQuery() string {
+	return p.lastPrepareQuery
+}
+
+// ParseResponse 解析 Cassandra 响应,用于关联 PREPARE 请求和响应
+// 响应格式: version(1) + flags(1) + stream_id(2) + opcode(1) + length(4) + body
+func (p *CassandraParser) ParseResponse(payload []byte) {
+	if len(payload) < CassandraHeaderSize {
+		return
+	}
+
+	version := payload[0]
+	if version != 0x84 { // RESPONSE_FRAME
+		return
+	}
+
+	// 提取 stream_id
+	streamId := int16(binary.BigEndian.Uint16(payload[2:4]))
+	opcode := payload[4]
+	bodyLength := int(binary.BigEndian.Uint32(payload[5:9]))
+
+	if len(payload) < CassandraHeaderSize+bodyLength {
+		bodyLength = len(payload) - CassandraHeaderSize
+	}
+
+	body := payload[CassandraHeaderSize:]
+	if len(body) > bodyLength {
+		body = body[:bodyLength]
+	}
+
+	// 只处理 RESULT 类型的响应(PREPARE 响应是 RESULT 类型)
+	if opcode == 0x08 { // CASSANDRA_OPCODE_RESULT
+		// 检查是否是 PREPARE 响应
+		// PREPARE 响应的 body 格式: result kind (4 bytes) + statement id (short bytes) + ...
+		if len(body) >= 4 {
+			resultKind := binary.BigEndian.Uint32(body[0:4])
+			// result kind = 0x0004 表示 PREPARED
+			if resultKind == 0x0004 {
+				// 提取 statement id
+				if len(body) >= 6 {
+					stmtIdLen := int(binary.BigEndian.Uint16(body[4:6]))
+					if stmtIdLen > 0 && len(body) >= 6+stmtIdLen {
+						stmtIdBytes := body[6 : 6+stmtIdLen]
+						stmtId := fmt.Sprintf("%x", stmtIdBytes)
+
+						// 从 pendingPrepares 中查找对应的查询语句
+						if query, ok := p.pendingPrepares[streamId]; ok {
+							// 关联 statement id 和查询语句
+							p.preparedStatements[stmtId] = query
+							// 删除临时映射
+							delete(p.pendingPrepares, streamId)
+						}
+					}
+				}
+			}
+		}
+	}
+}

+ 1 - 1
ebpftracer/l7/l7.go

@@ -92,7 +92,7 @@ func (p Protocol) ServiceNameString() string {
 	case ProtocolKafka:
 		return "Kafka"
 	case ProtocolCassandra:
-		return "Cassandra"
+		return "CASSANDRA"
 	case ProtocolRabbitmq:
 		return "Rabbitmq"
 	case ProtocolNats:

+ 29 - 23
ebpftracer/tls.go

@@ -30,6 +30,7 @@ const (
 	goExecute                      = "runtime.execute"
 	goNewproc1                     = "runtime.newproc1"
 	goRunqget                      = "runtime.runqget"
+	goGoready                      = "runtime.goready"
 	goServeHTTP                    = "net/http.serverHandler.ServeHTTP"
 	goTransport                    = "net/http.(*Transport).roundTrip"
 	goGrpcServerHandleStream       = "google.golang.org/grpc.(*Server).handleStream"
@@ -39,6 +40,7 @@ const (
 	goGrpcClientLoopyHeaderHandler = "google.golang.org/grpc/internal/transport.(*loopyWriter).headerHandler"
 	goGrpcHttp2ClientNewStream     = "google.golang.org/grpc/internal/transport.(*http2Client).NewStream"
 	goReadContinuedLineSlice       = "net/textproto.(*Reader).readContinuedLineSlice"
+	goGocqlSessionExecuteQuery     = "github.com/gocql/gocql.(*Session).executeQuery"
 )
 
 var (
@@ -560,16 +562,19 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			klog.Infof("[AttachGoTlsUprobes] STEP 19.4: Matched runtime.runqget symbol (index=%d)", i)
 		case goServeHTTP:
 			matchedSymbols++
-			klog.Infof("[AttachGoTlsUprobes] STEP 19.5: Matched net/http.serverHandler.ServeHTTP symbol (index=%d)", i)
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.6: Matched net/http.serverHandler.ServeHTTP symbol (index=%d)", i)
 		case goTransport:
 			matchedSymbols++
-			klog.Infof("[AttachGoTlsUprobes] STEP 19.6: Matched net/http.Transport.roundTrip symbol (index=%d)", i)
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.7: Matched net/http.Transport.roundTrip symbol (index=%d)", i)
 		case goGrpcClientConnInvoke:
 			matchedSymbols++
-			klog.Infof("[AttachGoTlsUprobes] STEP 19.7: Matched gRPC ClientConn.Invoke symbol (index=%d)", i)
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.8: Matched gRPC ClientConn.Invoke symbol (index=%d)", i)
 		case goGrpcHttp2OperateHeader, goGrpcServerHandleStream, goGrpcServerWritestatus, goGrpcClientLoopyHeaderHandler, goGrpcHttp2ClientNewStream:
 			matchedSymbols++
-			klog.Infof("[AttachGoTlsUprobes] STEP 19.8: Matched gRPC symbol: %s (index=%d)", s.Name, i)
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.9: Matched gRPC symbol: %s (index=%d)", s.Name, i)
+		case goGocqlSessionExecuteQuery:
+			matchedSymbols++
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.10: Matched gocql Session.executeQuery symbol (index=%d)", i)
 		case goReadContinuedLineSlice:
 		default:
 			continue
@@ -615,25 +620,17 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				return nil, err
 			}
 			links = append(links, l)
-			//sStart := s.Value - textSection.Addr
-			//sEnd := sStart + s.Size
-			//if sEnd > textSectionLen {
-			//	continue
-			//}
-			//sBytes := textSectionData[sStart:sEnd]
-			//returnOffsets := getReturnOffsets(ef.Machine, sBytes)
-			//if len(returnOffsets) == 0 {
-			//	log("failed to attach enter_runtime_newproc1 uprobe", fmt.Errorf("no return offsets found"))
-			//	return nil
-			//}
-			//for _, offset := range returnOffsets {
-			//	l, err := exe.Uprobe(s.Name, t.uprobes["exit_runtime_newproc1"], &link.UprobeOptions{Address: address, Offset: uint64(offset)})
-			//	if err != nil {
-			//		log("failed to attach exit_runtime_newproc1 uprobe", err)
-			//		return nil
-			//	}
-			//	links = append(links, l)
-			//}
+
+		//case goGoready:
+		//klog.Infof("[AttachGoTlsUprobes] STEP 22: Attaching uprobe for runtime.goready, address=0x%x", address)
+		//l, err := attachUprobe(exe, s.Name, "runtime_goready", t.uprobes, address, "failed to attach runtime.goready uprobe", true)
+		//if err != nil {
+		//	klog.Infoln("runtime.goready no")
+		//	return nil, err
+		//}
+		//klog.Infof("[AttachGoTlsUprobes] STEP 22.2: Successfully attached runtime.goready uprobe")
+		//klog.Infoln("runtime.goready ok")
+		//links = append(links, l)
 		case goGrpcClientConnInvoke:
 			retLinks, err := attachUprobeWithReturns(exe, s.Name, "uprobe_ClientConn_Invoke", "uprobe_ClientConn_Invoke_Returns", t.uprobes, address, s, textSection, textSectionLen, textSectionData, ef.Machine, "failed to attach uprobe_ClientConn_Invoke uprobe", true)
 			if err != nil {
@@ -730,6 +727,15 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 					links = append(links, retLinks...)
 				}
 			}
+		case goGocqlSessionExecuteQuery:
+			retLinks, err := attachUprobeWithReturns(exe, s.Name, "uprobe_Session_executeQuery", "uprobe_Session_executeQuery_Returns", t.uprobes, address, s, textSection, textSectionLen, textSectionData, ef.Machine, "failed to attach uprobe_Session_executeQuery uprobe", true)
+			if err != nil {
+				return nil, err
+			}
+			if retLinks != nil {
+				klog.Infoln("uprobe_Session_executeQuery ok")
+				links = append(links, retLinks...)
+			}
 		}
 	}
 	klog.Infof("[AttachGoTlsUprobes] STEP 22: Symbol processing completed, matched symbols=%d, total links=%d", matchedSymbols, len(links))

+ 1 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -191,7 +191,7 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildDNSMapEvent(&mNode, event)
 				case l7.ProtocolMysql, l7.ProtocolMariaDB, l7.ProtocolPostgres, l7.ProtocolDM:
 					buildSQLMapEvent(&mNode, event)
-				case l7.ProtocolRedis, l7.ProtocolMemcached:
+				case l7.ProtocolRedis, l7.ProtocolMemcached, l7.ProtocolCassandra:
 					buildNoSqlMapEvent(&mNode, event)
 				case l7.ProtocolMongo:
 					buildMongoMapEvent(&mNode, event)

+ 16 - 7
tracing/apm_tracing.go

@@ -552,7 +552,7 @@ func (t *Trace) SQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyV
 //	)
 //}
 
-func (t *Trace) NoSQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyValue, operation, statement string, r *l7.RequestData, destination netaddr.IPPort) {
+func (t *Trace) NoSQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyValue, operation, statement string, r *l7.RequestData, connSrc, connDest netaddr.IPPort) {
 	if t == nil {
 		return
 	}
@@ -562,17 +562,26 @@ func (t *Trace) NoSQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.Ke
 	}
 
 	// 拼接 destination 的 IP 和端口为 ip:port 格式
-	destAddr := fmt.Sprintf("%s:%d", destination.IP().String(), destination.Port())
+
+	srcAddr := r.ComponentSAddr.String()
+	if r.ComponentSAddr.Port() == 0 {
+		srcAddr = connSrc.String()
+	}
+
+	destinationAddr := r.ComponentDAddr.String()
+	if r.ComponentDAddr.Port() == 0 {
+		destinationAddr = connDest.String()
+	}
 
 	var attr []attribute.KeyValue
 	attr = append(attr,
 		semconvVal,
 		semconv.DBStatement(statement),
-		semconv.NetPeerName(destination.IP().String()),
-		semconv.NetPeerPort(int(destination.Port())),
-		attribute.String("nosql.src_addr", r.ComponentSAddr.String()),
-		attribute.String("nosql.destination_addr", r.ComponentDAddr.String()),
-		attribute.String("nosql.dbn", destAddr),
+		semconv.NetPeerName(connDest.IP().String()),
+		semconv.NetPeerPort(int(connDest.Port())),
+		attribute.String("nosql.src_addr", srcAddr),
+		attribute.String("nosql.destination_addr", destinationAddr),
+		attribute.String("nosql.dbn", connDest.String()),
 	)
 
 	// 如果 operation 不为空,添加 DBOperation 属性