Преглед изворног кода

Feature #TASK_QT-18250 跨线程追踪问题

Carl пре 1 месец
родитељ
комит
069a95f605

+ 22 - 21
ebpftracer/ebpf/l7/apm_trace.c

@@ -207,18 +207,18 @@ struct tp_sched_switch {
 };
 
 // ---------- 业务上下文(thread_ctx) ----------
-struct thread_ctx_t {
-	__u64 token;         // 追踪 token / trace_id(入口处写入)
-	__u64 ts_ns;         // 最近一次刷新时间
-	__u64 exp_ns;        // 过期时间(now + TTL)
-	__u32 root_thread;   // 主线程
-	__u32 tgid;          // 所属进程
-	__u32 parent_tid;    // 父线程(用于调试/回溯)
-	__u8  is_main_thread;// 主线程
-	__u16 level;         // 继承层数
-	struct apm_trace_info_t *trace_info;
-	struct apm_trace_key_t trace_key;
-};
+//struct thread_ctx_t {
+//	__u64 token;         // 追踪 token / trace_id(入口处写入)
+//	__u64 ts_ns;         // 最近一次刷新时间
+//	__u64 exp_ns;        // 过期时间(now + TTL)
+//	__u32 root_thread;   // 主线程
+//	__u32 tgid;          // 所属进程
+//	__u32 parent_tid;    // 父线程(用于调试/回溯)
+//	__u8  is_main_thread;// 主线程
+//	__u16 level;         // 继承层数
+//	struct apm_trace_info_t *trace_info;
+//	struct apm_trace_key_t trace_key;
+//};
 
 // ---------- waker→wakee 的“边” ----------
 struct edge_t {
@@ -366,21 +366,23 @@ __u64 get_apm_trace_id(__u32 pid, __u32 tid) {
 			bpf_printk("    [Redis] has context:  %u->%u token=%llu", tid, current_ctx->parent_tid, current_ctx->token);
 			bpf_printk("    [Redis] has context:  %u->%u token=%llu", tid, current_ctx->root_thread, current_ctx->token);
 			bpf_printk("    [Redis] has context:  trace_key tid:%d,pid", current_ctx->trace_key.pid);
+			// 优先查找 root 线程的最新上下文(可能已被新请求刷新)
 			struct thread_ctx_t *has_root_ctx = bpf_map_lookup_elem(&thread_ctx, &current_ctx->trace_key.pid);
+			if (has_root_ctx && has_root_ctx->exp_ns > bpf_ktime_get_ns()) {
+				// root 线程有有效上下文,使用其最新 token(解决跨请求 token 过期问题)
+				bpf_printk("    [R0] root active: root_tid=%u token=%llu (child cached=%llu)",
+				           current_ctx->trace_key.pid, has_root_ctx->token, current_ctx->token);
+				return has_root_ctx->token;
+			}
 			if (!has_root_ctx) {
 				bpf_printk("thead is inactive.");
-				// 线程没有 context,尝试回退查找同一进程的 root thread context
-				// 这解决了仅依赖 sched_wakeup/sched_switch 无法关联的问题
+				// root 线程没有 context,回退通过 tgid_root_thread 查找
 				bpf_printk("    [R1] tid:%d has NO context, trying fallback: pid=%u (tgid)", tid, pid);
-				// 通过 tgid 查找 root thread
 				__u32 *root_tid_ptr = bpf_map_lookup_elem(&tgid_root_thread, &pid);
 				if (root_tid_ptr) {
 					__u32 root_tid = *root_tid_ptr;
 					bpf_printk("    [R2] Found root thread: tgid=%u -> root_tid=%u", pid, root_tid);
-
-					// 通过 root thread 查找 context
 					struct thread_ctx_t *root_ctx = bpf_map_lookup_elem(&thread_ctx, &root_tid);
-//					if (root_ctx ) {
 					if (root_ctx && root_ctx->exp_ns > bpf_ktime_get_ns()) {
 						bpf_printk("    [R3] Found root context: token=%llu", root_ctx->token);
 						return root_ctx->token;
@@ -389,9 +391,9 @@ __u64 get_apm_trace_id(__u32 pid, __u32 tid) {
 					}
 				} else {
 					bpf_printk("    [R4] No root thread mapping found for tgid=%u", pid);
-					bpf_printk("    [R5] This thread may have been created before trace start or was not captured by sched_wakeup");
 				}
 			}
+			// root 已过期且回退也找不到,使用子线程缓存的 token
 			return current_ctx->token;
 		} else {
 			// 线程没有 context,尝试回退查找同一进程的 root thread context
@@ -717,8 +719,7 @@ void cw_save_current_tracking_span(struct apm_span_context *sc) {
 
 static __inline __attribute__((__always_inline__))
 struct apm_span_context *
-cw_get_current_tracking_span(struct apm_trace_info_t *trace_info, struct apm_trace_key_t origin_trace_key,
-                             unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE],unsigned char span_id[APM_SPAN_ID_SIZE]) {
+cw_get_current_tracking_span(struct apm_trace_info_t *trace_info, struct apm_trace_key_t origin_trace_key) {
 	struct apm_trace_key_t trace_key = {0};
 	if (trace_info){
 		trace_key = trace_info->trace_key;

+ 6 - 5
ebpftracer/ebpf/l7/l7.c

@@ -647,8 +647,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 //	    bpf_printk("   [HTTP] [Start] payload:%s",  payload);
 
         // cw_bpf_debug("l7.c111 addr is --------:%d,%s",conn->sport,conn->saddr);
-	    struct apm_span_context *sc = cw_get_current_tracking_span(trace_info, trace_key, req->assumed_app_id,
-	                                                               req->span_id);
+	    struct apm_span_context *sc = cw_get_current_tracking_span(trace_info, trace_key);
 	    if (sc) {
 		    cw_copy_byte_arrays(sc->assumed_app_id, req->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
 		    cw_copy_byte_arrays(sc->span_id, req->span_id, APM_SPAN_ID_SIZE);
@@ -695,7 +694,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 	    cw_bpf_debug("[Enter][MEMCACHE]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
         req->protocol = PROTOCOL_MEMCACHED;
     } else if (is_mysql_query(payload, size, &req->request_type)) {
-        cw_bpf_debug("[Enter][Mysql]:thread_id:%d\n",tid);
+        bpf_printk("[Enter][Mysql]:thread_id:%d\n",tid);
         if (req->request_type == MYSQL_COM_STMT_CLOSE) {
 	        return 0;
             struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
@@ -1874,7 +1873,9 @@ int trace_epoll_wait(struct trace_event_raw_sys_enter__stub *ctx)
 	int maxevents = ctx->args[2];
 	int timeout   = ctx->args[3];
 //
-	bpf_printk("[epoll_wait-enter] tgid=%u tid=%u epfd=%d",tgid, tid, epfd);
+	if (tgid != tid ){
+		bpf_printk("[epoll_wait-enter] tgid=%u tid=%u epfd=%d",tgid, tid, epfd);
+	}
 	return 0;
 }
 
@@ -1930,7 +1931,7 @@ int trace_exit_epoll_pwait(struct trace_event_raw_sys_exit__stub *ctx)
 
 #define MS_TO_NSS(x)  ((x##ULL) * 1000 * 1000)
 #define EXP_WAKEE_MS 30ULL * 1000 * 1000
-#define EXP_WAKER_MS 300
+#define EXP_WAKER_MS 300ULL * 1000 * 1000
 
 static __always_inline void clone_ctx_and_update(__u32 dst_tid,
                                                  const struct thread_ctx_t *src,

+ 2 - 2
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -288,8 +288,8 @@ done:
     // COPY_PAYLOAD(e->payload + grpc_span->method_size, grpc_span->target_size, grpc_span->target);
     // cw_bpf_debug("e->payload is %s\n", e->payload);
     // e->payload_size += grpc_span->target_size;
-    
-    struct  apm_span_context * sc = cw_get_current_tracking_span(trace_info);
+
+	struct apm_span_context *sc = cw_get_current_tracking_span(trace_info, trace_key);
     if (sc) {
         cw_copy_byte_arrays(sc->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
         cw_copy_byte_arrays(sc->span_id, e->span_id, APM_SPAN_ID_SIZE);

+ 1 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/client_apm.go

@@ -115,7 +115,7 @@ func (d *client) newApmRequest(body []byte, mapLen int, codeType CodeType) (requ
 	r.Header.Set("Content-Type", "text/plain;charset=utf-8")
 
 	r.Header.Set("routingKey", codeType.Topic())
-	klog.Debugln(codeType.Topic())
+	klog.Debugf("codeType %d,routingKey:%s", codeType, codeType.Topic())
 	r.Header.Set("DataCount", strconv.Itoa(mapLen))
 	// 对接op
 	r.Header.Set("AccountId", strconv.Itoa(utils.GetAccountID()))

+ 2 - 2
utils/modelse/code_type.go

@@ -74,8 +74,8 @@ func (p CodeType) Topic() string {
 	//	return "JAVA_AOT"
 	//case CodeTypePHP:
 	//	return "PHP"
-	//case CodeTypePython:
-	//	return "PYTHON"
+	case CodeTypePython:
+		return "pythonTopic"
 	//case CodeTypeDotNet:
 	//	return "DOTNET"
 	//case CodeTypeNode: