Ver código fonte

Feature #TASK_QT-31498 kafka consumer

Carl 6 meses atrás
pai
commit
07568e3830

+ 20 - 12
containers/container_apm.go

@@ -29,10 +29,6 @@ import (
 	"inet.af/netaddr"
 )
 
-const (
-	TRACE_STATUS = 1
-)
-
 func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
 	trace, ok := c.traceMap[traceId]
 	return trace, ok
@@ -154,7 +150,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	//}
 	//klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
 	// klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
-	if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
+	if c.l7Attach && c.valuableTrace(r.TraceId) {
 		// klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
 		if r.TraceStart == TRACE_STATUS {
 			// klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
@@ -163,7 +159,8 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
 			}
 			if err == nil {
-				if r.TraceType == 0 {
+				switch r.Protocol {
+				case l7.ProtocolHTTP:
 					method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
 					// userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
 					ip, _ := netaddr.ParseIP(sn)
@@ -174,16 +171,27 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 					}
 					trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
 					c.SendEvent(trace, r.TraceId)
-				} else if r.TraceType == 1 {
+				case l7.ProtocolGrpc:
+					// gRPC
 					ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
 					trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
 					c.SendEvent(trace, r.TraceId)
+				case l7.ProtocolKafka:
+					var sn string
+					var sport uint16
+					var container_id string
+					conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
+					if conn != nil {
+						sn = conn.ActualDest.IP().String()
+						sport = conn.ActualDest.Port()
+					}
 
-					// apmTrace, err := c.getOrInitTrace(r.TraceId)
-					// if err == nil {
-					// 	apmTrace.GrpcServerTraceQueryEvent(r, c.GetAppInfo())
-					// 	c.SendEvent(apmTrace, r.TraceId)
-					// }
+					if c.cgroup != nil {
+						container_id = c.cgroup.ContainerId
+					}
+					// MQ
+					trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
+					c.SendEvent(trace, r.TraceId)
 				}
 			}
 

+ 1 - 0
ebpftracer/ebpf/ebpf.c

@@ -64,6 +64,7 @@
 #include "utrace/go/net/grpc.client.probe.bpf.c"
 #include "utrace/go/db/gocql.probe.bpf.c"
 #include "utrace/go/mq/kafka/producer.probe.bpf.c"
+#include "utrace/go/mq/kafka/consumer.probe.bpf.c"
 
 #include "utrace/java/net/server.probe.bpf.c"
 #include "utrace/java/net/client.probe.bpf.c"

+ 21 - 0
ebpftracer/ebpf/include/apm_trace.h

@@ -133,4 +133,25 @@ static __always_inline void cw_copy_byte_arrays(unsigned char *src, unsigned cha
 	}
 }
 
+// 设置 trace 开始标记
+// 用法: SET_TRACE_START(e, PROTOCOL_HTTP)
+#define SET_TRACE_START(e, p) do { \
+	(e)->trace_start = PROTOCOL_TRACE; \
+	(e)->trace_end = 0; \
+	(e)->protocol = (p); \
+} while (0)
+
+// 设置 trace 结束标记
+// 用法: SET_TRACE_END(e, PROTOCOL_HTTP)
+#define SET_TRACE_END(e, p) do { \
+	(e)->trace_start = 0; \
+	(e)->trace_end = PROTOCOL_TRACE; \
+	(e)->protocol = (p); \
+} while (0)
+
+#define SET_TRACE_METHOD(e) do { \
+	(e)->trace_start = 0; \
+	(e)->trace_end = 0; \
+} while (0)
+
 #endif //EUSPACES_APM_TRACE_H

+ 6 - 0
ebpftracer/ebpf/include/socket_trace_common.h

@@ -219,8 +219,14 @@ struct ebpf_proc_info {
 	__u64 kafka_message_topic_pos;
 	__u64 kafka_message_headers_pos;
 	__u64 kafka_message_time_pos;
+	__u64 kafka_message_partition_pos;  // Message.Partition field offset
+	__u64 kafka_message_offset_pos;     // Message.Offset field offset
+	__u64 kafka_message_value_pos;      // Message.Value field offset
 	__u64 kafka_writer_topic_pos;
 	__u64 kafka_writer_addr_pos;  // Writer.Brokers field offset
+	__u64 kafka_reader_config_pos;      // Reader.Config field offset
+	__u64 kafka_reader_config_group_id_pos;  // Reader.Config.GroupID field offset
+	__u64 kafka_reader_config_topics_pos;    // Reader.Config.Topics field offset
 	// net.TCPAddr offsets
 	__u64 tcp_addr_ip_offset;    // net.TCPAddr.IP field offset
 	__u64 tcp_addr_port_offset;  // net.TCPAddr.Port field offset

+ 14 - 0
ebpftracer/ebpf/l7/apm_trace.c

@@ -78,6 +78,20 @@ struct {
 	__uint(max_entries, 10240);
 } pid_of_connection_ptr_maps SEC(".maps");
 
+//struct {
+//	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+//	__uint(key_size, sizeof(__u32));
+//	__uint(value_size, sizeof(struct apm_span_context));
+//	__uint(max_entries, 1);
+//} parent_span_context_storage_map SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(__u32));
+	__uint(value_size, sizeof(struct apm_span_context));
+	__uint(max_entries, 1);
+} cw_parent_span_context_storage_map SEC(".maps");
+
 static __inline __attribute__((__always_inline__))
 struct apm_trace_key_t get_apm_trace_key(__u64 timeout, bool is_socket_io) {
 	__u64 pid_tgid = bpf_get_current_pid_tgid();

+ 4 - 7
ebpftracer/ebpf/l7/l7.c

@@ -80,7 +80,7 @@ struct l7_event {
 	__u64 end_at;
     __u32 trace_start;
     __u32 trace_end;
-    __u32 trace_type;           // 0: normal, 1: grpc-server, 2: https
+    __u32 trace_type;           // 0: normal, 1: grpc-server, 2: https 3 mq
     __u32 event_count;
     __u16 sport;
     __u16 dport;
@@ -457,18 +457,16 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 	        cw_clear_trace(pid, tid, fd);
             return 0;
         }
+	    SET_TRACE_END(e, PROTOCOL_HTTP);
 	    e->start_at = req->ns;
 	    cw_bpf_debug("req->ns:%llu",req->ns);
 	    e->end_at = bpf_ktime_get_ns();
         e->duration = e->end_at - e->start_at;
 //        cw_bpf_debug("[Response][HTTP]:duration->ns:%d\n",e->duration);
-        e->protocol = PROTOCOL_TRACE;
         e->status = http_status;
         e->pid = k.pid;
         e->fd = k.fd;
         // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-        e->trace_start = 0;
-        e->trace_end = 1;
         e->trace_type = 0;
         e->trace_id = trace_id;
         e->payload_size = size;
@@ -855,10 +853,8 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
 //	    __u64 uid_base = bpf_ktime_get_ns();
 //	    trace_info.trace_id = bpf_get_current_pid_tgid() + uid_base;
 
-        e->trace_start = 1;
-        e->trace_end = 0;
+        SET_TRACE_START(e, PROTOCOL_HTTP);
         e->trace_type = 0;
-        e->protocol = PROTOCOL_TRACE;
         e->trace_id = trace_info.trace_id;
 	    cw_bpf_debug("\n");
 		cw_bpf_debug("[Trace Start in l7][HTTP]pid:[%d]--[%lld]--trace_id:%llu\n", pid, bpf_ktime_get_ns(),trace_info.trace_id);
@@ -1244,6 +1240,7 @@ int trace_exit_read_common(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long in
 	e->end_at = bpf_ktime_get_ns();
 	e->start_at = req->ns;
     e->duration = e->end_at - e->start_at;
+	SET_TRACE_METHOD(e);
     send_event(ctx, e, cid, conn);
     return 0;
 }

+ 2 - 0
ebpftracer/ebpf/utrace/go/db/gocql.probe.bpf.c

@@ -285,6 +285,7 @@ int uprobe_Session_executeQuery_Returns(struct pt_regs *ctx) {
         e->trace_id = get_apm_trace_id(tgid, pid_tgid);
     }
     // 发送事件
+	SET_TRACE_METHOD(e);
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
     if (error == 0) {
         cw_add_event_count(e->trace_id);
@@ -431,6 +432,7 @@ int uprobe_Session_executeQuery_cassandra_Returns(struct pt_regs *ctx) {
     }
 
     // 发送事件
+	SET_TRACE_METHOD(e);
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
     if (error == 0) {
         cw_add_event_count(e->trace_id);

+ 403 - 0
ebpftracer/ebpf/utrace/go/mq/kafka/consumer.probe.bpf.c

@@ -1,3 +1,406 @@
 //
 // Created by Carl.Guo on 2025/11/17.
 //
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+#include "arguments.h"
+#include "span_context.h"
+#include "go_context.h"
+#include "go_types.h"
+#include "uprobe.h"
+#include "apm_trace.h"
+
+#define MAX_CONCURRENT 50
+// https://github.com/apache/kafka/blob/0.10.2/core/src/main/scala/kafka/common/Topic.scala#L30C3-L30C34
+#define MAX_TOPIC_SIZE 256
+// No constraint on the key size, but we must have a limit for the verifier
+#define MAX_KEY_SIZE 256
+#define MAX_CONSUMER_GROUP_SIZE 128
+#define MAX_HEADERS 20
+
+struct kafka_consumer_request_t {
+	u64 start_time;
+	u64 end_time;
+//	struct apm_span_context sc;
+	struct apm_span_context psc;
+	char topic[MAX_TOPIC_SIZE];
+	char key[MAX_KEY_SIZE];
+	char consumer_group[MAX_CONSUMER_GROUP_SIZE];
+	s64 offset;
+	s64 partition;
+} __attribute__((packed));
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, void*);
+	__type(value, struct kafka_consumer_request_t);
+	__uint(max_entries, MAX_CONCURRENT);
+} kafka_consumer_events SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, void*);
+	__type(value, void*);
+	__uint(max_entries, MAX_CONCURRENT);
+} goroutine_to_go_context SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__type(key, void*);
+	__type(value, void*);
+	__uint(max_entries, MAX_CONCURRENT);
+} kafka_reader_to_conn SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_consumer_request_t));
+	__uint(max_entries, 1);
+} kafka_consumer_request_storage_map SEC(".maps");
+
+// Storage for temporary strings in extract_span_context_from_headers to avoid stack overflow
+struct kafka_extract_temp_t {
+	char key[CW_HEADER_KEY_LENGTH];
+	char current_key[CW_HEADER_KEY_LENGTH];
+	char val[CW_HEADER_VAL_LENGTH];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_extract_temp_t));
+	__uint(max_entries, 1);
+} kafka_extract_temp_storage_map SEC(".maps");
+
+// Storage for parent span context to avoid stack overflow
+
+
+static __always_inline long extract_span_context_from_headers(void *message, struct apm_span_context *parent_span_context, struct ebpf_proc_info *proc_info) {
+
+	// Read the headers slice descriptor
+	void *headers = (void *)(message + proc_info->kafka_message_headers_pos);
+	struct go_slice headers_slice = {0};
+	bpf_probe_read(&headers_slice, sizeof(headers_slice), headers);
+
+	char key[CW_HEADER_KEY_LENGTH] = CW_HEADER_KEY_VAL;
+	char current_key[CW_HEADER_KEY_LENGTH];
+
+	for (u64 i = 0; i < headers_slice.len; i++) {
+		if (i >= MAX_HEADERS) {
+			break;
+		}
+		// Read the header
+		struct kafka_header_t header = {0};
+		bpf_probe_read(&header, sizeof(header), headers_slice.array + (i * sizeof(header)));
+		// Check if it is the traceparent header
+		if (header.key.len == CW_HEADER_KEY_LENGTH && header.value.len == CW_HEADER_VAL_LENGTH) {
+			bpf_probe_read_user(current_key, sizeof(current_key), header.key.str);
+			if (bpf_memcmp(key, current_key, sizeof(key))) {
+				// Found the traceparent header, extract the span context
+//				char val[CW_HEADER_VAL_LENGTH];
+//				bpf_probe_read(val, CW_HEADER_VAL_LENGTH, header.value.array);
+//				w3c_string_to_span_context(val, parent_span_context);
+				cw_string_to_span_context(header.value.array, parent_span_context);
+				return 0;
+			}
+		}
+	}
+	return -1;
+}
+
+// This instrumentation attaches uprobe to the following function:
+// func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
+// Entry probe: End the span that was started when the last call returned
+SEC("uprobe/FetchMessage")
+int uprobe_FetchMessage(struct pt_regs *ctx) {
+	/* FetchMessage is a blocking function, hence its execution time is not a good indication for the time it took to handle the message.
+	Instead, we use the entry to this function to end the span which was started when it's last call returned. (A typical consumer calls FetchMessage in a loop)
+	A less confusing way of looking at it is as follows 
+	1. Entry to FetchMessage
+	2. internal kafka code before blocking
+	3. Blocking wait for message
+	4. internal kafka code after blocking
+	5. Return from FetchMessage
+	Steps 2-4 are executed in a separate goroutine from the one the user of the library.
+	*/
+	void *reader = get_argument(ctx, 1);
+//	struct go_iface go_context = {0};
+//	get_Go_context(ctx, 2, 0, true, &go_context);
+	void *goroutine = (void *)GOROUTINE(ctx);
+
+	// Save reader to map for use in return probe
+	if (reader != NULL) {
+		void *reader_ptr = reader;
+		bpf_map_update_elem(&kafka_reader_to_conn, &goroutine, &reader_ptr, BPF_ANY);
+	}
+	
+	struct kafka_consumer_request_t *kafka_request = bpf_map_lookup_elem(&kafka_consumer_events, &goroutine);
+	if (kafka_request == NULL || reader == NULL) {
+//		bpf_printk("kafka goto end");
+		// The current goroutine has no kafka request,
+		// this can happen in the first time FetchMessage is called
+		// Save the context for the return probe for in-process context propagation
+		goto save_context;
+	}
+
+	// Get consumer group from reader config
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (!proc_info) {
+		return 0;
+	}
+	get_go_string_from_user_ptr((void *)(reader + proc_info->kafka_reader_config_pos + proc_info->kafka_reader_config_group_id_pos),
+	                            kafka_request->consumer_group,
+	                            sizeof(kafka_request->consumer_group));
+
+
+	kafka_request->end_time = bpf_ktime_get_ns();
+
+	// Output span event
+//	__u64 pid_tgid = bpf_get_current_pid_tgid();
+//	__u32 tgid = pid_tgid >> 32;
+
+	u32 zero = 0;
+	struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+	if (e) {
+		struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+		struct apm_trace_info_t * start_trace_info = get_apm_trace_info_by_trace_key(trace_key);
+		if (!start_trace_info) {
+			return -1;
+		}
+
+		cw_copy_byte_arrays(kafka_request->psc.trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->psc.assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->psc.instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->psc.app_id, e->app_id_from, APM_APP_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->psc.span_id, e->span_id_from, APM_SPAN_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->psc.type_from, e->type_from, APM_TYPE_FROM_SIZE);
+
+		__u32 event_count = cw_get_event_count(start_trace_info->trace_id);
+
+		e->method = METHOD_CONSUME;
+		SET_TRACE_END(e, PROTOCOL_KAFKA);
+		e->start_at = kafka_request->start_time;
+		e->end_at = kafka_request->end_time;
+		e->duration = e->end_at - e->start_at;
+		e->trace_id = start_trace_info->trace_id;
+		e->event_count = event_count;
+
+		// 填充 MQ 相关信息
+		bpf_probe_read(&e->mq.topic, sizeof(e->mq.topic), kafka_request->topic);
+		bpf_probe_read(&e->mq.key, sizeof(e->mq.key), kafka_request->key);
+		bpf_printk("kafka_request->key %s",kafka_request->key);
+		bpf_printk("kafka_request->topic %s",kafka_request->topic);
+
+		bpf_map_delete_elem(&trace_event_count_heap, &e->trace_id);
+		// 清除业务层trace信息
+		clear_parent_span_context_by_trace_key(start_trace_info->trace_key);
+		// 清除trace信息
+		__u32 tid =  (__u32)pid_tgid;
+		cw_clear_trace(tgid, tid, 0);
+
+		bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+
+	}
+
+//	stop_tracking_span(&kafka_request->sc, &kafka_request->psc);
+	bpf_map_delete_elem(&kafka_consumer_events, &goroutine);
+
+save_context:
+	// Save the context for the return probe
+//	bpf_map_update_elem(&goroutine_to_go_context, &goroutine, &go_context.data, 0);
+	return 0;
+}
+
+// This instrumentation attaches uprobe to the following function:
+// func (r *Reader) FetchMessage(ctx context.Context) (Message, error)
+// Return probe: Start tracking the span for the message that was just fetched
+SEC("uprobe/FetchMessage")
+int uprobe_FetchMessage_Returns(struct pt_regs *ctx) {
+	/* The FetchMessage function returns a message to the user after it read it from a channel.
+	The user consuming this message will handle it after this probe,
+	thus it is a good place to start track the span corresponds to this message. In addition we save the message
+	in a hash map to be read by the entry probe of FetchMessage, which will end this span */
+	void *goroutine = (void *)GOROUTINE(ctx);
+	u32 map_id = 0;
+	struct kafka_consumer_request_t *kafka_request = bpf_map_lookup_elem(&kafka_consumer_request_storage_map, &map_id);
+	if (kafka_request == NULL) {
+		bpf_printk("uprobe/FetchMessage_Returns: kafka_request is NULL");
+		return 0;
+	}
+
+	// Get proc_info to access Kafka offsets
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (proc_info == NULL) {
+		bpf_printk("uprobe/FetchMessage_Returns: proc_info is NULL");
+		return 0;
+	}
+
+	// The message returned on the stack since it returned as a struct and not a pointer
+	void *message = (void *)(PT_REGS_SP(ctx) + 8);
+	if (proc_info->kafka_message_value_pos != 0) {
+		struct go_slice_ot value_slice = {0};
+		bpf_probe_read(&value_slice, sizeof(value_slice), (void *)(message + proc_info->kafka_message_value_pos));
+//		bpf_printk("len %d",value_slice.len);
+		if (value_slice.array == NULL) {
+//			bpf_printk("no val");
+			return 0;
+		} else {
+			bpf_printk("has val %llu", goroutine);
+		}
+	} else {
+		return 0;
+	}
+
+	// Zero the request
+	u32 zero_id = 0;
+	struct kafka_consumer_request_t *zero_kafka_request = bpf_map_lookup_elem(&kafka_consumer_request_storage_map, &zero_id);
+	if (zero_kafka_request != NULL) {
+		__builtin_memcpy(kafka_request, zero_kafka_request, sizeof(struct kafka_consumer_request_t));
+	}
+
+	kafka_request->start_time = bpf_ktime_get_ns();
+
+//	bpf_printk("start %llu",message);
+
+//	struct go_iface go_context = {0};
+//	get_Go_context(ctx, 2, 0, true, &go_context);
+
+	// Get the parent span context from the message headers
+	// Use per-cpu map to avoid stack overflow
+//	u32 parent_sc_storage_id = 0;
+//	struct apm_span_context *parent_span_context = bpf_map_lookup_elem(&cw_parent_span_context_storage_map, &parent_sc_storage_id);
+//	if (parent_span_context == NULL) {
+//		cw_bpf_debug("uprobe/FetchMessage_Returns: Failed to get parent_span_context storage");
+//		return 0;
+//	}
+	// Manual zeroing to avoid memset issues
+//	parent_span_context->TraceFlags = 0;
+//	__builtin_memset(parent_span_context->trace_id, 0, TRACE_ID_SIZE);
+//	__builtin_memset(parent_span_context->span_id, 0, SPAN_ID_SIZE);
+
+	// Print message Value field before extracting span context from headers
+	// Value is a byte slice, similar to Key
+
+	long extract_result = extract_span_context_from_headers(message, &kafka_request->psc, proc_info);
+//	if (extract_result != 0) {
+//		generate_random_bytes(kafka_request->psc.trace_id, TRACE_ID_SIZE);
+//	}
+	if (extract_result == 0) {
+		bpf_printk("find");
+		for (int i = 0; i < APM_TRACE_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-trace_id[%d] = %02x", i, kafka_request->psc.trace_id[i]);
+		}
+		// Successfully extracted parent span context from headers
+//		kafka_request->psc = *parent_span_context;
+	} else {
+		// No parent span context in headers, try to get from Go context
+		generate_random_bytes(kafka_request->psc.trace_id, TRACE_ID_SIZE);
+//		struct span_context *parent_sc = get_parent_span_context(go_context.data);
+//		if (parent_sc != NULL) {
+//			kafka_request->psc = *parent_sc;
+//		}
+		bpf_printk("no header");
+	}
+
+	// Collecting message attributes
+	// topic
+	get_go_string_from_user_ptr(
+			(void *)(message + proc_info->kafka_message_topic_pos), kafka_request->topic, sizeof(kafka_request->topic));
+	bpf_printk("topic %s",kafka_request->topic);
+	// Key is a byte slice, first read the slice
+	if (proc_info->kafka_message_key_pos != 0) {
+		struct go_slice_ot key_slice = {0};
+		bpf_probe_read(&key_slice, sizeof(key_slice), (void *)(message + proc_info->kafka_message_key_pos));
+		u64 size_to_read = key_slice.len > MAX_KEY_SIZE ? MAX_KEY_SIZE : key_slice.len;
+		size_to_read &= 0xFF;
+		// Then read the actual key
+		if (size_to_read > 0 && key_slice.array != NULL) {
+			bpf_probe_read_user(kafka_request->key, size_to_read, key_slice.array);
+		}
+	}
+
+	u32 zero = 0;
+	struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+
+	if (e) {
+		e->method = METHOD_CONSUME;
+
+		SET_TRACE_START(e, PROTOCOL_KAFKA);
+		e->start_at = kafka_request->start_time;
+		e->end_at = kafka_request->end_time;
+		e->duration = e->end_at - e->start_at;
+
+		struct apm_trace_info_t trace_info = cw_save_trace_info(pid_tgid, tgid, 0);
+		e->trace_id = trace_info.trace_id;
+
+		// 填充 MQ 相关信息
+		bpf_probe_read(&e->mq.topic, sizeof(e->mq.topic), kafka_request->topic);
+		bpf_probe_read(&e->mq.key, sizeof(e->mq.key), kafka_request->key);
+
+		// Send event
+		long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+		if (error == 0) {
+			bpf_printk("send start ok");
+//			cw_add_event_count(e->trace_id);
+//			cw_bpf_debug("[kafka consumer] send success trace %llu", e->trace_id);
+		}
+	} else {
+		bpf_printk("no e");
+	}
+//	return 0;
+
+	// Initialize child span context
+//	generate_random_bytes(kafka_request->sc.trace_id, TRACE_ID_SIZE);
+//	generate_random_bytes(kafka_request->sc.span_id, SPAN_ID_SIZE);
+//
+//	// If we have a parent span, use its trace ID
+//	if (extract_result == 0 && parent_span_context != NULL) {
+//		copy_byte_arrays(parent_span_context->trace_id, kafka_request->sc.trace_id, TRACE_ID_SIZE);
+//	}
+//
+//	// Collecting message attributes
+//	// topic
+//	get_go_string_from_user_ptr((void *)(message + proc_info->kafka_message_topic_pos), kafka_request->topic, sizeof(kafka_request->topic));
+//	// partition
+//	if (proc_info->kafka_message_partition_pos != 0) {
+//		bpf_probe_read(&kafka_request->partition, sizeof(kafka_request->partition), (void *)(message + proc_info->kafka_message_partition_pos));
+//	}
+//	// offset
+//	if (proc_info->kafka_message_offset_pos != 0) {
+//		bpf_probe_read(&kafka_request->offset, sizeof(kafka_request->offset), (void *)(message + proc_info->kafka_message_offset_pos));
+//	}
+//	// Key is a byte slice, first read the slice descriptor
+//	struct go_slice_ot key_slice = {0};
+//	bpf_probe_read(&key_slice, sizeof(key_slice), (void *)(message + proc_info->kafka_message_key_pos));
+//	u64 size_to_read = key_slice.len > MAX_KEY_SIZE ? MAX_KEY_SIZE : key_slice.len;
+//	size_to_read &= 0xFF;
+//	// Then read the actual key
+//	if (size_to_read > 0 && key_slice.array != NULL) {
+//		bpf_probe_read_user(kafka_request->key, size_to_read, key_slice.array);
+//	}
+//
+//	// Set app_id and instance_id from proc_info
+//	// proc_info is already obtained above
+//	if (proc_info) {
+//		copy_byte_arrays(proc_info->app_id, kafka_request->sc.app_id, APM_APP_ID_SIZE);
+//		copy_byte_arrays(proc_info->instance_id, kafka_request->sc.instance_id, APM_INSTANCE_ID_SIZE);
+//	}
+
+	bpf_map_update_elem(&kafka_consumer_events, &goroutine, kafka_request, 0);
+	// We are start tracking the consumer span in the return probe,
+	// hence we can't read Go's context directly from the registers as we usually do.
+	// Using the goroutine address as a key to the map that contains the context.
+//	void *context_data_ptr = bpf_map_lookup_elem(&goroutine_to_go_context, &goroutine);
+//	if (context_data_ptr != NULL) {
+//		bpf_probe_read_kernel(&context_data_ptr, sizeof(context_data_ptr), context_data_ptr);
+//		start_tracking_span(context_data_ptr, &kafka_request->sc);
+//		bpf_map_delete_elem(&goroutine_to_go_context, &goroutine);
+//	}
+
+	return 0;
+}

+ 2 - 0
ebpftracer/ebpf/utrace/go/mq/kafka/producer.probe.bpf.c

@@ -460,6 +460,7 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
 	__u32 tgid = pid_tgid >> 32;
 
 	e->protocol = PROTOCOL_KAFKA;
+	e->method = METHOD_PRODUCE;
 	e->trace_end = 0;
 	e->trace_start = 0;
 	e->start_at = kafka_request->start_time;
@@ -527,6 +528,7 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
 		cw_copy_byte_arrays(kafka_request->msgs[i].sc.assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
 		cw_copy_byte_arrays(kafka_request->msgs[i].sc.span_id, e->span_id, APM_SPAN_ID_SIZE);
 		// 发送事件
+		SET_TRACE_METHOD(e);
 		long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 		if (error == 0) {
 			cw_add_event_count(e->trace_id);

+ 1 - 0
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -298,6 +298,7 @@ done:
     e->end_at = bpf_ktime_get_ns();
     e->start_at = grpc_span->start_time;
     e->duration = e->end_at - e->start_at;
+	SET_TRACE_METHOD(e);
     // cw_bpf_debug("send_event trace_id is444 %llu\n", e->trace_id);
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 	if (error ==0){

+ 3 - 9
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -246,10 +246,8 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
 
     struct apm_trace_info_t trace_info = cw_save_trace_info(id,pid, k.fd);
     
-    e->trace_start = 1;
-    e->trace_end = 0;
+    SET_TRACE_START(e, PROTOCOL_GRPC);
     e->trace_type = 1;
-    e->protocol = PROTOCOL_TRACE;
     e->trace_id = trace_info.trace_id;
 
 
@@ -394,13 +392,11 @@ int uprobe_server_handleStream_Returns(struct pt_regs *ctx) {
 	// cw_bpf_debug("req->ns:%llu",req->ns);
 	e->end_at = bpf_ktime_get_ns();
     e->duration = e->end_at - e->start_at;
-    e->protocol = PROTOCOL_TRACE;
     e->status = http_status;
     e->pid = k.pid;
     e->fd = k.fd;
     // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-    e->trace_start = 0;
-    e->trace_end = 1;
+    SET_TRACE_END(e, PROTOCOL_GRPC);
     e->trace_type = 1;
     e->trace_id = trace_id;
     e->payload_size = 0;
@@ -573,13 +569,11 @@ lookup:
 	// cw_bpf_debug("req->ns:%llu",req->ns);
 	e->end_at = bpf_ktime_get_ns();
     e->duration = e->end_at - e->start_at;
-    e->protocol = PROTOCOL_TRACE;
     e->status = http_status;
     e->pid = k.pid;
     e->fd = k.fd;
     // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-    e->trace_start = 0;
-    e->trace_end = 1;
+    SET_TRACE_END(e, PROTOCOL_GRPC);
     e->trace_type = 1;
     e->trace_id = trace_id;
     e->payload_size = 0;

+ 13 - 13
ebpftracer/ebpf/utrace/go/net/server.probe.bpf.c

@@ -67,19 +67,19 @@ struct {
 	__uint(max_entries, MAX_CONCURRENT);
 } header_range SEC(".maps");
 
-struct {
-	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-	__uint(key_size, sizeof(u32));
-	__uint(value_size, sizeof(struct span_context));
-	__uint(max_entries, 1);
-} parent_span_context_storage_map SEC(".maps");
-
-struct {
-	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-	__uint(key_size, sizeof(u32));
-	__uint(value_size, sizeof(struct apm_span_context));
-	__uint(max_entries, 1);
-} cw_parent_span_context_storage_map SEC(".maps");
+//struct {
+//	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+//	__uint(key_size, sizeof(u32));
+//	__uint(value_size, sizeof(struct span_context));
+//	__uint(max_entries, 1);
+//} parent_span_context_storage_map SEC(".maps");
+
+//struct {
+//	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+//	__uint(key_size, sizeof(u32));
+//	__uint(value_size, sizeof(struct apm_span_context));
+//	__uint(max_entries, 1);
+//} cw_parent_span_context_storage_map SEC(".maps");
 
 struct {
 	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);

+ 1 - 0
ebpftracer/l7/l7.go

@@ -194,6 +194,7 @@ func (s Status) Error() bool {
 
 type RequestData struct {
 	Protocol          Protocol
+	Pid               uint32
 	Status            Status
 	Duration          time.Duration
 	Method            Method

+ 25 - 4
ebpftracer/tls.go

@@ -44,6 +44,7 @@ const (
 	goGocqlSessionExecuteQuery     = "github.com/gocql/gocql.(*Session).executeQuery"
 	goGocqlSessionExecuteQueryV2   = "github.com/apache/cassandra-gocql-driver/v2.(*Session).executeQuery"
 	goKafkaWriterWriteMessages     = "github.com/segmentio/kafka-go.(*Writer).WriteMessages"
+	goKafkaReaderFetchMessage      = "github.com/segmentio/kafka-go.(*Reader).FetchMessage"
 )
 
 var (
@@ -448,14 +449,22 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				&info.IoWriterBufPtrPos:   tracer.NewID("std", "bufio", "Writer", "buf"),
 				&info.IoWriterNPos:        tracer.NewID("std", "bufio", "Writer", "n"),
 				// Kafka Message fields
-				&info.KafkaMessageKeyPos:     tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
-				&info.KafkaMessageTopicPos:   tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"),
-				&info.KafkaMessageHeadersPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
-				&info.KafkaMessageTimePos:    tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
+				&info.KafkaMessageKeyPos:       tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
+				&info.KafkaMessageTopicPos:     tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"),
+				&info.KafkaMessageHeadersPos:   tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
+				&info.KafkaMessageTimePos:      tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
+				&info.KafkaMessagePartitionPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Partition"),
+				&info.KafkaMessageOffsetPos:    tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Offset"),
+				&info.KafkaMessageValuePos:     tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Value"),
 				// Kafka Writer fields
 				&info.KafkaWriterTopicPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"),
 				// Try both Brokers and Addr fields - Addr is a string, Brokers is []string
 				&info.KafkaWriterAddrPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Addr"),
+				// Kafka Reader fields
+				// Note: Reader.config is unexported, try both "config" and "Config"
+				&info.KafkaReaderConfigPos:        tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Reader", "config"),
+				&info.KafkaReaderConfigGroupIDPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "GroupID"),
+				&info.KafkaReaderConfigTopicsPos:  tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "ReaderConfig", "topics"),
 				// net.TCPAddr offsets (standard library)
 				&info.TcpAddrIPOffset:   tracer.NewID("std", "net", "TCPAddr", "IP"),
 				&info.TcpAddrPortOffset: tracer.NewID("std", "net", "TCPAddr", "Port"),
@@ -562,6 +571,9 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		case goKafkaWriterWriteMessages:
 			matchedSymbols++
 			klog.Infof("[AttachGoTlsUprobes] STEP 19.13: Matched kafka Writer.WriteMessages symbol (index=%d)", i)
+		case goKafkaReaderFetchMessage:
+			matchedSymbols++
+			klog.Infof("[AttachGoTlsUprobes] STEP 19.14: Matched kafka Reader.FetchMessage symbol (index=%d)", i)
 		case goReadContinuedLineSlice:
 		default:
 			continue
@@ -758,6 +770,15 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				klog.Infoln("uprobe_WriteMessages ok")
 				links = append(links, retLinks...)
 			}
+		case goKafkaReaderFetchMessage:
+			retLinks, err := attachUprobeWithReturns(exe, s.Name, "uprobe_FetchMessage", "uprobe_FetchMessage_Returns", t.uprobes, address, s, textSection, textSectionLen, textSectionData, ef.Machine, "failed to attach uprobe_FetchMessage uprobe", true)
+			if err != nil {
+				return nil, err
+			}
+			if retLinks != nil {
+				klog.Infoln("uprobe_FetchMessage ok")
+				links = append(links, retLinks...)
+			}
 		}
 	}
 	klog.Infof("[AttachGoTlsUprobes] Symbol processing completed, matched symbols=%d, total links=%d", matchedSymbols, len(links))

+ 2 - 1
ebpftracer/tracer.go

@@ -774,6 +774,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 			payload := reader.Bytes()
 			req := &l7.RequestData{
 				Protocol:       l7.Protocol(v.Protocol),
+				Pid:            v.Pid,
 				Status:         l7.Status(v.Status),
 				Duration:       time.Duration(v.Duration),
 				Method:         l7.Method(v.Method),
@@ -799,7 +800,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
 				klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
 			}
-			if v.TraceEnd == 1 {
+			if v.TraceEnd == TRACE_STATUS {
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
 				req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
 				req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])

+ 22 - 3
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -423,6 +423,7 @@ func buildLevelFromEvent(sdl *RootDataT) {
 func initRootDataFromEvent() RootDataT {
 	hostID := utils.GetHostID()
 	accountID := utils.GetAccountID()
+	sip := utils.GetHostIP()
 	sysTag := utils.GetSysTag()
 	systemUUID := utils.GetSystemUUID()
 	data := RootDataT{
@@ -455,7 +456,7 @@ func initRootDataFromEvent() RootDataT {
 		Sampling:        0,
 		ServiceName:     "",
 		ServiceType:     APP_SERVICE_TYPE,
-		Sip:             "",
+		Sip:             sip,
 		Sn:              "",
 		SpanIdFrom:      "",
 		Sport:           0,
@@ -471,7 +472,7 @@ func initRootDataFromEvent() RootDataT {
 		DestinationAddr: "",
 		Sys:             sysTag,
 		SystemUUID:      systemUUID,
-		UserAgent:       "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
+		UserAgent:       "",
 	}
 	return data
 }
@@ -743,7 +744,7 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 		case "net.peer.name":
 			// TODO 修改 ClientIp sip获取方式
 			traceRoot.ClientIp = attr.Value.AsString()
-			traceRoot.Sip = attr.Value.AsString()
+			//traceRoot.Sip = attr.Value.AsString()
 			traceRoot.Sn = attr.Value.AsString()
 		case "net.peer.port":
 			traceRoot.Sport = attr.Value.AsInt64()
@@ -782,6 +783,12 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 		case "rpc.oper_type":
 			traceRoot.OperType = "PROVIDER"
 			mNode.OperType = "PROVIDER"
+		// map tag
+		case "service_type":
+			mNode.ServiceType = attr.Value.AsString()
+		// map tag
+		case "oper_type":
+			mNode.OperType = attr.Value.AsString()
 		case "server.app_id":
 			traceRoot.AppId = attr.Value.AsInt64()
 		case "server.agent_id":
@@ -798,6 +805,18 @@ func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
 			traceRoot.ContainerID = attr.Value.AsString()
 		case "server.user_agent":
 			traceRoot.UserAgent = attr.Value.AsString()
+		case "method_name":
+			mNode.MethodName = attr.Value.AsString()
+		case "mq.ip":
+			mNode.Ip = attr.Value.AsString()
+		case "mq.port":
+			mNode.Port = attr.Value.AsInt64()
+		// root tag
+		case "server.service_type":
+			traceRoot.ServiceType = attr.Value.AsString()
+		// root tag
+		case "server.oper_type":
+			traceRoot.OperType = attr.Value.AsString()
 		}
 	}
 	traceRoot.Maps = append(traceRoot.Maps, mNode)

+ 26 - 0
tracing/apm_tracing.go

@@ -151,6 +151,32 @@ func (t *Trace) GrpcServerTraceStartEvent(sn string, sport uint16, r *l7.Request
 	t.startReady()
 }
 
+func (t *Trace) MQConsumerTraceStartEvent(sn string, sport uint16, r *l7.RequestData, appInfo AppInfo, container_id string) {
+	t.span.SetAttributes(attribute.String("rpc.uri", string(r.Payload)))
+	r.Protocol = l7.ProtocolKafka
+	t.commonAttrs = []attribute.KeyValue{
+		semconv.NetPeerName(sn),
+		semconv.NetPeerPort(int(sport)),
+		// buildAppMapFromEvent
+		attribute.Int("server.code_type", appInfo.CodeType.Int()),
+		attribute.String("server.app_name", appInfo.AppName),
+		attribute.String("server.service_name", r.Protocol.ServiceNameString()),
+		attribute.String("service_type", "MQ"),
+		attribute.String("oper_type", "CONSUMER"),
+		attribute.String("server.service_type", "TASK"),
+		attribute.Int64("server.app_id", appInfo.AppIdHash.IntVal),
+		attribute.Int64("server.agent_id", appInfo.AgentId),
+		attribute.Int64("server.instance_id", appInfo.InstanceIdHash.IntVal),
+		attribute.String("server.container_id", container_id),
+		attribute.String("http.uri", "/"+r.MQTopic),
+		attribute.Int("server.pid", int(r.Pid)),
+		attribute.String("mq.ip", sn),
+		attribute.Int("mq.port", int(sport)),
+	}
+	t.span.SetAttributes(t.commonAttrs...)
+	t.startReady()
+}
+
 func (t *Trace) TraceStartEvent(method, path, sn, ua string, sport uint16, status l7.Status, addr netaddr.IPPort, pid uint32, appInfo AppInfo, container_id string) {
 	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s:%d%s", sn, sport, path)),
 		semconv.HTTPMethod(method),

+ 4 - 0
utils/modelse/app_info.go

@@ -22,6 +22,10 @@ const (
 	APP_FUSE_ERROR
 )
 
+const (
+	TRACE_STATUS = 200
+)
+
 func (s APP_TYPE) Error() APP_TYPE {
 	switch s {
 	case APP_UNINSTALL:

+ 12 - 6
utils/modelse/bpf_struct.go

@@ -218,12 +218,18 @@ type EbpfProcInfo struct {
 	IoWriterBufPtrPos uint64
 	IoWriterNPos      uint64
 	// Kafka
-	KafkaMessageKeyPos     uint64
-	KafkaMessageTopicPos   uint64
-	KafkaMessageHeadersPos uint64
-	KafkaMessageTimePos    uint64
-	KafkaWriterTopicPos    uint64
-	KafkaWriterAddrPos     uint64 // Writer.Brokers field offset
+	KafkaMessageKeyPos          uint64
+	KafkaMessageTopicPos        uint64
+	KafkaMessageHeadersPos      uint64
+	KafkaMessageTimePos         uint64
+	KafkaMessagePartitionPos    uint64 // Message.Partition field offset
+	KafkaMessageOffsetPos       uint64 // Message.Offset field offset
+	KafkaMessageValuePos        uint64 // Message.Value field offset
+	KafkaWriterTopicPos         uint64
+	KafkaWriterAddrPos          uint64 // Writer.Brokers field offset
+	KafkaReaderConfigPos        uint64 // Reader.Config field offset
+	KafkaReaderConfigGroupIDPos uint64 // Reader.Config.GroupID field offset
+	KafkaReaderConfigTopicsPos  uint64 // Reader.Config.Topics field offset
 	// net.TCPAddr offsets
 	TcpAddrIPOffset   uint64 // net.TCPAddr.IP field offset
 	TcpAddrPortOffset uint64 // net.TCPAddr.Port field offset