Browse Source

Fixed #TASK_GK-2944 横向串联

Carl 2 years ago
parent
commit
16f836ee2f

+ 5 - 2
containers/container.go

@@ -1,6 +1,7 @@
 package containers
 
 import (
+	"github.com/coroot/coroot-node-agent/utils"
 	"os"
 	"strings"
 	"sync"
@@ -143,7 +144,8 @@ type Container struct {
 
 	done chan struct{}
 
-	traceMap map[uint64]*tracing.Trace
+	traceMap   map[uint64]*tracing.Trace
+	instanceID utils.ID
 }
 
 func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
@@ -447,6 +449,7 @@ func (c *Container) onFileOpen(pid uint32, fd uint64) {
 	}
 }
 
+// set
 func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
 	if common.PortFilter.ShouldBeSkipped(addr.Port()) {
 		return
@@ -1090,7 +1093,7 @@ func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) {
 		p.openSslUprobesChecked = true
 	}
 	if !p.goTlsUprobesChecked {
-		p.uprobes = append(p.uprobes, tracer.AttachGoTlsUprobes(pid)...)
+		p.uprobes = append(p.uprobes, tracer.AttachGoTlsUprobes(pid, c.instanceID)...)
 		p.goTlsUprobesChecked = true
 	}
 }

+ 18 - 1
containers/container_apm.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/tracing"
+	"github.com/coroot/coroot-node-agent/utils"
 	"inet.af/netaddr"
 )
 
@@ -88,7 +89,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 
 		apmTrace, ok := c.getTrace(r.TraceId)
 		if ok {
-			apmTrace.HttpTraceRequest(method, path, hostIp, port, r.Status, r.Duration)
+			apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
 		}
 
 	case l7.ProtocolHTTP2:
@@ -157,3 +158,19 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	}
 	return nil
 }
+
+func (c *Container) buildInstanceID() {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	for address, val := range c.getListens() {
+		if val == 1 {
+			ip := address.IP()
+			if ip.Is4() && !ip.IsLoopback() {
+				// 获取端口号
+				port := address.Port()
+				c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
+				break
+			}
+		}
+	}
+}

+ 1 - 1
containers/registry.go

@@ -231,9 +231,9 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 
 			case ebpftracer.EventTypeListenOpen:
 				//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
