Browse Source

Feature #TASK_QT-31498 kafka-producer

Carl 6 tháng trước cách đây
mục cha
commit
242e9b1db8

+ 8 - 0
containers/container_apm.go

@@ -477,6 +477,14 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	case l7.ProtocolKafka:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
+			}
+			apmTrace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
+				c.SendEvent(apmTrace, r.TraceId)
+			}
 		}
 	/**
 	 * RabbitMQ / NATS

+ 10 - 0
ebpftracer/ebpf/include/apm_trace.h

@@ -36,6 +36,9 @@
 //#define CW_STREAM_HEADER_LEN 135 // CW_HEADER_KEY_LENGTH + 2 + CW_HEADER_VAL_LENGTH + 2 + 1
 #define CW_STREAM_HEADER_LEN (CW_HEADER_KEY_LENGTH + 2 + CW_HEADER_VAL_LENGTH + 2 + 1)
 
+#define MAX_MQ_TOPIC_SIZE 256  // Max MQ topic size (e.g., Kafka topic)
+#define MAX_MQ_KEY_SIZE 256     // Max MQ key size (e.g., Kafka message key)
+
 /***********************************************************
  * Trace struct
  ***********************************************************/
@@ -80,6 +83,13 @@ struct apm_span_context {
 	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
 	unsigned char span_id[APM_SPAN_ID_SIZE];
 };
