Преглед на файлове

Feature #TASK_QT-31498 【Q4】eBPF-Go cassandra V2 适配

Carl преди 6 месеца
родител
ревизия
8090de7c26
променени са 2 файла, в които са добавени 166 реда и са изтрити 2 реда
  1. 153 2
      ebpftracer/ebpf/utrace/go/db/gocql.probe.bpf.c
  2. 13 0
      ebpftracer/tls.go

+ 153 - 2
ebpftracer/ebpf/utrace/go/db/gocql.probe.bpf.c

@@ -14,7 +14,9 @@
 // Query 结构体中 stmt 字段的偏移量
 // 如果 stmt 是 Query 结构体的第一个字段,偏移量为 0
 // 可以通过 DWARF 信息动态获取,这里使用可配置的偏移量
-volatile const __u64 query_stmt_offset = 0;
+
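+// Offset of the originalQuery field inside the internalQuery struct.
+// internalQuery.originalQuery is a pointer to a Query. No configurable
+// variable is declared for this offset; the probes below hard-code it as 0.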
 
 // 存储 gocql 查询请求的上下文
 struct gocql_query_context_t {
@@ -146,7 +148,7 @@ int uprobe_Session_executeQuery(struct pt_regs *ctx) {
 	// 从 Query 结构体中读取 stmt 字段
     // stmt 是一个 string 类型,在 Go 中是 go_string_ot 结构体
     struct go_string_ot stmt_str = {0};
-    void *stmt_ptr = (void *)((char *)qry_ptr + query_stmt_offset);
+    void *stmt_ptr = (void *)((char *)qry_ptr + 0);
 //    cw_bpf_debug("[gocql] stmt_ptr=%llx", (__u64)stmt_ptr);
 
     // 读取 string 结构体
@@ -293,3 +295,152 @@ int uprobe_Session_executeQuery_Returns(struct pt_regs *ctx) {
     cw_bpf_debug("[gocql] Returns done %llu",e->trace_id);
     return 0;
 }
+
+// Entry probe for the cassandra-gocql-driver v2 query path. Attaches to:
+//   func (s *Session) executeQuery(qry *internalQuery) (it *Iter)
+// internalQuery holds an `originalQuery *Query` field. The probe follows that
+// pointer, captures the Query's statement text plus a start timestamp, and
+// stores them in gocql_query_events keyed by GOROUTINE(ctx) so the matching
+// return probe can emit the event.
+//
+// Always returns 0.
+SEC("uprobe/Session_executeQuery_cassandra")
+int uprobe_Session_executeQuery_cassandra(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 tgid = pid_tgid >> 32;
+
+    // Use the project debug macro, like every other trace line in this probe,
+    // instead of a raw bpf_printk.
+    cw_bpf_debug("[cassandra] enter Session.executeQuery");
+
+    // Only trace processes that are registered in proc_info_map.
+    struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if (!info) {
+        cw_bpf_debug("[cassandra] no proc_info");
+        return 0;
+    }
+
+    // Second argument: qry *internalQuery
+    // (argument 1 is the receiver s *Session).
+    void *internal_qry_ptr = get_argument(ctx, 2);
+    if (internal_qry_ptr == NULL) {
+        cw_bpf_debug("[cassandra] internal_qry_ptr is null");
+        return 0;
+    }
+
+    // Read the originalQuery *Query pointer out of internalQuery.
+    // NOTE(review): offset 0 assumes originalQuery is the first field of
+    // internalQuery - confirm against the driver version / DWARF info.
+    void *original_query_ptr = NULL;
+    void *original_query_ptr_addr = (void *)((char *)internal_qry_ptr + 0);
+    long res = bpf_probe_read(&original_query_ptr, sizeof(original_query_ptr), original_query_ptr_addr);
+    if (res != 0 || original_query_ptr == NULL) {
+        cw_bpf_debug("[cassandra] failed to read originalQuery pointer");
+        return 0;
+    }
+
+    // Correlation key shared with the return probe (goroutine address).
+    void *key = (void *)GOROUTINE(ctx);
+
+    // Skip if a request is already being tracked under this key.
+    struct gocql_query_context_t *existing = bpf_map_lookup_elem(&gocql_query_events, &key);
+    if (existing != NULL) {
+        cw_bpf_debug("[cassandra] already exists");
+        return 0;
+    }
+
+    // Read the Go string header of the stmt field from the Query struct.
+    // NOTE(review): offset 0 assumes stmt is the first field of Query - confirm.
+    struct go_string_ot stmt_str = {0};
+    void *stmt_ptr = (void *)((char *)original_query_ptr + 0);
+
+    res = bpf_probe_read(&stmt_str, sizeof(stmt_str), stmt_ptr);
+    if (res != 0) {
+        cw_bpf_debug("[cassandra] failed to read stmt_str");
+        return 0;
+    }
+
+    // Borrow the scratch slot used to build the context before it is copied
+    // into gocql_query_events.
+    __u32 zero = 0;
+    struct gocql_query_context_t *query_ctx = bpf_map_lookup_elem(&gocql_query_storage_map, &zero);
+    if (query_ctx == NULL) {
+        cw_bpf_debug("[cassandra] failed to get storage");
+        // Return 0 like every other exit path; a non-zero return value from a
+        // uprobe program is not an error code.
+        return 0;
+    }
+
+    __builtin_memset(query_ctx, 0, sizeof(struct gocql_query_context_t));
+
+    query_ctx->start_time = bpf_ktime_get_ns();
+    // NOTE(review): stmt_size is the full Go string length and is not clamped
+    // to sizeof(query_ctx->stmt) here - verify consumers bound it.
+    query_ctx->stmt_size = stmt_str.len;
+
+    // Copy the statement text. This always reads sizeof(query_ctx->stmt)
+    // bytes regardless of stmt_str.len.
+    // NOTE(review): for short strings this reads past the end of the string.
+    res = bpf_probe_read(query_ctx->stmt, sizeof(query_ctx->stmt), stmt_str.str);
+    if (res != 0) {
+        cw_bpf_debug("[cassandra] read stmt failed");
+        return 0;
+    }
+
+    // Publish the context for the return probe (flags 0 == BPF_ANY) and
+    // surface a failed insert instead of dropping it silently.
+    res = bpf_map_update_elem(&gocql_query_events, &key, query_ctx, 0);
+    if (res != 0) {
+        cw_bpf_debug("[cassandra] failed to save query ctx");
+    }
+
+    return 0;
+}
+
+// Return-side probe for Session.executeQuery (cassandra-gocql-driver v2).
+// Looks up the context stored by the entry probe under GOROUTINE(ctx), fills
+// an l7_event (protocol, pid, timing, statement payload, trace id), emits it
+// through the l7_events perf buffer and removes the context from
+// gocql_query_events.
+//
+// Placed in its own ELF section instead of reusing the entry probe's section
+// name; the "uprobe/" prefix is kept unchanged.
+//
+// Always returns 0.
+SEC("uprobe/Session_executeQuery_cassandra_Returns")
+int uprobe_Session_executeQuery_cassandra_Returns(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 tgid = pid_tgid >> 32;
+
+    cw_bpf_debug("[cassandra] enter Session.executeQuery Returns");
+    // Only trace processes that are registered in proc_info_map.
+    struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+    if (!info) {
+        cw_bpf_debug("[cassandra] Returns no info");
+        return 0;
+    }
+
+    // Same correlation key the entry probe used.
+    void *key = (void *)GOROUTINE(ctx);
+
+    // Find the request recorded at function entry.
+    struct gocql_query_context_t *query_ctx = bpf_map_lookup_elem(&gocql_query_events, &key);
+    if (query_ctx == NULL) {
+        cw_bpf_debug("[cassandra] Returns no ctx");
+        return 0;
+    }
+
+    // Record the end time.
+    query_ctx->end_time = bpf_ktime_get_ns();
+
+    // Scratch slot used to assemble the outgoing event.
+    __u32 zero = 0;
+    struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+    if (!e) {
+        cw_bpf_debug("[cassandra] Returns no event");
+        // Drop the pending context so this key is not blocked.
+        bpf_map_delete_elem(&gocql_query_events, &key);
+        return 0;
+    }
+
+    // Fill in the event fields.
+    e->protocol = PROTOCOL_CASSANDRA;
+    e->pid = tgid;
+    e->start_at = query_ctx->start_time;
+    e->end_at = query_ctx->end_time;
+    e->duration = e->end_at - e->start_at;
+    e->payload_size = query_ctx->stmt_size;
+
+    COPY_PAYLOAD(e->payload, query_ctx->stmt_size, query_ctx->stmt);
+
+    // The scratch event is reused and never cleared in this function, so reset
+    // trace_id explicitly. Otherwise a value left by an earlier event would
+    // defeat the "== 0" fallback below and be emitted with this event.
+    e->trace_id = 0;
+
+    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+    struct apm_trace_info_t *trace_info = get_apm_trace_info_by_trace_key(trace_key);
+
+    if (trace_info == NULL) {
+        trace_info = get_apm_trace_info_v3(trace_key, pid_tgid, tgid, pid_tgid);
+    }
+
+    if (trace_info) {
+        e->trace_id = trace_info->trace_id;
+    }
+
+    // Last resort when no trace info supplied an id.
+    if (e->trace_id == 0) {
+        e->trace_id = get_apm_trace_id(tgid, pid_tgid);
+    }
+
+    // Emit the event.
+    long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    if (error == 0) {
+        cw_add_event_count(e->trace_id);
+        cw_bpf_debug("[cassandra] Returns success");
+    }
+
+    // Clean up the per-request context whether or not the output succeeded.
+    bpf_map_delete_elem(&gocql_query_events, &key);
+    cw_bpf_debug("[cassandra] Returns done %llu", e->trace_id);
+    return 0;
+}

+ 13 - 0
ebpftracer/tls.go

@@ -41,6 +41,7 @@ const (
 	goGrpcHttp2ClientNewStream     = "google.golang.org/grpc/internal/transport.(*http2Client).NewStream"
 	goReadContinuedLineSlice       = "net/textproto.(*Reader).readContinuedLineSlice"
 	goGocqlSessionExecuteQuery     = "github.com/gocql/gocql.(*Session).executeQuery"
+	goGocqlSessionExecuteQueryV2   = "github.com/apache/cassandra-gocql-driver/v2.(*Session).executeQuery"
 )
 
 var (
@@ -575,6 +576,9 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		case goGocqlSessionExecuteQuery:
 			matchedSymbols++
 			klog.Infof("[AttachGoTlsUprobes] STEP 19.10: Matched gocql Session.executeQuery symbol (index=%d)", i)
+		case goGocqlSessionExecuteQueryV2:
+			matchedSymbols++
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.11: Matched cassandra Session.executeQuery symbol (index=%d)", i)
 		case goReadContinuedLineSlice:
 		default:
 			continue
@@ -736,6 +740,15 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				klog.Infoln("uprobe_Session_executeQuery ok")
 				links = append(links, retLinks...)
 			}
+		case goGocqlSessionExecuteQueryV2:
+			retLinks, err := attachUprobeWithReturns(exe, s.Name, "uprobe_Session_executeQuery_cassandra", "uprobe_Session_executeQuery_cassandra_Returns", t.uprobes, address, s, textSection, textSectionLen, textSectionData, ef.Machine, "failed to attach uprobe_Session_executeQuery_cassandra uprobe", true)
+			if err != nil {
+				return nil, err
+			}
+			if retLinks != nil {
+				klog.Infoln("uprobe_Session_executeQuery_cassandra ok")
+				links = append(links, retLinks...)
+			}
 		}
 	}
 	klog.Infof("[AttachGoTlsUprobes] STEP 22: Symbol processing completed, matched symbols=%d, total links=%d", matchedSymbols, len(links))