-
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 					c.onListenOpen(e.Pid, e.SrcAddr, false)
+					c.buildInstanceID()
 					c.attachTlsUprobes(r.tracer, e.Pid)
 				} else {
 					klog.Infoln("TCP listen open from unknown container", e)

+ 1 - 0
ebpftracer/ebpf/ebpf.c

@@ -5,6 +5,7 @@
 //#include <net/sock.h>
 //#include <net/net_namespace.h>
 //#include <uapi/linux/bpf.h>
+#include "include/apm_trace.h"
 #include "include/socket_trace.h"
 //#include "vmlinux.h"// #include <asm/ptrace.h>
 //#include <bpf/bpf_helpers.h>// bpf_base.h deepclash

+ 85 - 0
ebpftracer/ebpf/include/apm_trace.h

@@ -0,0 +1,85 @@
+//
+// Created by Carl.Guo on 2024/5/14.
+//
+#include "bpf_base.h"
+
+#ifndef EUSPACES_APM_TRACE_H
+#define EUSPACES_APM_TRACE_H
+
+#define APM_REMOTE_ADDR_MAX_LEN 256
+
+#define APM_TYPE_FROM_STRING_SIZE 2
+#define APM_SAMPLE_STRING_SIZE 2
+#define APM_HOST_ID_STRING_SIZE 16
+#define APM_APP_ID_STRING_SIZE 16
+#define APM_INSTANCE_ID_STRING_SIZE 16
+#define APM_TRACE_ID_STRING_SIZE 32
+#define APM_ASSUMED_APP_ID_STRING_SIZE 16
+#define APM_SPAN_ID_STRING_SIZE 16
+
+#define APM_TYPE_FROM_SIZE 1
+#define APM_SAMPLE_SIZE 1
+#define APM_HOST_ID_SIZE 8
+#define APM_APP_ID_SIZE 8
+#define APM_INSTANCE_ID_SIZE 8
+#define APM_TRACE_ID_SIZE 16
+#define APM_ASSUMED_APP_ID_SIZE 8
+#define APM_SPAN_ID_SIZE 8
+
+#define CW_HEADER_LENGTH 123
+
+/***********************************************************
+ * Trace struct
+ ***********************************************************/
+struct apm_trace_key_t {
+	__u32 tgid;
+	__u32 pid;
+	__u64 goid;
+};
+
+struct fd_trace_key_t {
+	__u32 tgid;
+	__u32 fd;
+};
+
+
+//struct apm_span_context {
+//	unsigned char TraceID[APM_TRACE_ID_SIZE];
+//	unsigned char SpanID[APM_SPAN_ID_SIZE];
+//};
+
+struct apm_span_context {
+	unsigned char type_from[APM_TYPE_FROM_SIZE];
+	unsigned char sample[APM_SAMPLE_SIZE];
+	unsigned char host_id[APM_HOST_ID_SIZE];
+	unsigned char app_id[APM_APP_ID_SIZE];
+	unsigned char instance_id[APM_INSTANCE_ID_SIZE];
+	unsigned char trace_id[APM_TRACE_ID_SIZE];
+	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
+	unsigned char span_id[APM_SPAN_ID_SIZE];
+};
+
+struct apm_trace_info_t {
+	/*
+	 * Whether traceID is zero ?
+	 * For the client to actively send request, set traceID to zero.
+	 */
+	//	bool is_trace_id_zero;
+	//	__u32 update_time; // 从系统开机开始到创建/更新时的间隔时间单位是秒
+	//	__u32 peer_fd;	   // 用于socket之间的关联
+	//	__u64 thread_trace_id; // 线程追踪ID
+	//	__u64 socket_id; // Records the socket associated when tracing was created (记录创建追踪时关联的socket)
+	__u64 trace_id;
+	struct apm_span_context psc;
+
+	struct apm_span_context sc;
+};
+
+static __always_inline void cw_copy_byte_arrays(unsigned char *src, unsigned char *dst, __u32 size)
+{
+	for (int i = 0; i < size; i++)
+	{
+		dst[i] = src[i];
+	}
+}
+#endif //EUSPACES_APM_TRACE_H

+ 14 - 9
ebpftracer/ebpf/include/socket_trace_common.h

@@ -1,7 +1,7 @@
 #ifndef DF_BPF_SOCKET_TRACE_COMMON_H
 #define DF_BPF_SOCKET_TRACE_COMMON_H
-#define CAP_DATA_SIZE 1024		// For no-brust send buffer
-#define BURST_DATA_BUF_SIZE 8192	// For brust send buffer
+#define CAP_DATA_SIZE 1024        // For no-brust send buffer
+#define BURST_DATA_BUF_SIZE 8192    // For brust send buffer
 
 enum endpoint_role {
 	ROLE_UNKNOWN,
@@ -43,8 +43,8 @@ struct __socket_data {
 
 	/* 追踪数据信息 */
 	__u64 timestamp;     // 数据捕获时间戳
-	__u8  direction: 1;  // bits[0]: 方向,值为T_EGRESS(0), T_INGRESS(1)
-	__u8  msg_type:  7;  // bits[1-7]: 信息类型,值为MSG_UNKNOWN(0), MSG_REQUEST(1), MSG_RESPONSE(2)
+	__u8 direction: 1;  // bits[0]: 方向,值为T_EGRESS(0), T_INGRESS(1)
+	__u8 msg_type: 7;  // bits[1-7]: 信息类型,值为MSG_UNKNOWN(0), MSG_REQUEST(1), MSG_RESPONSE(2)
 
 	__u64 syscall_len;   // 本次系统调用读、写数据的总长度
 	__u64 data_seq;      // cap_data在Socket中的相对顺序号
@@ -71,6 +71,9 @@ struct trace_conf_t {
 	__u64 go_tracing_timeout;
 	__u64 io_event_collect_mode;
 	__u64 io_event_minimal_duration;
+
+	unsigned char host_id[APM_HOST_ID_SIZE];
+	unsigned char app_id[APM_APP_ID_SIZE];
 };
 
 struct trace_stats {
@@ -95,12 +98,12 @@ struct socket_info_t {
 	 */
 	__u8 prev_data[4];
 	__u8 direction: 1;
-	__u8 msg_type: 2;	// 保存数据类型,值为MSG_UNKNOWN(0), MSG_REQUEST(1), MSG_RESPONSE(2)
+	__u8 msg_type: 2;    // 保存数据类型,值为MSG_UNKNOWN(0), MSG_REQUEST(1), MSG_RESPONSE(2)
 	__u8 role: 5;           // 标识socket角色:ROLE_CLIENT, ROLE_SERVER, ROLE_UNKNOWN
 	bool need_reconfirm;    // l7协议推断是否需要再次确认。
 	__s32 correlation_id;   // 目前用于kafka协议推断。
 
-	__u32 peer_fd;		// 用于记录socket间数据转移的对端fd。
+	__u32 peer_fd;        // 用于记录socket间数据转移的对端fd。
 
 	/*
 	 * 一旦有数据读/写就会更新这个时间,这个时间是从系统开机开始
@@ -125,7 +128,7 @@ struct trace_info_t {
 	 */
 	bool is_trace_id_zero;
 	__u32 update_time; // 从系统开机开始到创建/更新时的间隔时间单位是秒
-	__u32 peer_fd;	   // 用于socket之间的关联
+	__u32 peer_fd;       // 用于socket之间的关联
 	__u64 thread_trace_id; // 线程追踪ID
 	__u64 socket_id; // Records the socket associated when tracing was created (记录创建追踪时关联的socket)
 } __attribute__((packed));
@@ -174,7 +177,7 @@ enum offsets_index {
 struct ebpf_proc_info {
 	__u32 version;
 	__u16 offsets[OFFSET_IDX_MAX];
-	
+
 	// In golang, itab represents type, and in interface, struct is represented
 	// by the address of itab. We use itab to judge the structure type, and 
 	// find the fd representing the connection after multiple jumps. These
@@ -187,7 +190,9 @@ struct ebpf_proc_info {
 
 //	__u64 start_addr;
 //	__u64 end_addr;
-} __attribute__((packed)) ;
+	unsigned char instance_id[APM_INSTANCE_ID_SIZE];
+
+} __attribute__((packed));
 
 enum {
 	/*

+ 73 - 75
ebpftracer/ebpf/l7/apm_trace.c

@@ -1,81 +1,20 @@
 //
 // Created by Carl.Guo on 2024/4/1.
 //
-/***********************************************************
- * Trace struct
- ***********************************************************/
-struct apm_trace_key_t {
-	__u32 tgid;
-	__u32 pid;
-	__u64 goid;
-};
-
-struct fd_trace_key_t {
-	__u32 tgid;
-	__u32 fd;
-};
-
-
-//#define APM_TRACE_ID_SIZE 16
-//#define APM_SPAN_ID_SIZE 8
-#define APM_REMOTE_ADDR_MAX_LEN 256
-
-#define APM_TYPE_FROM_STRING_SIZE 2
-#define APM_SAMPLE_STRING_SIZE 2
-#define APM_HOST_ID_STRING_SIZE 16
-#define APM_APP_ID_STRING_SIZE 16
-#define APM_INSTANCE_ID_STRING_SIZE 16
-#define APM_TRACE_ID_STRING_SIZE 32
-#define APM_ASSUMED_APP_ID_STRING_SIZE 16
-#define APM_SPAN_ID_STRING_SIZE 16
-
-#define APM_TYPE_FROM_SIZE 1
-#define APM_SAMPLE_SIZE 1
-#define APM_HOST_ID_SIZE 8
-#define APM_APP_ID_SIZE 8
-#define APM_INSTANCE_ID_SIZE 8
-#define APM_TRACE_ID_SIZE 16
-#define APM_ASSUMED_APP_ID_SIZE 8
-#define APM_SPAN_ID_SIZE 8
-
-#define CW_HEADER_LENGTH 123
-
-//struct apm_span_context {
-//	unsigned char TraceID[APM_TRACE_ID_SIZE];
-//	unsigned char SpanID[APM_SPAN_ID_SIZE];
-//};
-
-struct apm_span_context {
-	unsigned char type_from[APM_TYPE_FROM_SIZE];
-	unsigned char sample[APM_SAMPLE_SIZE];
-	unsigned char host_id[APM_HOST_ID_SIZE];
-	unsigned char app_id[APM_APP_ID_SIZE];
-	unsigned char instance_id[APM_INSTANCE_ID_SIZE];
-	unsigned char trace_id[APM_TRACE_ID_SIZE];
-	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
-	unsigned char span_id[APM_SPAN_ID_SIZE];
-};
-
-struct apm_trace_info_t {
-	/*
-	 * Whether traceID is zero ?
-	 * For the client to actively send request, set traceID to zero.
-	 */
-	//	bool is_trace_id_zero;
-	//	__u32 update_time; // 从系统开机开始到创建/更新时的间隔时间单位是秒
-	//	__u32 peer_fd;	   // 用于socket之间的关联
-	//	__u64 thread_trace_id; // 线程追踪ID
-	//	__u64 socket_id; // Records the socket associated when tracing was created (记录创建追踪时关联的socket)
-	__u64 trace_id;
-	struct apm_span_context psc;
-};
 
 struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(key_size, sizeof(struct apm_trace_key_t));
 	__uint(value_size, sizeof(struct apm_span_context));
 	__uint(max_entries, 1);
-} apm_span_context_map SEC(".maps");
+} apm_parent_span_context_map SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__uint(key_size, sizeof(struct apm_trace_key_t));
+	__uint(value_size, sizeof(struct apm_span_context));
+	__uint(max_entries, 1);
+} apm_current_span_context_map SEC(".maps");
 
 struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
@@ -153,7 +92,7 @@ static __inline __attribute__((__always_inline__))
 void cw_save_parent_tracking_span(struct apm_span_context *sc) {
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
 	long err = 0;
-	err = bpf_map_update_elem(&apm_span_context_map, &trace_key, sc, BPF_ANY);
+	err = bpf_map_update_elem(&apm_parent_span_context_map, &trace_key, sc, BPF_ANY);
 	if (err != 0) {
 		bpf_printk("Failed to update tracked_spans map: %ld", err);
 		return;
@@ -164,11 +103,11 @@ static __inline __attribute__((__always_inline__))
 struct apm_span_context *cw_get_parent_tracking_span() {
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
 	struct apm_span_context *apm_sc = {0};
-	struct apm_span_context *span_contexts = bpf_map_lookup_elem(&apm_span_context_map, &trace_key);
+	struct apm_span_context *span_contexts = bpf_map_lookup_elem(&apm_parent_span_context_map, &trace_key);
 	bpf_printk("-------");
 
 	if (span_contexts) {
-		for (int i = 0; i < APM_TYPE_FROM_SIZE; i++) {
+		/*for (int i = 0; i < APM_TYPE_FROM_SIZE; i++) {
 			bpf_printk("type_from[%d] = %02x", i, span_contexts->type_from[i]);
 		}
 		for (int i = 0; i < APM_SAMPLE_SIZE; i++) {
@@ -191,7 +130,7 @@ struct apm_span_context *cw_get_parent_tracking_span() {
 		}
 		for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
 			bpf_printk("span_id[%d] = %02x", i, span_contexts->span_id[i]);
-		}
+		}*/
 		apm_sc = span_contexts;
 	}
 	bpf_printk("-------end");
@@ -200,9 +139,16 @@ struct apm_span_context *cw_get_parent_tracking_span() {
 }
 
 static __inline __attribute__((__always_inline__))
-__u64 clear_span_context() {
+__u64 clear_parent_span_context() {
+	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	bpf_map_delete_elem(&apm_parent_span_context_map, &trace_key);
+	return 0;
+}
+
+static __inline __attribute__((__always_inline__))
+__u64 clear_current_span_context() {
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
-	bpf_map_delete_elem(&apm_span_context_map, &trace_key);
+	bpf_map_delete_elem(&apm_current_span_context_map, &trace_key);
 	return 0;
 }
 
@@ -214,4 +160,56 @@ __u64 clear_trace(__u32 pid, __u32 tid, __u32 fd) {
 	bpf_map_delete_elem(&trace_info_heap, &trace_key);
 	bpf_map_delete_elem(&fd_trace_info_heap, &fd_trace_key);
 	return 0;
+}
+
+// Client当前sc
+static __inline __attribute__((__always_inline__))
+void cw_save_current_tracking_span(struct apm_span_context *sc) {
+	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	long err = 0;
+	err = bpf_map_update_elem(&apm_current_span_context_map, &trace_key, sc, BPF_ANY);
+	if (err != 0) {
+		bpf_printk("Failed to update apm_current_span_context_map map: %ld", err);
+		return;
+	}
+}
+
+
+static __inline __attribute__((__always_inline__))
+struct apm_span_context *cw_get_current_tracking_span() {
+	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
+	struct apm_span_context *apm_sc = {0};
+	struct apm_span_context *span_contexts = bpf_map_lookup_elem(&apm_current_span_context_map, &trace_key);
+	bpf_printk("-------");
+
+	if (span_contexts) {
+		/*for (int i = 0; i < APM_TYPE_FROM_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-type_from[%d] = %02x", i, span_contexts->type_from[i]);
+		}
+		for (int i = 0; i < APM_SAMPLE_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-sample[%d] = %02x", i, span_contexts->sample[i]);
+		}
+		for (int i = 0; i < APM_HOST_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-host_id[%d] = %02x", i, span_contexts->host_id[i]);
+		}
+		for (int i = 0; i < APM_APP_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-app_id[%d] = %02x", i, span_contexts->app_id[i]);
+		}
+		for (int i = 0; i < APM_INSTANCE_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-instance_id[%d] = %02x", i, span_contexts->instance_id[i]);
+		}
+		for (int i = 0; i < APM_TRACE_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-trace_id[%d] = %02x", i, span_contexts->trace_id[i]);
+		}
+		for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-assumed_app_id[%d] = %02x", i, span_contexts->assumed_app_id[i]);
+		}
+		for (int i = 0; i < APM_SPAN_ID_SIZE; i++) {
+			bpf_printk("cw_get_current_tracking_span-span_id[%d] = %02x", i, span_contexts->span_id[i]);
+		}*/
+		apm_sc = span_contexts;
+	}
+	bpf_printk("-------end");
+
+	return apm_sc;
 }

+ 31 - 4
ebpftracer/ebpf/l7/l7.c

@@ -59,7 +59,7 @@
 #include "dns.c"
 #include "apm_trace.c"
 
-
+// go type l7Event struct && type RequestData struct
 struct l7_event {
     __u64 fd;
     __u64 connection_timestamp;
@@ -74,9 +74,17 @@ struct l7_event {
     __u64 trace_id;
     __u32 trace_start;
     __u32 trace_end;
+	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
+	unsigned char span_id[APM_SPAN_ID_SIZE];
+	unsigned char trace_id_from[APM_TRACE_ID_SIZE];
+	unsigned char called_id[APM_ASSUMED_APP_ID_SIZE];
+	unsigned char instance_id_from[APM_INSTANCE_ID_SIZE];
+	unsigned char app_id_from[APM_APP_ID_SIZE];
+	unsigned char span_id_from[APM_SPAN_ID_SIZE];
+
 //    __u32 test_id;
     char payload[MAX_PAYLOAD_SIZE];
-};
+} ;
 
 struct test_t {
     __u32 test_id;
@@ -307,6 +315,18 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         e->trace_id = trace_id;
         e->payload_size = size;
         COPY_PAYLOAD(e->payload, size, payload);
+		// psc
+	    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+	    if(cw_psc){
+		    cw_copy_byte_arrays(cw_psc->trace_id, e->trace_id_from, APM_TRACE_ID_SIZE);
+		    cw_copy_byte_arrays(cw_psc->assumed_app_id, e->called_id, APM_ASSUMED_APP_ID_SIZE);
+		    cw_copy_byte_arrays(cw_psc->instance_id, e->instance_id_from, APM_INSTANCE_ID_SIZE);
+		    cw_copy_byte_arrays(cw_psc->app_id, e->app_id_from, APM_APP_ID_SIZE);
+		    cw_copy_byte_arrays(cw_psc->span_id, e->span_id_from, APM_SPAN_ID_SIZE);
+		    for (int i = 0; i < APM_TRACE_ID_SIZE; i++) {
+			    bpf_printk("trace_enter_write - trace_id = %02x", e->trace_id_from[i]);
+		    }
+	    }
         bpf_map_delete_elem(&active_l7_requests, &k);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         // 发送事件到用户空间 end
@@ -620,8 +640,15 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         e->trace_id = trace_id;
         cw_bpf_debug("[Kernel Response][HTTP] [Trace ID] trace_id:%llu", e->trace_id);
         cw_bpf_debug("[Kernel Response][HTTP]:TGID:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
-
-//        cw_bpf_debug("[Response][HTTP222]:thread_id:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
+	    struct  apm_span_context * sc = cw_get_current_tracking_span();
+	    if (sc) {
+		    cw_copy_byte_arrays(sc->assumed_app_id, e->assumed_app_id, APM_ASSUMED_APP_ID_SIZE);
+		    cw_copy_byte_arrays(sc->span_id, e->span_id, APM_SPAN_ID_SIZE);
+		    for (int i = 0; i < APM_ASSUMED_APP_ID_SIZE; i++) {
+			    bpf_printk("assumed_app_id-assumed_app_id[%d] = %02x", i, sc->assumed_app_id[i]);
+		    }
+	    }
+		//        cw_bpf_debug("[Response][HTTP222]:thread_id:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
 //        cw_bpf_debug("[Response][HTTP222] trace_id:%llu", trace_id);
 //        // 请求报文
 //        cw_bpf_debug("[Response][HTTP222] req-payload:%s",e->payload);

+ 7 - 9
ebpftracer/ebpf/utrace/go/include/utils.h

@@ -76,28 +76,26 @@ static __always_inline void copy_byte_arrays(unsigned char *src, unsigned char *
     }
 }
 
-static __always_inline void custom_hash(char *str, unsigned char *hash,u32 size) {
+static __always_inline void custom_hash(const char *str, unsigned char *hash,u32 size) {
 	u64 int_hash = 0;
 #pragma unroll
 	for (int i = 0; i < APM_REMOTE_ADDR_MAX_LEN; i++) { // Assuming maximum string length is 256
 		if (str[i] == '\0') break;
-		int_hash = (int_hash * 31) + str[i];
+		int_hash = (int_hash * 31) + str[i] ;
 	}
-
 	// Convert int_hash to 16 bytes
 	for (int i = 0; i < size; i++) {
 		hash[i] = int_hash % 10;
 		int_hash /= 10;
 	}
+	if (hash[0] == 0) hash[0] = 1;
+
 }
 
 static __always_inline void set_assumed_app_id_arrays(char *host, unsigned char *dist, u32 size) {
-	unsigned char hash[APM_ASSUMED_APP_ID_STRING_SIZE];
-	custom_hash(host, hash, APM_ASSUMED_APP_ID_STRING_SIZE);
-	for (int i = 0; i < APM_ASSUMED_APP_ID_STRING_SIZE; i++) {
-		bpf_printk("%u", hash[i]);
-	}
-	hex_string_to_bytes((char *) hash, APM_ASSUMED_APP_ID_STRING_SIZE, dist);
+	unsigned char hash[size];
+	custom_hash(host, hash, size);
+	hex_string_to_bytes((char *) hash, size, dist);
 }
 
 static __always_inline void bpf_memset(unsigned char *dst, u32 size, unsigned char value)

+ 30 - 6
ebpftracer/ebpf/utrace/go/net/client.probe.bpf.c

@@ -268,6 +268,8 @@ static __always_inline long cw_inject_header(void* headers_ptr, struct apm_span_
 SEC("uprobe/Transport_roundTrip")
 int uprobe_Transport_roundTrip(struct pt_regs *ctx) {
 	bpf_printk("[Uprobe HTTP Client] start");
+	__u64 goroutine_id = GOROUTINE(ctx);
+	bpf_printk("[Uprobe HTTP Client] goroutine_id:%llu",goroutine_id);
 
     u64 request_pos = 2;
     void *req_ptr = get_argument(ctx, request_pos);
@@ -314,17 +316,16 @@ int uprobe_Transport_roundTrip(struct pt_regs *ctx) {
 //	    }
 
 
-	    struct apm_span_context *cw_sc = cw_get_parent_tracking_span();
-		if(cw_sc){
+	    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
+		if(cw_psc){
 //			httpReq->apm_sc = *cw_sc;
-			bpf_probe_read(&httpReq->apm_psc, sizeof(httpReq->apm_psc), cw_sc);
+			bpf_probe_read(&httpReq->apm_psc, sizeof(httpReq->apm_psc), cw_psc);
 			// copy trace_id
 //			copy_byte_arrays(httpReq->apm_psc.instance_id, httpReq->apm_sc.instance_id, APM_INSTANCE_ID_SIZE);
 			copy_byte_arrays(httpReq->apm_psc.trace_id, httpReq->apm_sc.trace_id, APM_TRACE_ID_SIZE);
 			// new spanid
 			generate_random_bytes(httpReq->apm_sc.span_id, APM_SPAN_ID_SIZE);
 
-
 			// copy TraceID
 //			copy_byte_arrays(httpReq->apm_psc.trace_id, httpReq->sc.TraceID, TRACE_ID_SIZE);
 //			copy_byte_arrays(httpReq->psc.TraceID, httpReq->sc.TraceID, TRACE_ID_SIZE);
@@ -360,8 +361,31 @@ int uprobe_Transport_roundTrip(struct pt_regs *ctx) {
 //	}
 //	unsigned char hash[APM_ASSUMED_APP_ID_STRING_SIZE];
 
-	set_assumed_app_id_arrays(httpReq->host, httpReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+	// set host_id/appid
+	u32 k0 = 0;
+	struct trace_conf_t *trace_conf = trace_conf_map__lookup(&k0);
+	if (trace_conf) {
+		for (int i = 0; i < 8; ++i) {
+//			bpf_printk("[Client] host_id:%02x", trace_conf->host_id[i]);
+		}
+		copy_byte_arrays(trace_conf->host_id, httpReq->apm_sc.host_id, APM_HOST_ID_SIZE);
+		copy_byte_arrays(trace_conf->app_id, httpReq->apm_sc.app_id, APM_APP_ID_SIZE);
+	}
+
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u32 tgid = pid_tgid >> 32;
+	struct ebpf_proc_info *proc_info =
+			bpf_map_lookup_elem(&proc_info_map, &tgid);
+	if (proc_info) {
+		for (int i = 0; i < 8; ++i) {
+//			bpf_printk("[Client] instance_id:%02x", proc_info->instance_id[i]);
+		}
+		copy_byte_arrays(proc_info->instance_id, httpReq->apm_sc.instance_id, APM_APP_ID_SIZE);
+	}
 
+	// set assumed_app_id
+	set_assumed_app_id_arrays(httpReq->host, httpReq->apm_sc.assumed_app_id, APM_ASSUMED_APP_ID_STRING_SIZE);
+	cw_save_current_tracking_span(&httpReq->apm_sc);
 	// get proto from Request
     if (!get_go_string_from_user_ptr((void *)(req_ptr+request_proto_pos), httpReq->proto, sizeof(httpReq->proto))) {
         bpf_printk("uprobe_Transport_roundTrip: Failed to get proto from Request");
@@ -387,7 +411,7 @@ int uprobe_Transport_roundTrip(struct pt_regs *ctx) {
 SEC("uprobe/Transport_roundTrip")
 int uprobe_Transport_roundTrip_Returns(struct pt_regs *ctx) {
     bpf_printk("[Uprobe HTTP Client] Return start");
-
+	clear_current_span_context();
     u64 end_time = bpf_ktime_get_ns();
     void *req_ctx_ptr = get_Go_context(ctx, 2, ctx_ptr_pos, false);
     void *key = get_consistent_key(ctx, req_ctx_ptr);

+ 11 - 9
ebpftracer/ebpf/utrace/go/net/server.probe.bpf.c

@@ -88,14 +88,6 @@ struct {
 	__uint(max_entries, 1);
 } http_server_uprobe_storage_map SEC(".maps");
 
-//struct
-//{
-//	__uint(type, BPF_MAP_TYPE_LRU_HASH);
-//	__uint(key_size, sizeof(struct apm_trace_key_t));
-//	__uint(value_size, sizeof(struct span_context));
-//	__uint(max_entries, 1);
-//} apm_span_context_map SEC(".maps");
-
 struct {
 	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
 } events SEC(".maps");
@@ -306,6 +298,16 @@ int uprobe_HandlerFunc_ServeHTTP(struct pt_regs *ctx) {
 		generate_random_bytes(http_server_span->sc.SpanID, SPAN_ID_SIZE);
 	} else {
 		http_server_span->sc = generate_span_context();
+
+		struct apm_span_context *cw_parent_span_context = bpf_map_lookup_elem(&cw_parent_span_context_storage_map, &map_id);
+		if (!cw_parent_span_context) {
+			return 0;
+		}
+		struct apm_span_context context = {};
+		generate_random_bytes(context.trace_id, TRACE_ID_SIZE);
+//		generate_random_bytes(context.span_id, SPAN_ID_SIZE);
+		// 保存 trace_id 到psc
+		cw_save_parent_tracking_span(&context);
 	}
 
 	if (req_ctx_ptr == NULL) {
@@ -386,7 +388,7 @@ int uprobe_HandlerFunc_ServeHTTP_Returns(struct pt_regs *ctx) {
 	stop_tracking_span(&http_server_span->sc, &http_server_span->psc);
 
 
-	clear_span_context();
+	clear_parent_span_context();
 	bpf_printk("HTTP_END");
 	return 0;
 }

+ 19 - 11
ebpftracer/l7/l7.go

@@ -8,7 +8,7 @@ import (
 type Protocol uint8
 
 const (
-	ProtocolTrace     Protocol = 200
+	ProtocolTrace Protocol = 200
 
 	ProtocolHTTP      Protocol = 1
 	ProtocolPostgres  Protocol = 2
@@ -23,7 +23,6 @@ const (
 	ProtocolHTTP2     Protocol = 11
 	ProtocolDubbo2    Protocol = 12
 	ProtocolDNS       Protocol = 13
-
 )
 
 func (p Protocol) String() string {
@@ -139,13 +138,22 @@ func (s Status) Error() bool {
 }
 
 type RequestData struct {
-	Protocol    Protocol
-	Status      Status
-	Duration    time.Duration
-	Method      Method
-	StatementId uint32
-	Payload     []byte
-	TraceId     uint64
-	TraceStart  uint32
-	TraceEnd    uint32
+	Protocol          Protocol
+	Status            Status
+	Duration          time.Duration
+	Method            Method
+	StatementId       uint32
+	Payload           []byte
+	TraceId           uint64
+	TraceStart        uint32
+	TraceEnd          uint32
+	AssumedAppId      string
+	SpanId            string
+	ParentSpanContext struct {
+		TraceIdFrom    string
+		CalledId       string
+		InstanceIdFrom string
+		AppIdFrom      string
+		SpanIdFrom     string
+	}
 }

+ 3 - 1
ebpftracer/tls.go

@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
+	"github.com/coroot/coroot-node-agent/utils"
 	"os"
 	"regexp"
 	"strconv"
@@ -114,7 +115,7 @@ func (t *Tracer) AttachOpenSslUprobes(pid uint32) []link.Link {
 	return links
 }
 
-func (t *Tracer) AttachGoTlsUprobes(pid uint32) []link.Link {
+func (t *Tracer) AttachGoTlsUprobes(pid uint32, insID utils.ID) []link.Link {
 	if t.disableL7Tracing {
 		return nil
 	}
@@ -219,6 +220,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32) []link.Link {
 			info.NetTCPConnItab = uint64(0)
 			info.CryptoTLSConnItab = uint64(0)
 			info.CredentialsSyscallConnItab = uint64(0)
+			info.InstanceId = insID.HashtVal
 			_, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
 			if err != nil {
 				klog.Error("failed to update program info", err)

+ 35 - 8
ebpftracer/tracer.go

@@ -3,6 +3,7 @@ package ebpftracer
 import (
 	"bytes"
 	"encoding/binary"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"github.com/cilium/ebpf"
@@ -12,6 +13,7 @@ import (
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	"github.com/coroot/coroot-node-agent/proc"
+	"github.com/coroot/coroot-node-agent/utils"
 	"golang.org/x/mod/semver"
 	"golang.org/x/sys/unix"
 	"inet.af/netaddr"
@@ -401,6 +403,14 @@ type l7Event struct {
 	TraceId             uint64
 	TraceStart          uint32
 	TraceEnd            uint32
+	AssumedAppId        utils.HashByte
+	SpanId              utils.HashByte
+
+	TraceIdFrom    utils.HashByte16
+	CalledId       utils.HashByte
+	InstanceIdFrom utils.HashByte
+	AppIdFrom      utils.HashByte
+	SpanIdFrom     utils.HashByte
 }
 
 type SocketDataBufferddd struct {
@@ -566,16 +576,33 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				klog.Warningln("failed to read msg:", err)
 				continue
 			}
+			fmt.Println("v.TraceIdFrom")
+			fmt.Println(v.TraceIdFrom)
+			a := hex.EncodeToString(v.TraceIdFrom[:])
+			//for _, b := range v.AssumedAppId {
+			//	fmt.Printf("v.AssumedAppId- %02\n", b)
+			//}
+			fmt.Println(a)
+
 			payload := reader.Bytes()
 			req := &l7.RequestData{
-				Protocol:    l7.Protocol(v.Protocol),
-				Status:      l7.Status(v.Status),
-				Duration:    time.Duration(v.Duration),
-				Method:      l7.Method(v.Method),
-				StatementId: v.StatementId,
-				TraceId:     v.TraceId,
-				TraceStart:  v.TraceStart,
-				TraceEnd:    v.TraceEnd,
+				Protocol:     l7.Protocol(v.Protocol),
+				Status:       l7.Status(v.Status),
+				Duration:     time.Duration(v.Duration),
+				Method:       l7.Method(v.Method),
+				StatementId:  v.StatementId,
+				TraceId:      v.TraceId,
+				TraceStart:   v.TraceStart,
+				TraceEnd:     v.TraceEnd,
+				AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
+				SpanId:       hex.EncodeToString(v.SpanId[:]),
+			}
+			if v.TraceEnd == 1 {
+				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
+				req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
+				req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
+				req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
+				req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
 			}
 			switch {
 			case v.PayloadSize == 0:

+ 1 - 0
ebpftracer/tracer/btf_vmlinux.go

@@ -68,6 +68,7 @@ func bpf_table_pre_set_value(collectionSpec *ebpf.CollectionSpec, opts *ebpf.Col
 		key := make([]byte, 4)                // Assuming int key size is 4 bytes
 		binary.LittleEndian.PutUint32(key, 0) // Assuming the key is an integer
 		if err = newMap.Update(key, data, ebpf.UpdateAny); err != nil {
+			fmt.Println(err)
 			return ETR_UPDATE_MAP_FAILD
 		}
 		opts.MapReplacements[mapName] = newMap

+ 30 - 0
ebpftracer/tracer/common.go

@@ -1,5 +1,7 @@
 package tracer
 
+import "github.com/coroot/coroot-node-agent/utils"
+
 const (
 	ETR_OK               = 0
 	ETR_NOTEXIST         = -4
@@ -64,6 +66,10 @@ const (
 	PROTO_NUM        = 130
 )
 
+const (
+	HASH_SIZE = 8
+)
+
 var EbpfConfigProtocolFilter [PROTO_NUM]uint32
 
 type testStruct struct {
@@ -120,14 +126,38 @@ type traceConf struct {
 	GoTracingTimeout       uint64
 	IOEventCollectMode     uint64
 	IOEventMinimalDuartion uint64
+	HostID                 utils.HashByte
+	APPID                  utils.HashByte
 }
 
+/*
+	struct ebpf_proc_info {
+		__u32 version;
+		__u16 offsets[OFFSET_IDX_MAX];
+
+		// In golang, itab represents type, and in interface, struct is represented
+		// by the address of itab. We use itab to judge the structure type, and
+		// find the fd representing the connection after multiple jumps. These
+		// types are not available in Go ELF files without a symbol table.
+		// Go 用 itab 表示类型, 在 interface 中通过 itab 确定具体的 struct, 并根据
+		// struct 找到表示连接的 fd.
+		__u64 net_TCPConn_itab;
+		__u64 crypto_tls_Conn_itab; // TLS_HTTP1,TLS_HTTP2
+		__u64 credentials_syscallConn_itab; // gRPC
+
+//	__u64 start_addr;
+//	__u64 end_addr;
+
+		unsigned char instance_id[APM_INSTANCE_ID_SIZE];
+	} __attribute__((packed))
+*/
 type EbpfProcInfo struct {
 	Version                    uint32
 	Offsets                    [OFFSET_IDX_MAX]uint16
 	NetTCPConnItab             uint64
 	CryptoTLSConnItab          uint64 // TLS_HTTP1,TLS_HTTP2
 	CredentialsSyscallConnItab uint64 // gRPC
+	InstanceId                 utils.HashByte
 }
 
 type allowPortBitmap struct {

+ 22 - 3
ebpftracer/tracer/socket.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/btf"
+	"github.com/coroot/coroot-node-agent/utils"
 	"k8s.io/klog/v2"
 	"net"
 	"os"
@@ -138,7 +139,18 @@ func set_offset_map(collectionSpec *ebpf.CollectionSpec, opts *ebpf.CollectionOp
 	}
 }
 
+// __u64 socket_id;       // 会话标识
+// __u64 coroutine_trace_id;  // 同一协程的数据转发关联
+// __u64 thread_trace_id; // 同一进程/线程的数据转发关联,用于多事务流转场景
+// __u64 data_limit_max;  // Maximum number of data transfers
+// __u64 go_tracing_timeout;
+// __u64 io_event_collect_mode;
+// __u64 io_event_minimal_duration;
 func set_conf_map_default(collectionSpec *ebpf.CollectionSpec, opts *ebpf.CollectionOptions) {
+	fmt.Println("GetHostID")
+	_, charHostID := utils.GetHostID()
+	_, charAppID := utils.GetAppID()
+
 	uidBase := uint64(time.Now().UnixNano()/int64(time.Millisecond)) & 0xffffffffffffff
 	numCPU := runtime.NumCPU()
 	tConf := make([]any, numCPU)
@@ -152,6 +164,8 @@ func set_conf_map_default(collectionSpec *ebpf.CollectionSpec, opts *ebpf.Collec
 			GoTracingTimeout:       120,
 			IOEventCollectMode:     1,
 			IOEventMinimalDuartion: 1000000,
+			HostID:                 charHostID,
+			APPID:                  charAppID,
 		}
 		tConf[i] = tracerConf
 	}
@@ -313,6 +327,7 @@ func update_offsets_table(collectionSpec *ebpf.CollectionSpec, opts *ebpf.Collec
 
 func SetConstants(collectionSpec *ebpf.CollectionSpec) {
 	consts := map[string]interface{}{
+		// TODO go Process
 		"buckets_ptr_pos":  int64(16),
 		"ctx_ptr_pos":      int64(232),
 		"headers_ptr_pos":  int64(56),
@@ -326,10 +341,14 @@ func SetConstants(collectionSpec *ebpf.CollectionSpec) {
 		"req_ptr_pos":      int64(8),
 		"status_code_pos":  int64(120),
 		"url_ptr_pos":      int64(16),
-		// todo in process
+		// TODO in process ***
+		"start_addr": int64(139627412217856),
+		"end_addr":   int64(139627412283392),
+
+		// TODO 全局 ***
 		"total_cpus": int64(2),
-		"start_addr": int64(139867793264640),
-		"end_addr":   int64(139867793330176),
+		//"apm_app_id":  int64(0),
+		//"apm_host_id": int64(0),
 	}
 	err := collectionSpec.RewriteConstants(consts)
 	fmt.Println("err----------------", err)

+ 26 - 9
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -2,6 +2,7 @@ package otlptrace
 
 import (
 	"crypto/md5"
+	"encoding/json"
 	"fmt"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
@@ -33,9 +34,9 @@ type RootDataT struct {
 	AgentId        int64         `json:"agent_id"`
 	AgentVersion   string        `json:"agent_version"`
 	AppId          int64         `json:"app_id"`
-	AppIdFrom      int           `json:"app_id_from"` // from header
+	AppIdFrom      int64         `json:"app_id_from"` // from header app_id
 	AppName        string        `json:"app_name"`
-	CalledId       int           `json:"called_id"` // from header
+	CalledId       int64         `json:"called_id"` // from header assumed_app_id
 	ClientIp       string        `json:"client_ip"`
 	CollTime       uint64        `json:"coll_time"`
 	Cpu            int           `json:"cpu"`
@@ -45,7 +46,7 @@ type RootDataT struct {
 	HttpCode       int64         `json:"http_code"`
 	HttpMethod     string        `json:"http_method"`
 	InstanceId     int64         `json:"instance_id"`
-	InstanceIdFrom int           `json:"instance_id_from"` // from header
+	InstanceIdFrom int64         `json:"instance_id_from"` // from header instance_id
 	LocalPort      int64         `json:"local_port"`
 	Maps           []MapInfoT    `json:"maps"`
 	MemU           int           `json:"mem_u"`
@@ -60,11 +61,11 @@ type RootDataT struct {
 	ServiceType    string        `json:"service_type"`
 	Sip            string        `json:"sip"`
 	Sn             string        `json:"sn"`
-	SpanIdFrom     string        `json:"span_id_from"` // from header
+	SpanIdFrom     string        `json:"span_id_from"` // from header span_id
 	Sport          int64         `json:"sport"`
 	TId            int           `json:"t_id"`
 	TName          string        `json:"t_name"`
-	TraceId        string        `json:"trace_id"` // from header
+	TraceId        string        `json:"trace_id"` // from header trace_id
 	TransIds       []interface{} `json:"trans_ids"`
 	TypeFrom       string        `json:"type_from"`
 	Uri            string        `json:"uri"`
@@ -94,6 +95,7 @@ type MapInfoT struct {
 	Schema         string   `json:"schema,omitempty"`
 	AssumedAppId   int64    `json:"assumed_app_id,omitempty"`
 	Uri            string   `json:"uri,omitempty"`
+	SpanId         string   `json:"span_id,omitempty"`
 }
 
 type TraceMapT struct {
@@ -138,9 +140,9 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
 	}
 
 	// Transform the categorized map into a slice
-	//aa, err := json.Marshal(sendData)
-	//fmt.Println(err)
-	//fmt.Println(string(aa))
+	aa, err := json.Marshal(sendData)
+	fmt.Println(err)
+	fmt.Println(string(aa))
 	//fmt.Println(len(sendData))
 	//fmt.Println(len(TraceRootMap))
 	return sendData
@@ -245,6 +247,7 @@ func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 	traceRoot.RootData.CollTime = mNode.StartTime
 	traceRoot.Index = 1
 	for _, attr := range sd.Attributes() {
+		fmt.Println(attr.Key, ":", attr.Value.AsInterface())
 		switch attr.Key {
 		case "http.uri":
 			traceRoot.RootData.Uri = attr.Value.AsString()
@@ -259,6 +262,16 @@ func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 		case "net.peer.port":
 			traceRoot.RootData.Sport = attr.Value.AsInt64()
 			traceRoot.RootData.LocalPort = attr.Value.AsInt64()
+		case "server.trace_id_from":
+			traceRoot.RootData.TraceId = attr.Value.AsString()
+		case "server.called_id":
+			traceRoot.RootData.CalledId = attr.Value.AsInt64()
+		case "server.instance_id_from":
+			traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
+		case "server.app_id_from":
+			traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
+		case "server.span_id_from":
+			traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
 		}
 	}
 
@@ -281,9 +294,13 @@ func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
 			descAddr += ":" + attr.Value.AsString()
 		case "http.uri":
 			mNode.Uri = attr.Value.AsString()
+		case "http.assumed_app_id":
+			mNode.AssumedAppId = attr.Value.AsInt64()
+		case "http.span_id":
+			mNode.SpanId = attr.Value.AsString()
 		}
 	}
-	mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
+	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 }
 
 func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {

+ 32 - 2
tracing/apm_tracing.go

@@ -9,6 +9,7 @@ import (
 	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
 	"go.opentelemetry.io/otel/trace"
 	"inet.af/netaddr"
+	"strconv"
 	"time"
 )
 
@@ -44,7 +45,28 @@ func (t *Trace) TraceEnd(r *l7.RequestData) {
 	if t == nil {
 		return
 	}
-	t.span.SetAttributes(semconv.HTTPStatusCode(int(r.Status)))
+	t.span.SetAttributes(
+		semconv.HTTPStatusCode(int(r.Status)),
+		attribute.String("server.trace_id_from", r.ParentSpanContext.TraceIdFrom),
+	)
+
+	CalledId, err := strconv.ParseInt(r.ParentSpanContext.CalledId, 10, 64)
+	if err == nil && CalledId != 0 {
+		t.span.SetAttributes(attribute.Int64("server.called_id", CalledId))
+	}
+
+	InstanceIdFrom, err := strconv.ParseInt(r.ParentSpanContext.InstanceIdFrom, 10, 64)
+	if err == nil && InstanceIdFrom != 0 {
+		t.span.SetAttributes(attribute.Int64("server.instance_id_from", InstanceIdFrom))
+	}
+
+	AppIdFrom, err := strconv.ParseInt(r.ParentSpanContext.AppIdFrom, 10, 64)
+	if err == nil && AppIdFrom != 0 {
+		t.span.SetAttributes(attribute.Int64("server.app_id_from", AppIdFrom))
+	}
+	if r.ParentSpanContext.SpanIdFrom != "0000000000000000" {
+		t.span.SetAttributes(attribute.String("server.span_id_from", r.ParentSpanContext.SpanIdFrom))
+	}
 	t.span.End(trace.WithTimestamp(time.Now()))
 }
 
@@ -101,16 +123,24 @@ func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Dura
 	)
 }
 
-func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, status l7.Status, duration time.Duration) {
+func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.RequestData) {
 	if t == nil || method == "" {
 		return
 	}
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+	status := r.Status
+	duration := r.Duration
 	t.createTraceSpan(l7.ProtocolHTTP.String(), duration, status >= 400,
 		semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
 		semconv.HTTPMethod(method),
 		semconv.HTTPStatusCode(int(status)),
 		attribute.String("http.uri", path),
 		attribute.String("http.ip", ip),
+		attribute.Int64("http.assumed_app_id", assumedAppID),
+		attribute.String("http.span_id", r.SpanId),
 		attribute.Int("http.port", int(port)),
 	)
 }

+ 236 - 0
utils/id.go

@@ -0,0 +1,236 @@
+package utils
+
+import (
+	"crypto/md5"
+	"fmt"
+	"math"
+	"os"
+	"path"
+	"strconv"
+	"strings"
+)
+
+const (
+	HASH_SIZE_8  = 8
+	HASH_SIZE_16 = 16
+)
+
+// apm_span_context
+type ApmSpanContextT struct {
+	type_from      [1]byte
+	sample         [1]byte
+	host_id        [8]byte
+	app_id         [8]byte
+	instance_id    [8]byte
+	trace_id       [16]byte
+	assumed_app_id [8]byte
+	span_id        [8]byte
+}
+
+type HashByte [HASH_SIZE_8]byte
+
+type HashByte16 [HASH_SIZE_16]byte
+
+type Id_T struct {
+	AppID      ID
+	HostID     ID
+	InstanceID ID
+}
+
+type ID struct {
+	IntVal   int64
+	HashtVal HashByte
+}
+
+func init() {
+
+}
+
+var (
+	APP_ID_INT64      int64
+	HOST_ID_INT64     int64
+	INSTANCE_ID_INT64 int64
+
+	APP_ID_BYTE      HashByte
+	HOST_ID_BYTE     HashByte
+	INSTANCE_ID_BYTE HashByte
+)
+
+func SetInsID(str string) (int64, HashByte) {
+	srcCode := md5.Sum([]byte(str))
+	code := fmt.Sprintf("%x", srcCode)
+	id_string := md5ToDec(code)
+	fmt.Println(id_string)
+	id, err := strconv.ParseInt(id_string, 10, 64)
+	if err != nil {
+		return 0, HashByte{}
+	}
+	INSTANCE_ID_INT64 = id
+	var charArray HashByte
+	hexStringToBPFBytes(id_string, &charArray)
+	INSTANCE_ID_BYTE = charArray
+	return id, charArray
+}
+
+func GetHostID() (int64, HashByte) {
+	if HOST_ID_INT64 != 0 {
+		return HOST_ID_INT64, HOST_ID_BYTE
+	}
+	uuid := MachineID()
+	str := fmt.Sprintf("%s:%s", "110", uuid)
+	srcCode := md5.Sum([]byte(str))
+	code := fmt.Sprintf("%x", srcCode)
+	host_id_string := md5ToDec(code)
+	fmt.Println(host_id_string)
+	id, err := strconv.ParseInt(host_id_string, 10, 64)
+	if err != nil {
+		return 0, HashByte{}
+	}
+
+	fmt.Println(host_id_string)
+	HOST_ID_INT64 = id
+
+	var charArray HashByte
+	hexStringToBPFBytes(host_id_string, &charArray)
+	HOST_ID_BYTE = charArray
+	// 将rune切片复制到char数组中
+	//for i := 0; i < HASH_SIZE; i++ {
+	//	charArray[i] = []byte(host_id_string)[i]
+	//}
+	return id, charArray
+}
+
+func GetAppID() (int64, HashByte) {
+	if APP_ID_INT64 != 0 {
+		return APP_ID_INT64, APP_ID_BYTE
+	}
+	//str := fmt.Sprintf("%s:%s", "110", uuid)
+	//srcCode := md5.Sum([]byte(str))
+	//code := fmt.Sprintf("%x", srcCode)
+	//id_string := md5ToDec(code)
+	id_string := "5410049101545798"
+	fmt.Println(id_string)
+	id, err := strconv.ParseInt(id_string, 10, 64)
+	if err != nil {
+		return 0, HashByte{}
+	}
+	APP_ID_INT64 = id
+	var charArray HashByte
+	hexStringToBPFBytes(id_string, &charArray)
+	APP_ID_BYTE = charArray
+	return id, charArray
+}
+
+//func GetHostID2() uint64 {
+//	if HOST_ID_UINT64 != 0 {
+//		return HOST_ID_UINT64
+//	}
+//	uuid := MachineID()
+//	hash := make([]byte, HASH_SIZE)
+//	customHash(uuid, hash, HASH_SIZE)
+//
+//	fmt.Print("Hashed string: ")
+//	for i := 0; i < HASH_SIZE; i++ {
+//		fmt.Printf("%d", hash[i])
+//	}
+//	hashInt := byteArrayToInt(hash)
+//	fmt.Println("HashByte as integer:", hashInt)
+//	str := strconv.FormatInt(int64(hashInt), 10)
+//
+//	// 将 hash 转换为字符串(可以使用 Base64 或其他编码)
+//	fmt.Println("HashByte as string:", str)
+//	HOST_ID_UINT64 = hashInt
+//	os.Exit(1)
+//	return hashInt
+//}
+
+func MachineID() string {
+	for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} {
+		payload, err := os.ReadFile(path.Join("/proc/1/root", p))
+		if err != nil {
+			continue
+		}
+		id := strings.TrimSpace(strings.Replace(string(payload), "-", "", -1))
+		return id
+	}
+	return ""
+}
+
+func customHash(str string, hash []byte, size int) {
+	var intHash uint64 = 0
+	for i := 0; i < len(str); i++ {
+		intHash = (intHash * 31) + uint64(str[i])
+	}
+
+	// Convert intHash to 16 bytes
+	for i := 0; i < size; i++ {
+		hash[i] = byte(intHash % 10)
+		intHash /= 10
+	}
+	if hash[0] == 0 {
+		hash[0] = 1
+	}
+}
+
+func byteArrayToInt(hash []byte) uint64 {
+	var result uint64
+	for _, b := range hash {
+		result = result*10 + uint64(b)
+	}
+	return result
+}
+
+func hexdec(hexStr string) int {
+	dec, _ := strconv.ParseInt(hexStr, 16, 64)
+	return int(dec)
+}
+
+func md5ToDec(str string) string {
+	strCode := ""
+	for i := 0; i < 16; i++ {
+		strCode += strconv.Itoa(int(math.Floor(float64(hexdec(string(str[i]))) / 16.0 * 10)))
+		if i == 0 && strings.TrimSpace(strCode) == "0" {
+			strCode = "1" + strCode
+		}
+	}
+	return strCode
+}
+
+//func hexStringToBPFBytes(str string, size int) []byte {
+//	out := make([]byte, size/2)
+//	for i := 0; i < size/2; i++ {
+//		ch0 := str[2*i]
+//		ch1 := str[2*i+1]
+//		nib0 := (ch0 & 0x0F) + (ch0 >> 6) | ((ch0 >> 3) & 0x08)
+//		nib1 := (ch1 & 0x0F) + (ch1 >> 6) | ((ch1 >> 3) & 0x08)
+//		out[i] = (nib0 << 4) | nib1
+//	}
+//	return out
+//}
+
+func hexStringToBPFBytes(str string, out *HashByte) {
+	for i := 0; i < len(str)/2; i++ {
+		ch0 := str[2*i]
+		ch1 := str[2*i+1]
+		nib0 := (ch0 & 0x0F) + (ch0 >> 6) | ((ch0 >> 3) & 0x08)
+		nib1 := (ch1 & 0x0F) + (ch1 >> 6) | ((ch1 >> 3) & 0x08)
+		(*out)[i] = (nib0 << 4) | nib1
+	}
+}
+
+// hex 表示十六进制字符
+var hex = []byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}
+
+// bytesToHexString 将字节数组转换为十六进制字符串
+func BytesToHexString(pin HashByte) string {
+	size := len(pin)
+	out := make([]byte, size*2)
+	pout := 0
+	for i := 0; i < size; i++ {
+		out[pout] = hex[(pin[i]>>4)&0xF]
+		pout++
+		out[pout] = hex[pin[i]&0xF]
+		pout++
+	}
+	return string(out)
+}