+
+// MQ 信息结构体(用于 Kafka、RabbitMQ 等消息队列)
+struct mq_info_t {
+	char topic[MAX_MQ_TOPIC_SIZE];
+	char key[MAX_MQ_KEY_SIZE];
+};
+
 /*
 	 * Whether traceID is zero ?
 	 * For the client to actively send request, set traceID to zero.

+ 10 - 10
ebpftracer/ebpf/include/socket_trace.h

@@ -303,16 +303,16 @@ struct go_interface {
 	void *ptr;
 };
 
-struct go_slice {
-	void *ptr;
-	unsigned long long len;
-	unsigned long long cap;
-};
-
-struct go_string {
-	const char *ptr;
-	unsigned long long len;
-};
+//struct go_slice {
+//	void *ptr;
+//	unsigned long long len;
+//	unsigned long long cap;
+//};
+
+//struct go_string {
+//	const char *ptr;
+//	unsigned long long len;
+//};
 
 struct tls_conn {
 	int fd;

+ 10 - 0
ebpftracer/ebpf/include/socket_trace_common.h

@@ -214,6 +214,16 @@ struct ebpf_proc_info {
 	// io writer
 	__u64 io_writer_buf_ptr_pos;
 	__u64 io_writer_n_pos;
+	//Kafka
+	__u64 kafka_message_key_pos;
+	__u64 kafka_message_topic_pos;
+	__u64 kafka_message_headers_pos;
+	__u64 kafka_message_time_pos;
+	__u64 kafka_writer_topic_pos;
+	__u64 kafka_writer_addr_pos;  // Writer.Brokers field offset
+	// net.TCPAddr offsets
+	__u64 tcp_addr_ip_offset;    // net.TCPAddr.IP field offset
+	__u64 tcp_addr_port_offset;  // net.TCPAddr.Port field offset
 } __attribute__((packed));
 
 enum {

+ 5 - 1
ebpftracer/ebpf/l7/l7.c

@@ -14,6 +14,8 @@
 #define PROTOCOL_DUBBO2    12
 #define PROTOCOL_DNS       13
 #define PROTOCOL_DM        14
+#define PROTOCOL_MARIADB   15
+#define PROTOCOL_GRPC      16
 
 
 
@@ -97,11 +99,13 @@ struct l7_event {
 	unsigned char app_id_from[APM_APP_ID_SIZE];
 	unsigned char span_id_from[APM_SPAN_ID_SIZE];
     unsigned char type_from[APM_TYPE_FROM_SIZE];
-    unsigned char rpc_target[64];
+    unsigned char target_addr[64];
 
     // 错误消息字段
     unsigned char error_message[ERROR_MSG_PAYLOAD_SIZE];
 //    __u32 test_id;
+    // MQ 相关信息(用于 Kafka、RabbitMQ 等消息队列)
+    struct mq_info_t mq;
     char payload[MAX_PAYLOAD_SIZE];
 } ;
 

+ 1 - 4
ebpftracer/ebpf/utrace/go/db/gocql.probe.bpf.c

@@ -429,10 +429,7 @@ int uprobe_Session_executeQuery_cassandra_Returns(struct pt_regs *ctx) {
     if (trace_info) {
         e->trace_id = trace_info->trace_id;
     }
-    
-    if (e->trace_id == 0) {
-        e->trace_id = get_apm_trace_id(tgid, pid_tgid);
-    }
+
     // 发送事件
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
     if (error == 0) {

+ 76 - 118
ebpftracer/ebpf/utrace/go/include/alloc.h

@@ -44,6 +44,33 @@ struct
 //    __uint(pinning, LIBBPF_PIN_BY_NAME);
 } proc_alloc_map SEC(".maps");
 
+static __always_inline u64 get_area_start_ot(u64 start_addr,u64 end_addr,u64 total_cpus) {
+	s64 partition_size = (end_addr - start_addr) / total_cpus;  // Fixed: was (start_addr - end_addr)
+	u32 current_cpu = bpf_get_smp_processor_id();
+	s32 start_index = 0;
+	u64 *start = (u64 *)bpf_map_lookup_elem(&alloc_map, &start_index);
+	if (start == NULL || *start == 0) {
+		u64 current_start_addr = start_addr + (partition_size * current_cpu);
+		bpf_map_update_elem(&alloc_map, &start_index, &current_start_addr, BPF_ANY);
+		return current_start_addr;
+	} else {
+		return *start;
+	}
+}
+
+static __always_inline u64 get_area_end_ot(u64 start,u64 start_addr,u64 end_addr,u64 total_cpus) {
+	s64 partition_size = (end_addr - start_addr) / total_cpus;
+	s32 end_index = 1;
+	u64 *end = (u64 *)bpf_map_lookup_elem(&alloc_map, &end_index);
+	if (end == NULL || *end == 0) {
+		u64 current_end_addr = start + partition_size;
+		bpf_map_update_elem(&alloc_map, &end_index, &current_end_addr, BPF_ANY);
+		return current_end_addr;
+	} else {
+		return *end;
+	}
+}
+
 static __always_inline u64 get_area_start(u64 start_addr,u64 end_addr)
 {
 	u32 k0 = 0;
@@ -134,136 +161,67 @@ static __always_inline s32 bound_number(s32 num, s32 min, s32 max)
     return num;
 }
 
-static __always_inline void *write_target_data(void *data, s32 size)
-{
-    if (!data || data == NULL)
-    {
-        return NULL;
-    }
-	u64 start_from_proc;
-	u64 end_from_proc;
+static __always_inline void *write_target_data(void *data, s32 size) {
 	__u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
 	struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
 	if (!info) {
+		return 0;
+	}
+	u64 start_addr = info->start_addr;
+	u64 end_addr = info->end_addr;
+
+	if (!data || data == NULL) {
 		return NULL;
 	}
-	start_from_proc = info->start_addr;
-	end_from_proc = info->end_addr;
 
-	u64 start = get_area_start(start_from_proc, end_from_proc);
-    u64 end = get_area_end(start,start_from_proc,end_from_proc);
-    if (end - start < size)
-    {
-        cw_bpf_debug("reached end of CPU memory block, going to the start again");
-        s32 start_index = 0;
-        bpf_map_delete_elem(&alloc_map, &start_index);
-        start = get_area_start(start_from_proc, end_from_proc);
-    }
+	// Use get_area_start/get_area_end which support multi-process (use proc_alloc_map with process ID in key)
+	u64 start = get_area_start(start_addr, end_addr);
+	u64 end = get_area_end(start, start_addr, end_addr);
+	if (end - start < size) {
+		bpf_printk("reached end of CPU memory block, going to the start again");
+		s32 start_index = 0;
+		u64 alloc_map_index = ((u64)(bpf_get_current_pid_tgid() >> 32) << 32) | start_index;
+		bpf_map_delete_elem(&proc_alloc_map, &alloc_map_index);
+		start = get_area_start(start_addr, end_addr);
+	}
 
-    void *target = (void *)start;
-    size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
-    u64 page_offset = (u64)target & 0xFFF;
-    u64 dist_to_next_page = 4096 - page_offset;
-    if (dist_to_next_page < size)
-    {
-        target += dist_to_next_page;
-    }
-    u64 target_u = (u64)target;
-    if (target_u > end_from_proc || target_u < start_from_proc) {
-	    cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
-        return NULL;
-    }
+	void *target = (void *)start;
+	size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+	u64 page_offset = (u64)target & 0xFFF;
+	u64 dist_to_next_page = 4096 - page_offset;
+	if (dist_to_next_page < size) {
+		target += dist_to_next_page;
+	}
+	u64 target_u = (u64)target;
+	if (target_u > end_addr || target_u < start_addr) {
+		bpf_printk("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
+		return NULL;
+	}
 
-    long success = bpf_probe_write_user(target, data, size);
-    if (success == 0)
-    {
-        s32 start_index = 0;
-        // Update the start position of this chunk, taking into account possible adjustments
-        // we made to be page aligned
-        u64 updated_start = target_u + size;
+	long success = bpf_probe_write_user(target, data, size);
+	if (success == 0) {
+		s32 start_index = 0;
+		u64 alloc_map_index = ((u64)(bpf_get_current_pid_tgid() >> 32) << 32) | start_index;
+		// Update the start position of this chunk, taking into account possible adjustments
+		// we made to be page aligned
+		u64 updated_start = target_u + size;
 
-        // align updated_start to 8 bytes
-        if (updated_start % 8 != 0) {
-            updated_start += 8 - (updated_start % 8);
-        }
+		// align updated_start to 8 bytes
+		if (updated_start % 8 != 0) {
+			updated_start += 8 - (updated_start % 8);
+		}
 
-        bpf_map_update_elem(&alloc_map, &start_index, &updated_start, BPF_ANY);
-        return target;
-    }
-    else
-    {
-	    cw_bpf_debug("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
-        return NULL;
-    }
+		bpf_map_update_elem(&proc_alloc_map, &alloc_map_index, &updated_start, BPF_ANY);
+		return target;
+	} else {
+		bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d",
+		           success,
+		           target,
+		           size);
+		return NULL;
+	}
 }
 
-//static __always_inline void *cw_write_target_data(void *data, s32 size, struct ebpf_proc_info* info)
-//{
-//    if (!data || data == NULL)
-//    {
-//	    bpf_printk("163 err");
-//        return NULL;
-//    }
-//	u64 start_from_proc;
-//	u64 end_from_proc;
-////	__u32 tgid = (__u32)(bpf_get_current_pid_tgid() >> 32);
-////	struct ebpf_proc_info *info = bpf_map_lookup_elem(&proc_info_map, &tgid);
-////	if (!info) {
-////		return NULL;
-////	}
-//	start_from_proc = info->start_addr;
-//	end_from_proc = info->end_addr;
-//
-//	u64 start = get_area_start(start_from_proc, end_from_proc);
-//    u64 end = get_area_end(start,start_from_proc,end_from_proc);
-//    if (end - start < size)
-//    {
-//        cw_bpf_debug("reached end of CPU memory block, going to the start again");
-//        s32 start_index = 0;
-//        bpf_map_delete_elem(&alloc_map, &start_index);
-//        start = get_area_start(start_from_proc, end_from_proc);
-//    }
-//
-//    void *target = (void *)start;
-//    size = bound_number(size, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
-//    u64 page_offset = (u64)target & 0xFFF;
-//    u64 dist_to_next_page = 4096 - page_offset;
-//    if (dist_to_next_page < size)
-//    {
-//        target += dist_to_next_page;
-//    }
-//    u64 target_u = (u64)target;
-//    if (target_u > end_from_proc || target_u < start_from_proc) {
-//	    cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
-////	    bpf_printk("no target_u:%llu start:%llu end:%llu", target_u, start_from_proc, end_from_proc);
-//	    bpf_printk("197 err");
-//        return NULL;
-//    }
-//
-//    long success = bpf_probe_write_user(target, data, size);
-//    if (success == 0)
-//    {
-//        s32 start_index = 0;
-//        // Update the start position of this chunk, taking into account possible adjustments
-//        // we made to be page aligned
-//        u64 updated_start = target_u + size;
-//
-//        // align updated_start to 8 bytes
-//        if (updated_start % 8 != 0) {
-//            updated_start += 8 - (updated_start % 8);
-//        }
-//
-//        bpf_map_update_elem(&alloc_map, &start_index, &updated_start, BPF_ANY);
-//        return target;
-//    }
-//    else
-//    {
-//        bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
-//        return NULL;
-//    }
-//}
-
-
 static __always_inline void *cw_write_target_data(void *data, s32 size, struct ebpf_proc_info* info)
 {
 	if (!data || data == NULL)

+ 13 - 2
ebpftracer/ebpf/utrace/go/include/go_types.h

@@ -23,12 +23,23 @@
  Keep a power of 2 to help with masks */
 #define MAX_SLICE_ARRAY_SIZE 1024
 
-typedef struct go_string_ot
-{
+typedef struct go_string {
+	char *str;
+	s64 len;
+} go_string;
+
+typedef struct go_string_ot {
 	char *str;
 	s64 len;
 } go_string_ot;
 
+typedef struct go_slice
+{
+	void *array;
+	s64 len;
+	s64 cap;
+} go_slice;
+
 typedef struct go_slice_ot
 {
 	void *array;

+ 4 - 2
ebpftracer/ebpf/utrace/go/include/span_context.h

@@ -22,8 +22,10 @@
 #define W3C_VAL_LENGTH 55
 
 struct span_context {
-	unsigned char TraceID[TRACE_ID_SIZE];
-	unsigned char SpanID[SPAN_ID_SIZE];
+	u8 TraceID[TRACE_ID_SIZE];
+	u8 SpanID[SPAN_ID_SIZE];
+	u8 TraceFlags;
+	u8 padding[7];
 };
 
 

+ 349 - 58
ebpftracer/ebpf/utrace/go/mq/kafka/producer.probe.bpf.c

@@ -8,7 +8,9 @@
 #include "span_context.h"
 #include "go_context.h"
 #include "go_types.h"
+#include "go_net.h"
 #include "uprobe.h"
+#include "apm_trace.h"
 //#include "span_output.h"
 //#include "trace/start_span.h"
 
@@ -25,9 +27,12 @@
 #define MAX_TOPIC_SIZE 256
 // No constraint on the key size, but we must have a limit for the verifier
 #define MAX_KEY_SIZE 256
+#define MAX_BROKER_ADDR_SIZE 21  // Max broker address length (e.g., "192.168.1.1:9092")
+#define MAX_HOSTNAME_SIZE 256    // Max hostname/broker address size
+#define MAX_PAYLOAD_SIZE 1024    // Max payload size (must match l7.c definition)
 
 struct message_attributes_t {
-	struct span_context sc;
+	struct apm_span_context sc;
 	char topic[MAX_TOPIC_SIZE];
 	char key[MAX_KEY_SIZE];
 };
@@ -36,11 +41,27 @@ struct kafka_request_t {
 	// common attributes to all the produced messages
 	u64 start_time;
 	u64 end_time;
-	struct span_context psc;
+	struct apm_span_context apm_sc;
 	// attributes per message
 	struct message_attributes_t msgs[MAX_BATCH_SIZE];
 	char global_topic[MAX_TOPIC_SIZE];
 	u64 valid_messages;
+	char host[MAX_HOSTNAME_SIZE];// First broker address (e.g., "192.168.1.1:9092")
+}__attribute__((packed));
+
+struct kafka_request_t2 {
+	// common attributes to all the produced messages
+	u64 start_time;
+	u64 end_time;
+	struct apm_span_context apm_sc;
+	char host[MAX_HOSTNAME_SIZE];
+
+	// attributes per message
+//	struct message_attributes_t msgs[MAX_BATCH_SIZE];
+//	char global_topic[MAX_TOPIC_SIZE];
+//	char broker_addr[MAX_BROKER_ADDR_SIZE];  // First broker address (e.g., "192.168.1.1:9092")
+//	u64 valid_messages;
+
 }__attribute__((packed));
 
 struct {
@@ -50,6 +71,11 @@ struct {
 	__uint(max_entries, MAX_CONCURRENT);
 } kafka_events SEC(".maps");
 
+struct kafka_header_t {
+	struct go_string_ot key;
+	struct go_slice_ot value;
+};
+
 struct {
 	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
 	__uint(key_size, sizeof(u32));
@@ -57,30 +83,84 @@ struct {
 	__uint(max_entries, 2);
 } kafka_request_storage_map SEC(".maps");
 
-// https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48
-struct kafka_header_t {
-	struct go_string_ot key;
-	struct go_slice_ot value;
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_request_t2));
+	__uint(max_entries, 2);
+} kafka_request_storage_map2 SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_header_t));
+	__uint(max_entries, 1);
+} kafka_header_storage_map SEC(".maps");
+
+// Storage for temporary strings in build_contxet_header to avoid stack overflow
+struct kafka_temp_strings_t {
+	char key[CW_HEADER_KEY_LENGTH];
+	char val[CW_HEADER_VAL_LENGTH];
+};
+
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_temp_strings_t));
+	__uint(max_entries, 1);
+} kafka_temp_strings_storage_map SEC(".maps");
+
+// Storage for temporary Go types in broker address reading to avoid stack overflow
+struct kafka_broker_temp_t {
+	struct go_slice_ot brokers_slice;
+	struct go_string_ot broker_str;
+	struct go_iface addr_iface;  // For net.Addr interface
+	struct go_slice_ot ip_slice;  // For net.TCPAddr.IP
+	u8 ip_bytes[16];              // For IP bytes
 };
 
-// Injected in init
-volatile const u64 message_key_pos;
-volatile const u64 message_topic_pos;
-volatile const u64 message_headers_pos;
-volatile const u64 message_time_pos;
-volatile const u64 writer_topic_pos;
+struct {
+	__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+	__uint(key_size, sizeof(u32));
+	__uint(value_size, sizeof(struct kafka_broker_temp_t));
+	__uint(max_entries, 1);
+} kafka_broker_temp_storage_map SEC(".maps");
+
+// https://github.com/segmentio/kafka-go/blob/main/protocol/record.go#L48
+
+struct tcp_addr {
+	struct go_slice_ot IP;  // offset 0
+	int Port;            // offset 24
+	// 忽略 Zone
+};
 
 #ifndef NO_HEADER_PROPAGATION
 
-static __always_inline int build_contxet_header(struct kafka_header_t *header, struct span_context *span_ctx) {
+static __always_inline int build_contxet_header(struct kafka_header_t *header, struct apm_span_context *span_ctx) {
 	if (header == NULL || span_ctx == NULL) {
 		bpf_printk("build_contxt_header: Invalid arguments");
 		return -1;
 	}
 
-	// Prepare the key string for the user
-	char key[W3C_KEY_LENGTH] = "traceparent";
-	void *ptr = write_target_data(key, W3C_KEY_LENGTH);
+	// Get temporary string storage from per-cpu map to avoid stack overflow
+	u32 temp_strings_id = 0;
+	struct kafka_temp_strings_t *temp_strings = bpf_map_lookup_elem(&kafka_temp_strings_storage_map, &temp_strings_id);
+	if (temp_strings == NULL) {
+		bpf_printk("build_contxt_header: Failed to get temp strings storage");
+		return -1;
+	}
+
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (proc_info == NULL) {
+		bpf_printk("uprobe/WriteMessages: proc_info is NULL");
+		return 0;
+	}
+
+	// Prepare the key string for the user - use per-cpu storage
+	__builtin_memcpy(temp_strings->key, CW_HEADER_KEY_VAL, CW_HEADER_KEY_LENGTH);
+	void *ptr = cw_write_target_data(temp_strings->key, CW_HEADER_KEY_LENGTH, proc_info);
 	if (ptr == NULL) {
 		bpf_printk("build_contxt_header: Failed to write key to user");
 		return -1;
@@ -88,22 +168,39 @@ static __always_inline int build_contxet_header(struct kafka_header_t *header, s
 
 	// build the go string of the key
 	header->key.str = ptr;
-	header->key.len = W3C_KEY_LENGTH;
+	header->key.len = CW_HEADER_KEY_LENGTH;
 
-	// Prepare the value string for the user
-	char val[W3C_VAL_LENGTH];
-	span_context_to_w3c_string(span_ctx, val);
+	// Convert span_context to apm_span_context for CW format
+//	struct apm_span_context apm_sc = {0};
+	// Copy trace_id and span_id from span_context
+//	copy_byte_arrays(span_ctx->TraceID, apm_sc.trace_id, TRACE_ID_SIZE);
+//	copy_byte_arrays(span_ctx->SpanID, apm_sc.span_id, SPAN_ID_SIZE);
 
-	__u64 pid_tgid = bpf_get_current_pid_tgid();
-	__u32 tgid = pid_tgid >> 32;
-	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
-	if(!proc_info)
-	{
-		cw_bpf_debug("go_update_header: proc_info is NULL");
-		return 0;
-	}
+	// Set type_from and sample
+//	apm_sc.type_from[0] = '0';
+//	apm_sc.sample[0] = '0';
+
+	// Get host_id from trace_conf_map
+//	u32 k0 = 0;
+//	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+//	if (trace_conf) {
+//		copy_byte_arrays(trace_conf->host_id, apm_sc.host_id, APM_HOST_ID_SIZE);
+//	}
+
+	// Get app_id and instance_id from proc_info
+//	copy_byte_arrays(proc_info->app_id, apm_sc.app_id, APM_APP_ID_SIZE);
+
+	// Set assumed_app_id (for Kafka, we can use app_id as assumed_app_id)
+//	copy_byte_arrays(proc_info->app_id, apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+
+	// Prepare the value string for the user - use per-cpu storage
+//	generate_random_bytes(apm_sc.trace_id, APM_TRACE_ID_SIZE);
+//	generate_random_bytes(apm_sc.span_id, APM_SPAN_ID_SIZE);
+
+	span_context_to_cw_string(span_ctx, temp_strings->val);
 
-	ptr = write_target_data(val, sizeof(val));
+	bpf_printk("%s",temp_strings->val);
+	ptr = cw_write_target_data(temp_strings->val, CW_HEADER_VAL_LENGTH, proc_info);
 	if (ptr == NULL) {
 		bpf_printk("build_contxt_header: Failed to write value to user");
 		return -1;
@@ -111,28 +208,28 @@ static __always_inline int build_contxet_header(struct kafka_header_t *header, s
 
 	// build the go slice of the value
 	header->value.array = ptr;
-	header->value.len = W3C_VAL_LENGTH;
-	header->value.cap = W3C_VAL_LENGTH;
+	header->value.len = CW_HEADER_VAL_LENGTH;
+	header->value.cap = CW_HEADER_VAL_LENGTH;
 	bpf_printk("build_contxt_header success");
 	return 0;
 }
 
-static __always_inline int inject_kafka_header(void *message, struct kafka_header_t *header) {
-	append_item_to_slice(header, sizeof(*header), (void *) (message + message_headers_pos));
+static __always_inline int inject_kafka_header(void *message, struct kafka_header_t *header, u64 msg_headers_pos) {
+	append_item_to_slice(header, sizeof(*header), (void *) (message + msg_headers_pos));
 	return 0;
 }
 
 #endif
 
-static __always_inline long collect_kafka_attributes(void *message, struct message_attributes_t *attrs, bool collect_topic) {
+static __always_inline long collect_kafka_attributes(void *message, struct message_attributes_t *attrs, bool collect_topic, u64 msg_key_pos, u64 msg_topic_pos) {
 	if (collect_topic) {
 		// Topic might be globally set for a writer, or per message
-		get_go_string_from_user_ptr((void *) (message + message_topic_pos), attrs->topic, sizeof(attrs->topic));
+		get_go_string_from_user_ptr((void *) (message + msg_topic_pos), attrs->topic, sizeof(attrs->topic));
 	}
 
 	// Key is a byte slice, first read the slice
 	struct go_slice_ot key_slice = {0};
-	bpf_probe_read(&key_slice, sizeof(key_slice), (void *) (message + message_key_pos));
+	bpf_probe_read(&key_slice, sizeof(key_slice), (void *) (message + msg_key_pos));
 	u64 size_to_read = key_slice.len > MAX_KEY_SIZE ? MAX_KEY_SIZE : key_slice.len;
 	size_to_read &= 0xFF;
 	// Then read the actual key
@@ -148,8 +245,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
 	void *msgs_array = get_argument(ctx, 4);
 	u64 msgs_array_len = (u64) get_argument(ctx, 5);
 
-	struct go_iface go_context = {0};
-//	get_Go_context(ctx, 2, 0, true, &go_context);
 	void *key = (void *) GOROUTINE(ctx);
 
 	void *kafka_request_ptr = bpf_map_lookup_elem(&kafka_events, &key);
@@ -158,6 +253,16 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
 		return 0;
 	}
 
+	// Get proc_info to access Kafka offsets
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info = bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (proc_info == NULL) {
+		bpf_printk("uprobe/WriteMessages: proc_info is NULL");
+		return 0;
+	}
+
+
 	u32 zero_id = 0;
 	struct kafka_request_t *zero_kafka_request = bpf_map_lookup_elem(&kafka_request_storage_map, &zero_id);
 	if (zero_kafka_request == NULL) {
@@ -177,22 +282,111 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
 
 	kafka_request->start_time = bpf_ktime_get_ns();
 
-//	start_span_params_t start_span_params = {
-//			.ctx = ctx,
-//			.go_context = &go_context,
-//			.psc = &kafka_request->psc,
-//			.sc = &kafka_request->msgs[0].sc,
-//			.get_parent_span_context_fn = NULL,
-//			.get_parent_span_context_arg = NULL,
-//	};
-//	start_span(&start_span_params);
+	void *writer_addr_iface_ptr = writer + 0;
+	void *na_ptr = NULL;
+	bpf_probe_read(&na_ptr, sizeof(void *), get_go_interface_instance(writer_addr_iface_ptr));
+	if (!na_ptr)
+		return 0;
+
+	// Zero the host
+	__builtin_memset(kafka_request->host, 0, sizeof(kafka_request->host));
+
+
+//	void * user_str_ptr = na_ptr + 0x10;
+//	if (user_str_ptr == NULL)
+//	{
+//		return false;
+//	}
+//
+//	struct go_string_ot user_str = {0};
+//	long success = 0;
+//	success = bpf_probe_read(&user_str, sizeof(struct go_string_ot), user_str_ptr);
+//	if (success != 0 || user_str.len < 1)
+//	{
+//		return 0;
+//	}
+
+//	u64 size_to_read = user_str.len > 22 ? 22 : user_str.len;
+//	bpf_printk("%s",user_str.str);
+//	return 0;
+//	success = bpf_probe_read(dst, size_to_read, user_str.str);
+
+	// 设置 appid
+	copy_byte_arrays(proc_info->app_id, kafka_request->msgs[0].sc.app_id, APM_APP_ID_SIZE);
+
+	// setting assumed_app_id
+	if (!get_go_string_from_user_ptr((void *) (na_ptr + 0x10), kafka_request->host, sizeof(kafka_request->host))) {
+		cw_bpf_debug("target write failed, aborting ebpf probe");
+		return 0;
+	}
+	set_assumed_app_id_arrays(kafka_request->host, kafka_request->msgs[0].sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+
+//	copy_byte_arrays(kafka_request->apm_sc.assumed_app_id, kafka_request->msgs[0].sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+
+//	if (!kafka_request->broker_addr){
+//		return 0;
+//	}
+//	unsigned char assumed_app_id[APM_TRACE_ID_SIZE];
+//	__builtin_memset(assumed_app_id, 0, sizeof(assumed_app_id));
+//
+//	set_app_id_numeric16(kafka_request->broker_addr, assumed_app_id);
+//	bpf_printk("digit[%d]=%c", 0, assumed_app_id[0]);
+//	return 0;
+
+//	set_assumed_app_id_arrays2(kafka_request->broker_addr, assumed_app_id);
+	/*临时map绕一下*/
+//	struct kafka_request_t2 *kafka_request2 = bpf_map_lookup_elem(&kafka_request_storage_map2, &actual_id);
+//	if (kafka_request2 == NULL) {
+//		bpf_printk("uprobe/WriteMessages: Failed to get kafka_request");
+//		return 0;
+//	}
+//	__builtin_memset(kafka_request2, 0, sizeof(struct kafka_request_t2));
+//
+//
+//	if (!get_go_string_from_user_ptr((void *) (na_ptr + 0x10), kafka_request2->host, sizeof(kafka_request2->host))) {
+//		cw_bpf_debug("target write failed, aborting ebpf probe");
+//		return 0;
+//	}
+//	set_assumed_app_id_arrays(kafka_request2->host, kafka_request2->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+//	copy_byte_arrays(kafka_request2->apm_sc.assumed_app_id, kafka_request->msgs[0].sc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+	/*临时map绕一下*/
+
+
+
+//	__builtin_memcpy(psc.assumed_app_id, psc.assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+
+
+
+	bpf_printk("Kafka BrokerAddr = %s", kafka_request->host);
+	struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+
+	if(cw_psc){
+		__builtin_memcpy(kafka_request->msgs[0].sc.trace_id, cw_psc->trace_id, APM_TRACE_ID_SIZE);
+	} else {
+		generate_random_bytes(kafka_request->msgs[0].sc.trace_id, APM_TRACE_ID_SIZE);
+	}
+
+//	generate_random_bytes(kafka_request->msgs[0].sc.span_id, APM_SPAN_ID_SIZE);
+
 
 	// Try to get a global topic from Writer
-	bool global_topic = get_go_string_from_user_ptr((void *) (writer + writer_topic_pos), kafka_request->global_topic,
+	bool global_topic = get_go_string_from_user_ptr((void *) (writer + proc_info->kafka_writer_topic_pos), kafka_request->global_topic,
 	                                                sizeof(kafka_request->global_topic));
 
+	bpf_printk("global_topic %d %s",global_topic,kafka_request->global_topic);
+
+
 	void *msg_ptr = msgs_array;
-	struct kafka_header_t header = {0};
+	// Get header from per-cpu storage instead of stack to avoid stack limit
+	u32 header_storage_id = 0;
+	struct kafka_header_t *header = bpf_map_lookup_elem(&kafka_header_storage_map, &header_storage_id);
+	if (header == NULL) {
+		bpf_printk("uprobe/WriteMessages: Failed to get header storage");
+		return 0;
+	}
+	// Zero the header
+	__builtin_memset(header, 0, sizeof(*header));
+
 	// This is hack to get the message size. This calculation is based on the following assumptions:
 	// 1. "Time" is the last field in the message struct. This looks to be correct for all the versions according to
 	//      https://github.com/segmentio/kafka-go/blob/v0.2.3/message.go#L24C2-L24C6
@@ -200,32 +394,39 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
 	//      https://github.com/golang/go/blame/master/src/time/time.go#L135
 	// In the future if more libraries will need to get structs sizes we probably want to have similar
 	// mechanism to the one we have for the offsets
-	u16 msg_size = message_time_pos + 8 + 8 + 8;
+	u16 msg_size = proc_info->kafka_message_time_pos + 8 + 8 + 8;
 	kafka_request->valid_messages = 0;
 	// Iterate over the messages
-	for (u64 i = 0; i < MAX_BATCH_SIZE; i++) {
+	for (u8 i = 0; i < MAX_BATCH_SIZE; i++) {
 		if (i >= msgs_array_len) {
 			break;
 		}
 		// Optionally collect the topic, and always collect key
-		collect_kafka_attributes(msg_ptr, &kafka_request->msgs[i], !global_topic);
+		collect_kafka_attributes(msg_ptr, &kafka_request->msgs[i], !global_topic, proc_info->kafka_message_key_pos, proc_info->kafka_message_topic_pos);
+		if (global_topic) {
+			__builtin_memcpy(kafka_request->msgs[i].topic, kafka_request->global_topic, MAX_TOPIC_SIZE);
+		}
 		// Generate span id for each message
+		generate_random_bytes(kafka_request->msgs[i].sc.span_id, APM_SPAN_ID_SIZE);
 		if (i > 0) {
-			generate_random_bytes(kafka_request->msgs[i].sc.SpanID, SPAN_ID_SIZE);
 			// Copy the trace id and trace flags from the first message. This means the sampling decision is done on the first message,
 			// and all the messages in the batch will have the same trace id and trace flags.
 //			kafka_request->msgs[i].sc.TraceFlags = kafka_request->msgs[0].sc.TraceFlags;
-			__builtin_memcpy(kafka_request->msgs[i].sc.TraceID, kafka_request->msgs[0].sc.TraceID, TRACE_ID_SIZE);
+			__builtin_memcpy(kafka_request->msgs[i].sc.trace_id, kafka_request->msgs[0].sc.trace_id, APM_TRACE_ID_SIZE);
+			__builtin_memcpy(kafka_request->msgs[i].sc.app_id, kafka_request->msgs[0].sc.app_id, APM_APP_ID_SIZE);
+			__builtin_memcpy(kafka_request->msgs[i].sc.assumed_app_id, kafka_request->msgs[0].sc.assumed_app_id, APM_APP_ID_SIZE);
 		}
 
 #ifndef NO_HEADER_PROPAGATION
 		// Build the header
-		if (build_contxet_header(&header, &kafka_request->msgs[i].sc) != 0) {
+		if (build_contxet_header(header, &kafka_request->msgs[i].sc) != 0) {
 			bpf_printk("uprobe/WriteMessages: Failed to build header");
 			return 0;
 		}
 		// Inject the header
-		inject_kafka_header(msg_ptr, &header);
+		inject_kafka_header(msg_ptr, header, proc_info->kafka_message_headers_pos);
+		// Zero the header for next iteration to avoid stale data
+		__builtin_memset(header, 0, sizeof(*header));
 #endif
 		kafka_request->valid_messages++;
 		msg_ptr = msg_ptr + msg_size;
@@ -240,7 +441,6 @@ int uprobe_WriteMessages(struct pt_regs *ctx) {
 // func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error
 SEC("uprobe/WriteMessages")
 int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
-	u64 end_time = bpf_ktime_get_ns();
 	void *key = (void *) GOROUTINE(ctx);
 
 	struct kafka_request_t *kafka_request = bpf_map_lookup_elem(&kafka_events, &key);
@@ -248,7 +448,98 @@ int uprobe_WriteMessages_Returns(struct pt_regs *ctx) {
 		bpf_printk("kafka_request is null\n");
 		return 0;
 	}
-	kafka_request->end_time = end_time;
+	kafka_request->end_time = bpf_ktime_get_ns();
+
+
+	u32 zero = 0;
+	struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+	if (!e) {
+		return 0;
+	}
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+
+	e->protocol = PROTOCOL_KAFKA;
+	e->trace_end = 0;
+	e->trace_start = 0;
+	e->start_at = kafka_request->start_time;
+	e->end_at = kafka_request->end_time;
+	e->duration = e->end_at - e->start_at;
+
+
+	bpf_probe_read(&e->target_addr, sizeof(kafka_request->host), kafka_request->host);
+
+	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	struct apm_trace_info_t *trace_info = get_apm_trace_info_by_trace_key(trace_key);
+	if (trace_info == 0) {
+		trace_info = get_apm_trace_info_v3(trace_key, pid_tgid, tgid, pid_tgid);
+	}
+	if (trace_info) {
+		e->trace_id = trace_info->trace_id;
+	}
+
+	for (u8 i = 0; i < MAX_BATCH_SIZE; i++) {
+		if (i >= kafka_request->valid_messages) {
+			break;
+		}
+		// Build payload from host, topic, and key
+//		build_kafka_payload(e, kafka_request, i);
+//		bpf_probe_read(&e->payload, 22, kafka_request->host);
+//		bpf_probe_read(&e->payload , 50, kafka_request->msgs[i].topic);
+
+//		bpf_printk("tttttttttttttt %s", kafka_request->msgs[i].topic);
+//		bpf_printk("kkkkkkkkkk %s", kafka_request->msgs[i].key);
+
+		// 填充 MQ 相关信息
+		bpf_probe_read(&e->mq.topic, sizeof(e->mq.topic), kafka_request->msgs[i].topic);
+		bpf_probe_read(&e->mq.key, sizeof(e->mq.key), kafka_request->msgs[i].key);
+
+//		// 复制 topic(找到实际长度,避免复制过多)
+//		u32 topic_len = 0;
+//		for (u32 j = 0; j < MAX_TOPIC_SIZE && j < sizeof(e->mq_topic) - 1; j++) {
+//			if (kafka_request->msgs[i].topic[j] == '\0') {
+//				topic_len = j;
+//				break;
+//			}
+//		}
+//		if (topic_len == 0) {
+//			topic_len = (MAX_TOPIC_SIZE < sizeof(e->mq_topic) - 1) ? MAX_TOPIC_SIZE : (sizeof(e->mq_topic) - 1);
+//		}
+//		if (topic_len > 0) {
+//			__builtin_memcpy(e->mq_topic, kafka_request->msgs[i].topic, topic_len);
+//			e->mq_topic[topic_len] = '\0';
+//		}
+//
+//		// 复制 key(找到实际长度,避免复制过多)
+//		u32 key_len = 0;
+//		for (u32 j = 0; j < MAX_KEY_SIZE && j < sizeof(e->mq_key) - 1; j++) {
+//			if (kafka_request->msgs[i].key[j] == '\0') {
+//				key_len = j;
+//				break;
+//			}
+//		}
+//		if (key_len == 0) {
+//			key_len = (MAX_KEY_SIZE < sizeof(e->mq_key) - 1) ? MAX_KEY_SIZE : (sizeof(e->mq_key) - 1);
+//		}
+//		__builtin_memcpy(e->mq_key, kafka_request->msgs[i].key, key_len);
+//		e->mq_key[key_len] = '\0';
+
+		cw_copy_byte_arrays(kafka_request->msgs[i].sc.assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+		cw_copy_byte_arrays(kafka_request->msgs[i].sc.span_id, e->span_id, APM_SPAN_ID_SIZE);
+		// 发送事件
+		long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+		if (error == 0) {
+			cw_add_event_count(e->trace_id);
+			cw_bpf_debug("[kafka] send success trace %llu",e->trace_id);
+		}
+
+//		kafka_request->msgs[i].sc.span_id;
+//		for (int j = 0; j < 1; ++j) {
+//			bpf_printk("valid_messages %02x",kafka_request->msgs[i].sc.span_id[j]);
+//		}
+	}
+
+
 
 //	output_span_event(ctx, kafka_request, sizeof(*kafka_request), &kafka_request->msgs[0].sc);
 	bpf_map_delete_elem(&kafka_events, &key);

+ 1 - 1
ebpftracer/ebpf/utrace/go/net/grpc.client.probe.bpf.c

@@ -279,7 +279,7 @@ done:
         if (grpc_span->target_size < fixed_size) {
             fixed_size = (u32)grpc_span->target_size;
         }
-        bpf_probe_read(&e->rpc_target, fixed_size, grpc_span->target);
+        bpf_probe_read(&e->target_addr, fixed_size, grpc_span->target);
     }
     COPY_PAYLOAD(e->payload, grpc_span->method_size, grpc_span->method);
 

+ 1 - 2
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -16,7 +16,6 @@
 #define MAX_HEADERS 20
 #define MAX_HEADER_STRING 50
 
-#define PROTOCOL_GRPC 16
 
 struct grpc_request_t {
     BASE_SPAN_PROPERTIES
@@ -645,7 +644,7 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
     // 优化循环:减少复杂度,提前退出
     for (s32 i = 0; i < MAX_HEADERS && i < header_fields.len; i++) {
         struct hpack_header_field hf = {};
-        long res = bpf_probe_read(&hf, sizeof(hf), (void *)(header_fields.ptr + (i * sizeof(hf))));
+        long res = bpf_probe_read(&hf, sizeof(hf), (void *)(header_fields.cap + (i * sizeof(hf))));
         if (res != 0) {
             continue; // 读取失败,跳过
         }

+ 6 - 3
ebpftracer/l7/l7.go

@@ -1,9 +1,10 @@
 package l7
 
 import (
-	"inet.af/netaddr"
 	"strconv"
 	"time"
+
+	"inet.af/netaddr"
 )
 
 type Protocol uint8
@@ -90,7 +91,7 @@ func (p Protocol) ServiceNameString() string {
 	case ProtocolMongo:
 		return "Mongo"
 	case ProtocolKafka:
-		return "Kafka"
+		return "KAFKA"
 	case ProtocolCassandra:
 		return "CASSANDRA"
 	case ProtocolRabbitmq:
@@ -211,7 +212,7 @@ type RequestData struct {
 	DAddr             netaddr.IPPort
 	ComponentSAddr    netaddr.IPPort
 	ComponentDAddr    netaddr.IPPort
-	RPCTarget         string
+	DestAddrString    string
 	ParentSpanContext struct {
 		TraceIdFrom    string
 		CalledId       string
@@ -222,4 +223,6 @@ type RequestData struct {
 	}
 	ErrorMsg string
 	IsTls    bool
+	MQTopic  string // MQ topic (e.g., Kafka topic)
+	MQKey    string // MQ key (e.g., Kafka message key)
 }

+ 13 - 1
ebpftracer/tls.go

@@ -432,7 +432,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				klog.Infof("[AttachGoTlsUprobes] STEP 15.3: Basic info initialized, BucketsPtrPos=%d", bucketsOff)
 			}
 
-			klog.Infof("[AttachGoTlsUprobes] STEP 16: Getting offsets for HTTP and gRPC fields")
+			klog.Infof("[AttachGoTlsUprobes] STEP 16: Getting offsets for HTTP, gRPC and Kafka fields")
 			fields := map[*uint64]tracer.ID{
 				&info.MethodPtrPos:        tracer.NewID("std", "net/http", "Request", "Method"),
 				&info.UrlPtrPos:           tracer.NewID("std", "net/http", "Request", "URL"),
@@ -447,6 +447,18 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				&info.StreamCtxPos:        tracer.NewID("google.golang.org/grpc", "google.golang.org/grpc/internal/transport", "Stream", "ctx"),
 				&info.IoWriterBufPtrPos:   tracer.NewID("std", "bufio", "Writer", "buf"),
 				&info.IoWriterNPos:        tracer.NewID("std", "bufio", "Writer", "n"),
+				// Kafka Message fields
+				&info.KafkaMessageKeyPos:     tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Key"),
+				&info.KafkaMessageTopicPos:   tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Topic"),
+				&info.KafkaMessageHeadersPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Headers"),
+				&info.KafkaMessageTimePos:    tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Message", "Time"),
+				// Kafka Writer fields
+				&info.KafkaWriterTopicPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Topic"),
+				// Try both Brokers and Addr fields - Addr is a string, Brokers is []string
+				&info.KafkaWriterAddrPos: tracer.NewID("github.com/segmentio/kafka-go", "github.com/segmentio/kafka-go", "Writer", "Addr"),
+				// net.TCPAddr offsets (standard library)
+				&info.TcpAddrIPOffset:   tracer.NewID("std", "net", "TCPAddr", "IP"),
+				&info.TcpAddrPortOffset: tracer.NewID("std", "net", "TCPAddr", "Port"),
 			}
 
 			successCount := 0

+ 11 - 4
ebpftracer/tracer.go

@@ -7,13 +7,14 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/coroot/coroot-node-agent/utils"
-	"github.com/coroot/coroot-node-agent/utils/try"
 	"os"
 	"strconv"
 	"strings"
 	"time"
 
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/try"
+
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/perf"
@@ -555,6 +556,10 @@ type l7Event struct {
 	TypeFrom            [1]byte
 	RPCTarget           [64]byte
 	ErrorMsg            HashByte128
+	MQ                  struct {
+		Topic [256]byte // MQ topic (e.g., Kafka topic)
+		Key   [256]byte // MQ key (e.g., Kafka message key)
+	}
 }
 
 type SocketDataBufferddd struct {
@@ -784,9 +789,11 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				EndAt:          v.EndtAt,
 				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
 				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
-				RPCTarget:      string(v.RPCTarget[:]),
-				ErrorMsg:       strings.TrimRight(string(v.ErrorMsg[:]), "\x00"),
+				DestAddrString: utils.BytesToString(v.RPCTarget[:]),
+				ErrorMsg:       utils.BytesToString(v.ErrorMsg[:]),
 				IsTls:          v.IsTls > 0,
+				MQTopic:        utils.BytesToString(v.MQ.Topic[:]),
+				MQKey:          utils.BytesToString(v.MQ.Key[:]),
 			}
 			if req.Protocol == l7.ProtocolHTTP {
 				klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())

+ 11 - 2
flags/flags.go

@@ -13,8 +13,8 @@ import (
 
 var (
 	// apm
-	ConfigServer        = kingpin.Flag("config-server", "The URL of the endpoint to send traces to").Envar("CONFIG_SERVER").Default("http://10.0.16.250:18080").String()
-	DataServer          = kingpin.Flag("data-server", "The URL of the endpoint to send traces to").Envar("DATA_SERVER").Default("http://10.0.16.250:18080").String()
+	ConfigServer        = kingpin.Flag("config-server", "The URL of the endpoint to send traces to").Envar("CONFIG_SERVER").Default("http://10.0.12.192:18080").String()
+	DataServer          = kingpin.Flag("data-server", "The URL of the endpoint to send traces to").Envar("DATA_SERVER").Default("http://10.0.12.192:18080").String()
 	DumpApps            = kingpin.Flag("dump", "Dump app snap").Short('d').Default("false").Bool()
 	DumpRules           = kingpin.Flag("dr", "Dump rule snap").Default("false").Bool()
 	PrintFormat         = kingpin.Flag("output", "Output format (table|json)").Short('o').Default("table").String()
@@ -104,6 +104,15 @@ func init() {
 	kingpin.Parse()
 	if *Test {
 		euspace := figure.NewFigure("TEST", "", true)
+		// 只在用户未显式设置时才应用 test 模式的默认值
+		if os.Getenv("DISABLE_REG_HOST") == "" {
+			*DisableRegisterHost = true
+		}
+		if os.Getenv("DISABLE_E2E_TRACING") == "" {
+			*DisableE2ETracing = false
+		}
+		*ConsoleLog = true
+		*LogLevel = "debug"
 		euspace.Print()
 	} else {
 		euspace := figure.NewColorFigure("Euspace", "slant", "blue", true)

+ 35 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -27,6 +27,7 @@ const (
 	HTTP_SERVICE_TYPE  = "HTTP"
 	NET_SERVICE_TYPE   = "L7_NET"
 	RPC_SERVICE_TYPE   = "RPC"
+	MQ_SERVICE_TYPE    = "MQ"
 )
 
 const (
@@ -198,6 +199,8 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildMongoMapEvent(&mNode, event)
 				case l7.ProtocolGrpc:
 					buildGrpcMapEvent(&mNode, event)
+				case l7.ProtocolKafka:
+					buildMQMapEvent(&mNode, event)
 				}
 			}
 
@@ -1127,6 +1130,38 @@ func buildNoSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	}
 }
 
+func buildMQMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
+	mNode.ServiceType = MQ_SERVICE_TYPE
+	for _, attr := range event.Attributes {
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "mq.info":
+			query := attr.Value.AsString()
+			mNode.MethodName = query
+		case "mq.topic":
+			mNode.Uri = "/" + attr.Value.AsString()
+		case "mq.destination_addr":
+			mNode.DestinationAddr = attr.Value.AsString()
+		case "mq.exception":
+			if attr.Value.AsBool() {
+				mNode.Exception = 1
+			}
+		case "mq.exception_msg":
+			mNode.ExceptionMsg = attr.Value.AsString()
+		case "mq.oper_type":
+			mNode.OperType = attr.Value.AsString()
+		case "mq.assumed_app_id":
+			mNode.AssumedAppId = attr.Value.AsInt64()
+		case "mq.span_id":
+			mNode.SpanId = attr.Value.AsString()
+		}
+	}
+}
+
 func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 	mNode.ServiceName = MONGO_SERVICE_NAME
 	mNode.ServiceType = NOSQL_SERVICE_TYPE

+ 72 - 3
tracing/apm_tracing.go

@@ -3,6 +3,7 @@ package tracing
 import (
 	"context"
 	"fmt"
+	"net"
 	"os"
 	"strconv"
 	"strings"
@@ -617,6 +618,74 @@ func (t *Trace) NoSQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.Ke
 	t.createTraceEvent(l7Type, ebpftracer.EventTypeL7Request.Int(), attr...)
 }
 
+func (t *Trace) MQTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyValue, operation, statement string, r *l7.RequestData, connSrc, connDest netaddr.IPPort) {
+	if t == nil {
+		return
+	}
+	t.addEvent()
+
+	if r.MQTopic != "" || r.MQKey != "" {
+		statement = fmt.Sprintf("PRODUCE topic:%s key:%s", r.MQTopic, r.MQKey)
+	}
+
+	if statement == "" {
+		return
+	}
+
+	srcAddr := r.ComponentSAddr.String()
+	if r.ComponentSAddr.Port() == 0 {
+		srcAddr = connSrc.String()
+	}
+
+	// 尝试解析目标地址,如果失败则使用 connDest 作为备用
+	dAddr, err := net.ResolveTCPAddr("tcp", r.DestAddrString)
+	var ip string
+	port := 0
+	if err != nil {
+		klog.Errorln(err)
+	} else {
+		ip = dAddr.IP.String()
+		port = dAddr.Port
+	}
+
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconvVal,
+		semconv.NetPeerName(ip),
+		semconv.NetPeerPort(port),
+		attribute.String("mq.info", statement),
+		attribute.String("mq.src_addr", srcAddr),
+		attribute.String("mq.destination_addr", r.DestAddrString),
+		attribute.String("mq.topic", r.MQTopic),
+		attribute.String("mq.key", r.MQKey),
+		attribute.Int64("mq.assumed_app_id", assumedAppID),
+		attribute.String("mq.span_id", r.SpanId),
+		attribute.String("mq.oper_type", "PRODUCER"),
+	)
+
+	// 如果 operation 不为空,添加 DBOperation 属性
+	if operation != "" {
+		attr = append(attr, semconv.DBOperation(operation))
+	}
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+
+	attr = append(attr,
+		attribute.Bool("mq.exception", r.Status.Error()),
+	)
+	if r.Status.Error() {
+		attr = append(attr,
+			attribute.String("mq.exception_msg", r.ErrorMsg),
+		)
+	}
+	t.createTraceEvent(l7Type, ebpftracer.EventTypeL7Request.Int(), attr...)
+}
+
 func (t *Trace) GrpcServerTraceQueryEvent(r *l7.RequestData, appInfo AppInfo) {
 	if t == nil {
 		return
@@ -663,11 +732,11 @@ func (t *Trace) GrpcClientTraceQueryEvent(r *l7.RequestData) {
 	}
 
 	// 解析 RPCTarget 字符串,格式为 "ip:port"
-	klog.Infof("r.RPCTarget is %s", r.RPCTarget)
+	klog.Infof("r.RPCTarget is %s", r.DestAddrString)
 	var rpcIP, rpcPort string
-	if r.RPCTarget != "" {
+	if r.DestAddrString != "" {
 		// 清理空字节,只保留有效字符
-		cleanTarget := strings.TrimRight(r.RPCTarget, "\x00")
+		cleanTarget := strings.TrimRight(r.DestAddrString, "\x00")
 		cleanTarget = strings.TrimSpace(cleanTarget)
 		klog.Infof("cleanTarget is %s", cleanTarget)
 

+ 10 - 0
utils/modelse/bpf_struct.go

@@ -217,6 +217,16 @@ type EbpfProcInfo struct {
 	// io writer
 	IoWriterBufPtrPos uint64
 	IoWriterNPos      uint64
+	// Kafka
+	KafkaMessageKeyPos     uint64
+	KafkaMessageTopicPos   uint64
+	KafkaMessageHeadersPos uint64
+	KafkaMessageTimePos    uint64
+	KafkaWriterTopicPos    uint64
+	KafkaWriterAddrPos     uint64 // Writer.Brokers field offset
+	// net.TCPAddr offsets
+	TcpAddrIPOffset   uint64 // net.TCPAddr.IP field offset
+	TcpAddrPortOffset uint64 // net.TCPAddr.Port field offset
 }
 
 type allowPortBitmap struct {

+ 4 - 0
utils/util.go

@@ -964,3 +964,7 @@ func ResetTimerWithDrainChan(timer *time.Timer, d time.Duration) {
 	}
 	timer.Reset(d)
 }
+
+func BytesToString(b []byte) string {
+	return strings.TrimRight(string(b), "\x00")
+}