Jelajahi Sumber

Fixed #TASK_GK-2944 trace

Fixed #TASK_GK-2944 trace app

Fixed #TASK_GK-2944 bug from connection

Fixed #TASK_GK-2944 build data

Fixed #TASK_GK-2944 trace

Fixed #TASK_GK-2944 trace app

Fixed #TASK_GK-2944 bug from connection

Fixed #TASK_GK-2944 build data
Carl 2 tahun lalu
induk
melakukan
b49c308712

+ 23 - 3
containers/container.go

@@ -141,6 +141,8 @@ type Container struct {
 	lock sync.RWMutex
 
 	done chan struct{}
+
+	traceMap map[uint64]*tracing.Trace
 }
 
 func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
@@ -175,7 +177,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 
 		hostConntrack: hostConntrack,
 
-		done: make(chan struct{}),
+		done:     make(chan struct{}),
+		traceMap: make(map[uint64]*tracing.Trace),
 	}
 
 	for _, n := range md.networks {
@@ -884,6 +887,7 @@ func (c *Container) gc(now time.Time) {
 	establishedDst := map[netaddr.IPPort]struct{}{}
 	listens := map[netaddr.IPPort]string{}
 	seenNamespaces := map[string]bool{}
+	fdMap := map[uint64]struct{}{}
 	for _, p := range c.processes {
 		if seenNamespaces[p.NetNsId] {
 			continue
@@ -892,6 +896,14 @@ func (c *Container) gc(now time.Time) {
 		if err != nil {
 			continue
 		}
+
+
+		fds, err := proc.ReadFds(p.Pid)
+		if err == nil {
+			for _, fd := range fds {
+				fdMap[fd.Fd] = struct{}{}
+			}
+		}
 		for _, s := range sockets {
 			if s.Listen {
 				listens[s.SAddr] = s.Inode
@@ -900,6 +912,7 @@ func (c *Container) gc(now time.Time) {
 				establishedDst[s.DAddr] = struct{}{}
 			}
 		}
+
 		seenNamespaces[p.NetNsId] = true
 	}
 
@@ -908,9 +921,7 @@ func (c *Container) gc(now time.Time) {
 			delete(c.ipsByNs, ns)
 		}
 	}
-
 	c.revalidateListens(now, listens)
-
 	for srcDst, conn := range c.connectionsActive {
 		pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
 		if _, ok := established[srcDst]; !ok {
@@ -920,6 +931,7 @@ func (c *Container) gc(now time.Time) {
 			}
 			continue
 		}
+
 		if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
 			delete(c.connectionsActive, srcDst)
 			if conn == c.connectionsByPidFd[pidFd] {
@@ -927,6 +939,14 @@ func (c *Container) gc(now time.Time) {
 			}
 		}
 	}
+
+	for _, conn := range c.connectionsByPidFd {
+
+		if _, ok := fdMap[conn.Fd]; !ok {
+			delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
+		}
+	}
+
 	for dst, at := range c.connectLastAttempt {
 		_, active := establishedDst[dst]
 		if !active && !at.IsZero() && now.Sub(at) > gcInterval {

+ 138 - 0
containers/container_apm.go

@@ -0,0 +1,138 @@
+package containers
+
+import (
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"github.com/coroot/coroot-node-agent/tracing"
+	"inet.af/netaddr"
+)
+
+// getTrace looks up the trace registered in c.traceMap under traceId.
+// The boolean result reports whether an entry exists for that id.
+// It takes no lock itself.
+func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
+	if found, present := c.traceMap[traceId]; present {
+		return found, true
+	}
+	return nil, false
+}
+
+// InitTrace creates a trace for the HTTP request carried in r.Payload, stores
+// it in c.traceMap under traceId and records the start of the trace.
+//
+// The destination address is taken from the request's Host header as parsed
+// by l7.ParseHttpHost; the host must be an IP literal, because it is passed to
+// netaddr.ParseIP. An error is returned when the host cannot be parsed or when
+// tracing.NewTrace returns nil (presumably because no OTLP traces endpoint is
+// configured, as the error text suggests — confirm against the tracing package).
+//
+// InitTrace does not acquire c.lock; onL7RequestApm calls it with the lock held.
+func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
+	method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
+	ip, err := netaddr.ParseIP(hostIp)
+	if err != nil {
+		// Keep the underlying cause and the offending value so the failure
+		// can be diagnosed from the log line alone.
+		return fmt.Errorf("host ip error: parsing %q: %w", hostIp, err)
+	}
+	trace := tracing.NewTrace(string(c.id), netaddr.IPPortFrom(ip, port))
+	if trace == nil {
+		return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
+	}
+	c.traceMap[traceId] = trace
+
+	trace.TraceStart(method, path, r.Status, r.Duration)
+	return nil
+}
+
+// onL7RequestApm processes one L7 event for this container while holding
+// c.lock for the whole call.
+//
+// Events with protocol l7.ProtocolTrace delimit an APM trace:
+//   - TraceStart == 1 registers a new trace via InitTrace and returns;
+//   - TraceEnd == 1 finishes the trace stored under r.TraceId, removes it from
+//     c.traceMap and returns.
+//
+// A ProtocolTrace event with neither flag set falls through to the
+// per-connection handling below, like any other protocol.
+//
+// All other events are matched to a connection by (pid, fd). The event is
+// dropped when no such connection is known, or when timestamp is non-zero and
+// differs from the connection's timestamp. Otherwise the request is recorded
+// in the L7 stats and, depending on the protocol, reported to a trace.
+//
+// The per-event debug printing that used to run here under the lock has been
+// removed; only the InitTrace failure is still reported.
+func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	if r.Protocol == l7.ProtocolTrace {
+		if r.TraceStart == 1 {
+			if err := c.InitTrace(r.TraceId, r); err != nil {
+				fmt.Println(err)
+			}
+			return
+		}
+		if r.TraceEnd == 1 {
+			if trace, ok := c.getTrace(r.TraceId); ok {
+				trace.TraceEnd(r)
+				delete(c.traceMap, r.TraceId)
+			}
+			return
+		}
+	}
+
+	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
+	if conn == nil {
+		return
+	}
+	if timestamp != 0 && conn.Timestamp != timestamp {
+		return
+	}
+	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
+	// NOTE(review): InitTrace treats a nil result from tracing.NewTrace as an
+	// error, but the calls on trace below are unguarded — confirm that the
+	// tracing.Trace methods tolerate a nil receiver.
+	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
+	switch r.Protocol {
+	case l7.ProtocolHTTP:
+		stats.observe(r.Status.Http(), "", r.Duration)
+		method, path := l7.ParseHttp(r.Payload)
+		trace.HttpRequest(method, path, r.Status, r.Duration)
+	case l7.ProtocolHTTP2:
+		if conn.http2Parser == nil {
+			conn.http2Parser = l7.NewHttp2Parser()
+		}
+		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
+		for _, req := range requests {
+			stats.observe(req.Status.Http(), "", req.Duration)
+			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
+		}
+	case l7.ProtocolPostgres:
+		if r.Method != l7.MethodStatementClose {
+			stats.observe(r.Status.String(), "", r.Duration)
+		}
+		if conn.postgresParser == nil {
+			conn.postgresParser = l7.NewPostgresParser()
+		}
+		query := conn.postgresParser.Parse(r.Payload)
+		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
+	case l7.ProtocolMysql:
+		if r.Method != l7.MethodStatementClose {
+			stats.observe(r.Status.String(), "", r.Duration)
+		}
+		if conn.mysqlParser == nil {
+			conn.mysqlParser = l7.NewMysqlParser()
+		}
+		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
+		// MySQL queries are attached to the APM trace registered under
+		// r.TraceId, not to the per-connection trace; queries without a
+		// registered trace are counted in stats only.
+		if apmTrace, ok := c.getTrace(r.TraceId); ok {
+			apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+		}
+	case l7.ProtocolMemcached:
+		stats.observe(r.Status.String(), "", r.Duration)
+		cmd, items := l7.ParseMemcached(r.Payload)
+		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
+	case l7.ProtocolRedis:
+		stats.observe(r.Status.String(), "", r.Duration)
+		cmd, args := l7.ParseRedis(r.Payload)
+		trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
+	case l7.ProtocolMongo:
+		stats.observe(r.Status.String(), "", r.Duration)
+		query := l7.ParseMongo(r.Payload)
+		trace.MongoQuery(query, r.Status.Error(), r.Duration)
+	case l7.ProtocolKafka, l7.ProtocolCassandra:
+		stats.observe(r.Status.String(), "", r.Duration)
+	case l7.ProtocolRabbitmq, l7.ProtocolNats:
+		stats.observe(r.Status.String(), r.Method.String(), 0)
+	}
+}

+ 19 - 7
containers/registry.go

@@ -163,8 +163,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				delete(r.containersById, id)
 				c.Close()
 			}
-
 		case e, more := <-ch:
+			if e.Pid == uint32(os.Getpid()) {
+				continue
+			}
 			if !more {
 				return
 			}
@@ -237,11 +239,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 				}
 			case ebpftracer.EventTypeL7Request:
+				fmt.Println("EventTypeL7Request")
+				fmt.Println("e.L7Request Payload:", string(e.L7Request.Payload))
 				if e.L7Request == nil {
 					continue
 				}
 				if c := r.containersByPid[e.Pid]; c != nil {
-					c.onL7Request(e.Pid, e.Fd, e.Timestamp, e.L7Request)
+					c.onL7RequestApm(e.Pid, e.Fd, e.Timestamp, e.L7Request)
 				}
 			}
 		}
@@ -261,7 +265,8 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		}
 		return nil
 	}
-	if c := r.containersByCgroupId[cg.Id]; c != nil {
+	cgId := fmt.Sprintf("%s/%d", cg.Id, pid)
+	if c := r.containersByCgroupId[cgId]; c != nil {
 		r.containersByPid[pid] = c
 		return c
 	}
@@ -282,7 +287,8 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		return nil
 	}
 	// add ns/workload/podname
-	id, extensionTag := calcId(cg, md)
+	id, extensionTag := calcId(cg, md, pid)
+
 	klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
 	if id == "" {
 		if cg.Id == "/init.scope" && pid != 1 {
@@ -306,9 +312,10 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		}
 		setK8sTag(c, extensionTag, pid)
 		r.containersByPid[pid] = c
-		r.containersByCgroupId[cg.Id] = c
+		r.containersByCgroupId[cgId] = c
 		return c
 	}
+
 	c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
 	if err != nil {
 		klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
@@ -329,12 +336,12 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		return nil
 	}
 	r.containersByPid[pid] = c
-	r.containersByCgroupId[cg.Id] = c
+	r.containersByCgroupId[cgId] = c
 	r.containersById[id] = c
 	return c
 }
 
-func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) (ContainerID, map[string]string) {
+func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID, map[string]string) {
 	extensionTag := map[string]string{Namespace: "", Workload: "", PodName: "", ProcessName: ""}
 	if cg.ContainerType == cgroup.ContainerTypeSystemdService {
 		if strings.HasPrefix(cg.ContainerId, "/system.slice/crio-conmon-") {
@@ -342,6 +349,11 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) (ContainerID, map[string]s
 		}
 		return ContainerID(cg.ContainerId), extensionTag
 	}
+	if cg.ContainerType == cgroup.ContainerTypeStandaloneProcess {
+		procName := proc.GetProcName(pid)
+		extensionTag[ProcessName] = procName
+		return ContainerID(fmt.Sprintf("/%s/%s/%d", "standalone", proc.GetProcName(pid), pid)), extensionTag
+	}
 	switch cg.ContainerType {
 	case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeSandbox, cgroup.ContainerTypeCrio:
 	default:

+ 14 - 14
ebpftracer/Dockerfile

@@ -5,14 +5,14 @@ RUN apk add llvm clang libbpf-dev linux-headers
 COPY ebpf /tmp/ebpf
 WORKDIR /tmp/ebpf
 
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=416 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf416x86.o && llvm-strip --strip-debug ebpf416x86.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=420 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf420x86.o && llvm-strip --strip-debug ebpf420x86.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=506 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf506x86.o && llvm-strip --strip-debug ebpf506x86.o
+#RUN #clang -g -O2 -target bpf -D__KERNEL_FROM=416 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf416x86.o && llvm-strip --strip-debug ebpf416x86.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=420 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf420x86.o && llvm-strip --strip-debug ebpf420x86.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=506 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf506x86.o && llvm-strip --strip-debug ebpf506x86.o
 RUN clang -g -O2 -target bpf -D__KERNEL_FROM=512 -D__TARGET_ARCH_x86 -c ebpf.c -o ebpf512x86.o && llvm-strip --strip-debug ebpf512x86.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=416 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf416arm64.o && llvm-strip --strip-debug ebpf416arm64.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=420 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf420arm64.o && llvm-strip --strip-debug ebpf420arm64.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=506 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf506arm64.o && llvm-strip --strip-debug ebpf506arm64.o
-RUN clang -g -O2 -target bpf -D__KERNEL_FROM=512 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf512arm64.o && llvm-strip --strip-debug ebpf512arm64.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=416 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf416arm64.o && llvm-strip --strip-debug ebpf416arm64.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=420 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf420arm64.o && llvm-strip --strip-debug ebpf420arm64.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=506 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf506arm64.o && llvm-strip --strip-debug ebpf506arm64.o
+#RUN clang -g -O2 -target bpf -D__KERNEL_FROM=512 -D__TARGET_ARCH_arm64 -c ebpf.c -o ebpf512arm64.o && llvm-strip --strip-debug ebpf512arm64.o
 
 RUN echo -en '// generated - do not edit\npackage ebpftracer\n\nvar ebpfProg = map[string][]struct {\n' > ebpf.go \
 	&& echo -en '\tv string\n' >> ebpf.go \
@@ -20,14 +20,14 @@ RUN echo -en '// generated - do not edit\npackage ebpftracer\n\nvar ebpfProg = m
 	&& echo -en '}{\n' >> ebpf.go \
 	&& echo -en '\t"amd64": {\n' >> ebpf.go \
 	&& echo -en '\t\t{"v5.12", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v5.6", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf506x86.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v4.20", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf420x86.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v4.16", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf416x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v5.6", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v4.20", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v4.16", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
 	&& echo -en '\t},\n'>> ebpf.go \
 	&& echo -en '\t"arm64": {\n' >> ebpf.go \
-	&& echo -en '\t\t{"v5.12", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512arm64.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v5.6", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf506arm64.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v4.20", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf420arm64.o >> ebpf.go && echo '")},' >> ebpf.go \
-	&& echo -en '\t\t{"v4.16", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf416arm64.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v5.12", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v5.6", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v4.20", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
+	&& echo -en '\t\t{"v4.16", []byte("' >> ebpf.go && hexdump -v -e '"\x" 1/1 "%02x"' ebpf512x86.o >> ebpf.go && echo '")},' >> ebpf.go \
 	&& echo -en '\t},\n'>> ebpf.go \
 	&& echo -en '}\n'>> ebpf.go

+ 7 - 0
ebpftracer/ebpf/ebpf.c

@@ -1,3 +1,10 @@
+//#include <linux/kconfig.h>
+//#include <linux/tcp.h>
+//#include <net/flow.h>
+//#include <net/inet_sock.h>
+//#include <net/sock.h>
+//#include <net/net_namespace.h>
+#include <stdbool.h>
 #include <uapi/linux/bpf.h>
 #include "vmlinux.h"
 #include <bpf/bpf_helpers.h>

+ 50 - 0
ebpftracer/ebpf/l7/apm_trace.c

@@ -0,0 +1,50 @@
+//
+// Created by Carl.Guo on 2024/4/1.
+//
+/***********************************************************
+ * Trace struct
+ ***********************************************************/
+/* Key of the trace_info_heap map: identifies one thread of one process. */
+struct trace_key_t {
+    __u32 tgid; /* filled from the "pid" argument of get_trace_key() */
+    __u32 pid;  /* filled from the "tid" argument of get_trace_key() */
+};
+
+/* Value of the trace_info_heap map; only trace_id is currently in use. */
+struct trace_info_t {
+    /*
+     * Whether traceID is zero ?
+     * For the client to actively send request, set traceID to zero.
+     */
+    //	bool is_trace_id_zero;
+    //	__u32 update_time; // seconds elapsed from system boot to the time of creation/update
+    //	__u32 peer_fd;	   // used to associate sockets with each other
+    //	__u64 thread_trace_id; // thread trace ID
+    //	__u64 socket_id; // Records the socket associated when tracing was created
+    __u64 trace_id; /* identifier of the trace stored for this key */
+};
+
+/*
+ * LRU hash map from trace_key_t to trace_info_t, holding at most 32768
+ * entries. get_trace_id() reads from it.
+ */
+struct {
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
+    __uint(key_size, sizeof(struct trace_key_t));
+    __uint(value_size, sizeof(struct trace_info_t));
+    __uint(max_entries, 32768);
+} trace_info_heap SEC(".maps");
+
+
+/* Builds the trace_info_heap key for the given process id and thread id. */
+static inline __attribute__((__always_inline__))
+struct trace_key_t get_trace_key(__u32 pid,__u32 tid){
+    struct trace_key_t key = {
+        .tgid = pid,
+        .pid = tid,
+    };
+    return key;
+}
+
+/*
+ * Returns the trace id stored in trace_info_heap for (pid, tid),
+ * or 0 when the map has no entry for that key.
+ */
+static inline __attribute__((__always_inline__))
+__u64 get_trace_id(__u32 pid, __u32 tid){
+    struct trace_key_t key = get_trace_key(pid, tid);
+    struct trace_info_t *info = bpf_map_lookup_elem(&trace_info_heap, &key);
+    if (!info) {
+        return 0;
+    }
+    bpf_printk("trace_id:%llu",info->trace_id);
+    return info->trace_id;
+}

+ 128 - 8
ebpftracer/ebpf/l7/l7.c

@@ -12,6 +12,8 @@
 #define PROTOCOL_HTTP2	   11
 #define PROTOCOL_DUBBO2    12
 
+#define PROTOCOL_TRACE	   200
+
 #define STATUS_UNKNOWN  0
 #define STATUS_OK       200
 #define STATUS_FAILED   500
@@ -51,6 +53,9 @@
 #include "nats.c"
 #include "http2.c"
 #include "dubbo2.c"
+#include "apm_trace.c"
+
+/*
+ * Only events from this process id are traced: trace_enter_write() and
+ * trace_exit_read() return early when the current pid differs from it.
+ * NOTE(review): 69161 looks like a leftover debugging value; with it in place
+ * every other process is ignored. Confirm whether this should be configurable
+ * or removed before release.
+ */
+__u32 filterPid = 69161;
 
 struct l7_event {
     __u64 fd;
@@ -63,6 +68,9 @@ struct l7_event {
     __u16 padding;
     __u32 statement_id;
     __u64 payload_size;
+    __u64 trace_id;
+    __u32 trace_start;
+    __u32 trace_end;
     char payload[MAX_PAYLOAD_SIZE];
 };
 
@@ -145,15 +153,15 @@ struct trace_event_raw_sys_exit_rw__stub {
     long int ret;
 };
 
-struct iovec {
+struct l7_iovec {
     char* buf;
     __u64 size;
 };
 
-struct user_msghdr {
+struct l7_user_msghdr {
 	void *msg_name;
 	int msg_namelen;
-	struct iovec *msg_iov;
+	struct l7_iovec *msg_iov;
 	__u64 msg_iovlen;
 };
 
@@ -177,8 +185,8 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
 }
 
 static inline __attribute__((__always_inline__))
-__u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf) {
-    struct iovec iov = {};
+__u64 read_iovec(char *l7_iovec, __u64 iovlen, __u64 ret, char *buf) {
+    struct l7_iovec iov = {};
     __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
     __u64 offset = 0;
     __u64 size = 0;
@@ -187,7 +195,7 @@ __u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf) {
         if (i >= iovlen) {
             break;
         }
-        if (bpf_probe_read(&iov, sizeof(iov), (void *)(iovec+i*sizeof(iov)))) {
+        if (bpf_probe_read(&iov, sizeof(iov), (void *)(l7_iovec+i*sizeof(iov)))) {
             return 0;
         }
         if (iov.size <= 0) {
@@ -211,7 +219,14 @@ static inline __attribute__((__always_inline__))
 int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
     __u64 id = bpf_get_current_pid_tgid();
     __u32 zero = 0;
+    __u32 pid, tid;
+    __u32 http_status ;
 
+    pid = id >> 32;
+    tid =  (__u32)id;
+    if (pid != filterPid) {
+        return 0;
+    }
     char* payload = buf;
     if (iovlen) {
         payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
@@ -239,7 +254,54 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     k.is_tls = is_tls;
     k.stream_id = -1;
 
+    bpf_printk("enter-payload:%s|type:%s|FD:%d\n",payload,"type",k.fd);
+
+    if (is_http_response(payload, &http_status))
+    {
+        bpf_printk("[Response][HTTP]:TGID:%d|type:%s|FD:%d\n", k.pid, "type", k.fd);
+        struct trace_key_t trace_key = get_trace_key(pid, tid);
+        __u64 trace_id = get_trace_id(pid, tid);
+        bpf_printk("trace_id:%llu", trace_id);
+        // 清除trace信息
+        bpf_map_delete_elem(&trace_info_heap, &trace_key);
+
+        // 发送事件到用户空间 start
+        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+        if (!e) {
+            return 0;
+        }
+        struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+        if (!req)
+        {
+            bpf_printk("[Response][HTTP]:no req-----------");
+            bpf_printk("[Response][HTTP]:pid:%d|tid:%d",k.pid,k.fd);
+            bpf_printk("[Response][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
+            return 0;
+        }
+        bpf_printk("[Response][HTTP]:req->ns:%d\n",req->ns);
+
+        e->duration = bpf_ktime_get_ns() - req->ns;
+        bpf_printk("[Response][HTTP]:duration->ns:%d\n",e->duration);
+        e->protocol = PROTOCOL_TRACE;
+        e->status = http_status;
+        e->pid = k.pid;
+        e->fd = k.fd;
+        // e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+        e->trace_start = 0;
+        e->trace_end = 1;
+        e->trace_id = trace_id;
+        bpf_printk("[Response][HTTP]:status:%d",e->status);
+        e->payload_size = size;
+        COPY_PAYLOAD(e->payload, size, payload);
+        bpf_map_delete_elem(&active_l7_requests, &k);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+        // 发送事件到用户空间 end
+        bpf_printk("HTTP_END");
+        return 0;
+    }
+
     if (is_http_request(payload)) {
+        bpf_printk("[Enter][HTTP]:TGID:%d|FD:%d\n",k.pid,k.fd);
         req->protocol = PROTOCOL_HTTP;
     } else if (is_postgres_query(payload, size, &req->request_type)) {
         if (req->request_type == POSTGRES_FRAME_CLOSE) {
@@ -256,10 +318,12 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         req->protocol = PROTOCOL_POSTGRES;
     } else if (is_redis_query(payload, size)) {
+        bpf_printk("[Enter][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
         req->protocol = PROTOCOL_REDIS;
     } else if (is_memcached_query(payload, size)) {
         req->protocol = PROTOCOL_MEMCACHED;
     } else if (is_mysql_query(payload, size, &req->request_type)) {
+        bpf_printk("[Enter][Mysql]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
         if (req->request_type == MYSQL_COM_STMT_CLOSE) {
             struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
             if (!e) {
@@ -335,12 +399,19 @@ int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
     args.buf = buf;
     args.ret = ret;
     args.iovlen = iovlen;
+    __u32 pid = id >> 32;
     bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
     return 0;
 }
 
 static inline __attribute__((__always_inline__))
 int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret) {
+    __u32 tid =  (__u32)id;
+
+    if (pid != filterPid) {
+        return 0;
+    }
+
     struct read_args *args = bpf_map_lookup_elem(&active_reads, &id);
     if (!args) {
         return 0;
@@ -383,11 +454,54 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (!e) {
         return 0;
     }
+    e->fd = k.fd;
+    e->pid = k.pid;
     e->protocol = PROTOCOL_UNKNOWN;
     e->status = STATUS_UNKNOWN;
     e->method = METHOD_UNKNOWN;
     e->statement_id = 0;
     e->payload_size = 0;
+//    bpf_printk("exit-payload:%s|type:%s\n",payload,type);
+
+    // 被调用方http入口
+    if (is_http_request(payload)) {
+        struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
+        if (!req)
+        {
+            return 0;
+        } else
+        {
+            req->protocol = PROTOCOL_HTTP;
+            req->payload_size = ret;
+            req->ns = bpf_ktime_get_ns();
+            COPY_PAYLOAD(req->payload, ret, payload);
+            // bpf_printk("[Receive][HTTP]:pid:%d|tid:%d",k.pid,k.fd);
+            // bpf_printk("[Receive][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
+            bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
+        }
+
+        bpf_printk("[Receive][HTTP]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
+        bpf_printk("[Receive][HTTP] payload1:%s|type:%s\n",payload,"type");
+
+        struct trace_key_t trace_key = {};
+        struct trace_info_t trace_info = {};
+        trace_key.tgid = pid;
+        trace_key.pid = tid;
+        __u64 uid_base = bpf_ktime_get_ns();
+        trace_info.trace_id = bpf_get_current_pid_tgid() + uid_base;
+        e->trace_start = 1;
+        e->trace_end = 0;
+        e->protocol = PROTOCOL_TRACE;
+        e->trace_id = trace_info.trace_id;
+        e->payload_size = ret;
+        COPY_PAYLOAD(e->payload, ret, payload);
+
+        // 入口方法缓存  bpf_map_update_elem(map, key, value, options)
+        bpf_map_update_elem(&trace_info_heap, &trace_key, &trace_info, BPF_NOEXIST);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+        bpf_printk("[Receive][HTTP] to user space");
+        return 0;
+    }
 
     if (is_rabbitmq_consume(payload, ret)) {
         e->protocol = PROTOCOL_RABBITMQ;
@@ -430,6 +544,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 
     bpf_map_delete_elem(&active_l7_requests, &k);
     if (e->protocol == PROTOCOL_HTTP) {
+        bpf_printk("[Response][HTTP]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
         response = is_http_response(payload, &e->status);
     } else if (e->protocol == PROTOCOL_POSTGRES) {
         response = is_postgres_response(payload, ret, &e->status);
@@ -437,10 +552,15 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->method = METHOD_STATEMENT_PREPARE;
         }
     } else if (e->protocol == PROTOCOL_REDIS) {
+        bpf_printk("[Response][Redis]:TGID:%d|type:%s|FD:%d\n",k.pid,"",k.fd);
         response = is_redis_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MEMCACHED) {
         response = is_memcached_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MYSQL) {
+        bpf_printk("[Response][Mysql]:TGID:%d|type:%s|FD:%d\n",k.pid,"type",k.fd);
+        __u64 trace_id = get_trace_id(pid, tid);
+        bpf_printk("[Mysql] trace_id:%llu", trace_id);
+        e->trace_id = trace_id;
         response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status);
         if (req->request_type == MYSQL_COM_STMT_PREPARE) {
             e->method = METHOD_STATEMENT_PREPARE;
@@ -486,7 +606,7 @@ int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
 
 SEC("tracepoint/syscalls/sys_enter_sendmsg")
 int sys_enter_sendmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
-    struct user_msghdr msghdr = {};
+    struct l7_user_msghdr msghdr = {};
     if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
         return 0;
     }
@@ -513,7 +633,7 @@ int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
 SEC("tracepoint/syscalls/sys_enter_recvmsg")
 int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    struct user_msghdr msghdr = {};
+    struct l7_user_msghdr msghdr = {};
     if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
         return 0;
     }

+ 32 - 0
ebpftracer/l7/http.go

@@ -2,6 +2,8 @@ package l7
 
 import (
 	"bytes"
+	"strconv"
+	"strings"
 )
 
 func ParseHttp(payload []byte) (string, string) {
@@ -18,3 +20,33 @@ func ParseHttp(payload []byte) (string, string) {
 	}
 	return string(method), string(uri)
 }
+
+// ParseHttpHost extracts the method, request URI, host and port from the
+// beginning of an HTTP/1.x request.
+//
+// It returns four zero values when payload does not start with a known HTTP
+// method followed by a space. If the request line is truncated before the
+// space that ends the URI, "..." is appended to the returned URI.
+//
+// The host and port come from the first header line that starts with "Host:".
+// The port defaults to 80 when the header carries none or it is not a valid
+// 16-bit number; the host is "" when no Host header is present. Bracketed
+// IPv6 literals ("[::1]:8080") are returned without the brackets.
+//
+// The function never panics on truncated or malformed payloads, and it does
+// not modify payload.
+func ParseHttpHost(payload []byte) (string, string, string, uint16) {
+	method, rest, ok := bytes.Cut(payload, space)
+	if !ok || !isHttpMethod(string(method)) {
+		return "", "", "", 0
+	}
+
+	uri, rest, ok := bytes.Cut(rest, space)
+	path := string(uri)
+	if !ok {
+		// Build the marker on the copied string: appending to uri could
+		// write into spare capacity of the caller's payload buffer.
+		path += "..."
+	}
+
+	host, port := "", uint16(80) // Default port
+
+	// Anchor on the preceding line break so that headers merely ending in
+	// "Host:" (e.g. X-Forwarded-Host) are not mistaken for the Host header.
+	_, afterHost, found := bytes.Cut(rest, []byte("\nHost:"))
+	if !found {
+		return string(method), path, host, port
+	}
+	// A payload truncated inside the header value has no line break; Cut
+	// then yields everything that was captured.
+	line, _, _ := bytes.Cut(afterHost, []byte("\n"))
+	host = string(bytes.TrimSpace(line))
+
+	// A trailing "]" means a bracketed IPv6 literal without a port, whose
+	// colons must not be treated as a port separator.
+	if i := strings.LastIndexByte(host, ':'); i >= 0 && !strings.HasSuffix(host, "]") {
+		if p, err := strconv.ParseUint(host[i+1:], 10, 16); err == nil {
+			port = uint16(p)
+		}
+		host = host[:i]
+	}
+	host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]")
+
+	return string(method), path, host, port
+}

+ 7 - 0
ebpftracer/l7/l7.go

@@ -20,6 +20,8 @@ const (
 	ProtocolNats      Protocol = 10
 	ProtocolHTTP2     Protocol = 11
 	ProtocolDubbo2    Protocol = 12
+
+	ProtocolTrace     Protocol = 200
 )
 
 func (p Protocol) String() string {
@@ -48,6 +50,8 @@ func (p Protocol) String() string {
 		return "HTTP2"
 	case ProtocolDubbo2:
 		return "Dubbo2"
+	case ProtocolTrace:
+		return "TRACE"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
@@ -119,4 +123,7 @@ type RequestData struct {
 	Method      Method
 	StatementId uint32
 	Payload     []byte
+	TraceId     uint64
+	TraceStart  uint32
+	TraceEnd    uint32
 }

+ 8 - 0
ebpftracer/tracer.go

@@ -211,11 +211,13 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 			return fmt.Errorf("failed to create ebpf reader: %w", err)
 		}
 		t.readers[pm.name] = r
+		// event监听
 		go runEventsReader(pm.name, r, ch, pm.typ)
 	}
 
 	for _, programSpec := range collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
+		fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
 		if t.disableL7Tracing {
 			switch programSpec.Name {
 			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
@@ -318,6 +320,9 @@ type l7Event struct {
 	Padding             uint16
 	StatementId         uint32
 	PayloadSize         uint64
+	TraceId             uint64
+	TraceStart          uint32
+	TraceEnd            uint32
 }
 
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
@@ -350,6 +355,9 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				Duration:    time.Duration(v.Duration),
 				Method:      l7.Method(v.Method),
 				StatementId: v.StatementId,
+				TraceId:     v.TraceId,
+				TraceStart:  v.TraceStart,
+				TraceEnd:    v.TraceEnd,
 			}
 			switch {
 			case v.PayloadSize == 0:

+ 147 - 117
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -3,9 +3,7 @@ package otlptrace
 import (
 	"encoding/json"
 	"fmt"
-	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
-	"go.opentelemetry.io/otel/sdk/instrumentation"
 	tracesdk "go.opentelemetry.io/otel/sdk/trace"
 	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 	"strings"
@@ -20,15 +18,16 @@ type RootData struct {
 	AppName        string        `json:"app_name"`
 	CalledId       int           `json:"called_id"`
 	ClientIp       string        `json:"client_ip"`
-	CollTime       int64         `json:"coll_time"`
+	CollTime       uint64        `json:"coll_time"`
 	Cpu            int           `json:"cpu"`
 	Custom         string        `json:"custom"`
 	HostId         int64         `json:"host_id"`
 	HostName       string        `json:"host_name"`
-	HttpCode       int           `json:"http_code"`
+	HttpCode       int64         `json:"http_code"`
 	HttpMethod     string        `json:"http_method"`
 	InstanceId     int64         `json:"instance_id"`
 	InstanceIdFrom int           `json:"instance_id_from"`
+	LocalPort      int64         `json:"local_port"`
 	Maps           []MapInfo     `json:"maps"`
 	MemU           int           `json:"mem_u"`
 	MemUP          int           `json:"mem_u_p"`
@@ -36,14 +35,14 @@ type RootData struct {
 	Parameters     []interface{} `json:"parameters"`
 	ParentTaskName int           `json:"parent_task_name"`
 	Period         int           `json:"period"`
-	RespTime       int           `json:"resp_time"`
+	RespTime       uint64        `json:"resp_time"`
 	Sampling       int           `json:"sampling"`
 	ServiceName    string        `json:"service_name"`
 	ServiceType    string        `json:"service_type"`
 	Sip            string        `json:"sip"`
 	Sn             string        `json:"sn"`
 	SpanIdFrom     string        `json:"span_id_from"`
-	Sport          int           `json:"sport"`
+	Sport          int64         `json:"sport"`
 	TId            int           `json:"t_id"`
 	TName          string        `json:"t_name"`
 	TraceId        string        `json:"trace_id"`
@@ -75,146 +74,177 @@ type MapInfo struct {
 	WallTime       uint64   `json:"wall_time"`
 }
 
-func tracetransformData(sdl []tracesdk.ReadOnlySpan) []*tracepb.ResourceSpans {
+// TraceMapT pairs the RootData being assembled for one trace with a running
+// per-trace counter.
+type TraceMapT struct {
+	RootData RootData `json:"root_data"` // payload accumulated for the trace
+	Index    int      `json:"index"` // counter that initMapInfo copies into MapInfo.Nid
+}
+
+// tracetransformData groups the exported spans by trace ID and returns one
+// RootData per trace; each non-nil span is folded into its trace's RootData
+// by initMapInfo. It returns nil when sdl is empty.
+//
+// NOTE(review): the result slice is filled by ranging over a map, so the
+// order of the returned RootData values is not stable between calls.
+func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootData {
 	if len(sdl) == 0 {
 		return nil
 	}
-	//rootData := make(map[trace.TraceID]RootData)
 
-	rsm := make(map[attribute.Distinct]*tracepb.ResourceSpans)
-
-	type key struct {
-		r  attribute.Distinct
-		is instrumentation.Scope
-	}
-	ssm := make(map[key]*tracepb.ScopeSpans)
+	//traceMap := make(map[string][]MapInfo)
+	//traceIndexMap := make(map[string]int)
 
-	var resources int
-	var nid = 1
+	traceRoot := make(map[string]*TraceMapT)
+	sendData := []RootData{}
 	for _, sd := range sdl {
 		if sd == nil {
 			continue
 		}
-		//fmt.Println(span(sd))
-
-		//span(sd).Attributes
-		//for k, attr := range span(sd).Attributes {
-		//	fmt.Println(k, attr)
-		//}
-		//fmt.Println(span(sd).Attributes)
-		//level:=1
-		nid++
-		m := MapInfo{
-			Exception:      0,
-			ExceptionMsg:   "",
-			ExceptionStack: "",
-			Ip:             "",
-			Level:          0,
-			MethodDesc:     "unknown",
-			MethodName:     "unknown",
-			Nid:            nid,
-			OperType:       "",
-			Pid:            1,
-			Port:           0,
-			Ps:             []string{},
-			PureTime:       (span(sd).EndTimeUnixNano - span(sd).StartTimeUnixNano) / 1e3,
-			ServiceName:    "",
-			ServiceType:    "",
-			StartTime:      span(sd).StartTimeUnixNano / 1e6,
-			WallTime:       0,
-		}
-		m.WallTime = m.PureTime
-		mapType := span(sd).Name
-		switch mapType {
-		case "MYSQL":
-			m.Dbn = "unknown"
-			m.ServiceName = mapType
-			m.ServiceType = "SQL"
-			m.Level = 2
-		case "MAIN":
-			m.ServiceName = "GO"
-			m.ServiceType = "APPLICATION"
-			m.MethodName = "main"
-			m.MethodDesc = "main"
-			m.Level = 1
-			m.Nid = 1
-			m.Pid = 0
-			nid = 1
+		traceId := sd.SpanContext().TraceID().String()
+		if _, ok := traceRoot[traceId]; !ok {
+			traceRoot[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
 		}
+
+		// Index is bumped before initMapInfo reads it, so the first span of a
+		// trace is handed Index == 2.
+		traceRoot[traceId].Index++
+		initMapInfo(sd, traceRoot[traceId])
+	}
+	for _, v := range traceRoot {
+		sendData = append(sendData, v.RootData)
+	}
+
+	// Transform the categorized map into a slice
+	// NOTE(review): the three statements below are debug output; they marshal
+	// the whole payload and print it to stdout on every call, and the Marshal
+	// error is discarded.
+	aa, _ := json.Marshal(sendData)
+	fmt.Println(string(aa))
+	fmt.Println(len(sendData))
+	return sendData
+}
+
+// initRootData returns the RootData skeleton for the trace identified by
+// traceId. Apart from TraceId every field is a fixed value.
+func initRootData(traceId string) RootData {
+	return RootData{
+		// Identity of the trace and of the reporting agent/application.
+		TraceId:      traceId,
+		AccountId:    110,
+		AgentId:      1011005252979954,
+		AgentVersion: "2.1.0",
+		AppId:        5410049101545798,
+		AppName:      "eBPF-agent",
+		HostId:       10154813500555812,
+		HostName:     "localhost",
+		InstanceId:   1005051101515357,
+		ServiceName:  "GO",
+		ServiceType:  "APPLICATION",
+
+		// Fields initialised to -1.
+		AppIdFrom:      -1,
+		CalledId:       -1,
+		InstanceIdFrom: -1,
+		Period:         -1,
+		TId:            -1,
+
+		// Request-level values; initMapInfo overwrites these when it sees the
+		// APPLICATION span of the trace.
+		HttpMethod: "GET",
+		HttpCode:   0,
+		Uri:        "",
+		ClientIp:   "",
+		Sip:        "",
+		Sn:         "",
+		Sport:      0,
+		CollTime:   0,
+		RespTime:   0,
+
+		// Collections start out empty but non-nil.
+		Maps:       []MapInfo{},
+		Parameters: []interface{}{},
+		TransIds:   []interface{}{},
+		VipIds:     []interface{}{},
+
+		// Everything else starts at its empty value.
+		Cpu:            0,
+		Custom:         "",
+		MemU:           0,
+		MemUP:          0,
+		OperType:       "",
+		ParentTaskName: 0,
+		Sampling:       0,
+		SpanIdFrom:     "",
+		TName:          "",
+		TypeFrom:       "",
+		UserDir:        0,
+	}
+}
+
+// initMapInfo builds a MapInfo node for span sd and, when the span is named
+// "MYSQL" or "APPLICATION", records it in traceRoot.RootData.Maps. A span
+// with any other name yields a node that is returned but not recorded.
+//
+// For an "APPLICATION" span the node is placed at the front of Maps and the
+// request-level fields of traceRoot.RootData are filled from the span's
+// timing and attributes. The returned MapInfo is a copy of the node.
+func initMapInfo(sd tracesdk.ReadOnlySpan, traceRoot *TraceMapT) MapInfo {
+	mNode := MapInfo{
+		Exception:      0,
+		ExceptionMsg:   "",
+		ExceptionStack: "",
+		Ip:             "",
+		Level:          2,
+		MethodDesc:     "unknown",
+		MethodName:     "unknown",
+		Nid:            traceRoot.Index,
+		OperType:       "",
+		Pid:            1,
+		Port:           0,
+		Ps:             []string{},
+		PureTime:       (span(sd).EndTimeUnixNano - span(sd).StartTimeUnixNano) / 1e3, // span duration in nanoseconds, divided by 1e3
+		ServiceName:    "",
+		ServiceType:    "",
+		StartTime:      span(sd).StartTimeUnixNano / 1e6, // start timestamp in nanoseconds, divided by 1e6
+		WallTime:       0,
+	}
+	mNode.WallTime = mNode.PureTime
+	mapType := span(sd).Name
+	switch mapType {
+	case "MYSQL":
+		mNode.Dbn = "unknown"
+		mNode.ServiceName = mapType
+		mNode.ServiceType = "SQL"
+		mNode.MethodName = "(mysql)"
+		mNode.MethodDesc = "(mysql)"
 		for _, attr := range sd.Attributes() {
+			// NOTE(review): debug output; prints every attribute to stdout.
+			fmt.Println(attr.Key, ":", attr.Value.AsInterface())
 			switch attr.Key {
 			case "net.peer.name":
-				m.Ip = attr.Value.AsString()
+				mNode.Ip = attr.Value.AsString()
 			case "net.peer.port":
-				m.Port = attr.Value.AsInt64()
+				mNode.Port = attr.Value.AsInt64()
 			case "db.statement":
 				query := attr.Value.AsString()
-				m.Ps = []string{query}
+				mNode.Ps = []string{query}
+				// NOTE(review): query[:6] panics when the statement is
+				// shorter than six bytes.
 				upperOperType := strings.ToUpper(query[:6])
 				if upperOperType == "SELECT" ||
 					upperOperType == "UPDATE" ||
 					upperOperType == "INSERT" ||
 					upperOperType == "DELETE" {
-					m.OperType = upperOperType
+					mNode.OperType = upperOperType
 				}
 			}
-			//fmt.Println("attr.GetKey()", attr.GetKey())
-			//fmt.Println("attr.GetValue()", attr.GetValue())
-
-			//switch k {
-			//case :
-			//
-			//}
-		}
-		mm, _ := json.Marshal(m)
-		fmt.Println("mmmmmmmmm:", string(mm))
-		rKey := sd.Resource().Equivalent()
-		k := key{
-			r:  rKey,
-			is: sd.InstrumentationScope(),
-		}
-
-		scopeSpan, iOk := ssm[k]
-		if !iOk {
-			// Either the resource or instrumentation scope were unknown.
-			scopeSpan = &tracepb.ScopeSpans{
-				Scope:     tracetransform.InstrumentationScope(sd.InstrumentationScope()),
-				Spans:     []*tracepb.Span{},
-				SchemaUrl: sd.InstrumentationScope().SchemaURL,
-			}
 		}
-		//scopeSpan.Spans = append(scopeSpan.Spans, span(sd))
-		ssm[k] = scopeSpan
-
-		rs, rOk := rsm[rKey]
-		if !rOk {
-			resources++
-			// The resource was unknown.
-			rs = &tracepb.ResourceSpans{
-				Resource:   tracetransform.Resource(sd.Resource()),
-				ScopeSpans: []*tracepb.ScopeSpans{scopeSpan},
-				SchemaUrl:  sd.Resource().SchemaURL(),
+		traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
+	case "APPLICATION":
+		mNode.ServiceName = "GO"
+		mNode.ServiceType = "APPLICATION"
+		mNode.MethodName = "main"
+		mNode.MethodDesc = "main"
+		mNode.Level = 1
+		mNode.Nid = 1
+		mNode.Pid = 0
+		// Build the root node.
+		traceRoot.RootData.RespTime = mNode.PureTime
+		traceRoot.RootData.CollTime = mNode.StartTime
+		for _, attr := range sd.Attributes() {
+			switch attr.Key {
+			case "http.uri":
+				traceRoot.RootData.Uri = attr.Value.AsString()
+			case "http.method":
+				traceRoot.RootData.HttpMethod = attr.Value.AsString()
+			case "http.status_code":
+				traceRoot.RootData.HttpCode = attr.Value.AsInt64()
+			case "net.peer.name":
+				traceRoot.RootData.ClientIp = attr.Value.AsString()
+				traceRoot.RootData.Sip = attr.Value.AsString()
+				traceRoot.RootData.Sn = attr.Value.AsString()
+			case "net.peer.port":
+				traceRoot.RootData.Sport = attr.Value.AsInt64()
+				traceRoot.RootData.LocalPort = attr.Value.AsInt64()
 			}
-			rsm[rKey] = rs
-			continue
-		}
-
-		// The resource has been seen before. Check if the instrumentation
-		// library lookup was unknown because if so we need to add it to the
-		// ResourceSpans. Otherwise, the instrumentation library has already
-		// been seen and the append we did above will be included it in the
-		// ScopeSpans reference.
-		if !iOk {
-			rs.ScopeSpans = append(rs.ScopeSpans, scopeSpan)
 		}
+		// Prepend so the root node is always the first entry in Maps.
+		traceRoot.RootData.Maps = append([]MapInfo{mNode}, traceRoot.RootData.Maps...)
 	}
+	return mNode
+}
 
-	// Transform the categorized map into a slice
-	rss := make([]*tracepb.ResourceSpans, 0, resources)
-	for _, rs := range rsm {
-		rss = append(rss, rs)
+// isEnter reports whether _type is exactly "APPLICATION".
+func isEnter(_type string) bool {
+	if _type == "APPLICATION" {
+		return true
 	}
-	return rss
+	return false
 }
 
 func span(sd tracesdk.ReadOnlySpan) *tracepb.Span {

+ 1 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/clients.go

@@ -51,4 +51,5 @@ type Client interface {
 	UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error
 	// DO NOT CHANGE: any modification will not be backwards compatible and
 	// must never be done outside of a new major release.
+	UploadApmTraces(ctx context.Context, protoSpans []RootData) error
 }

+ 8 - 2
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/exporter.go

@@ -45,8 +45,14 @@ func (e *Exporter) ExportSpans(ctx context.Context, ss []tracesdk.ReadOnlySpan)
 	if len(protoSpans) == 0 {
 		return nil
 	}
-
-	err := e.client.UploadTraces(ctx, protoSpans)
+	//for _, s := range ss {
+	//	fmt.Println(s.ChildSpanCount())
+	//}
+	//a, _ := json.Marshal(protoSpans)
+	//fmt.Println(string(a))
+	sendData := tracetransformData(ss)
+	//tracetransformData(ss)
+	err := e.client.UploadApmTraces(ctx, sendData)
 	if err != nil {
 		return fmt.Errorf("traces export: %w", err)
 	}

+ 140 - 0
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/client_apm.go

@@ -0,0 +1,140 @@
+package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+import (
+	"bytes"
+	"compress/gzip"
+	"context"
+	"encoding/json"
+	"fmt"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
+	"io"
+	"net/http"
+	"net/url"
+	"strconv"
+)
+
+// UploadApmTraces JSON-encodes rootData and POSTs it to the configured
+// endpoint; each attempt is driven through d.requestFunc.
+//
+// A 2xx response with a non-empty body must decode into a RespDataT whose
+// message is "send ok" and whose code is 1000, otherwise an error is
+// returned. A 429 or 503 response yields the error built by
+// newResponseError. Any other status is reported as a send failure.
+func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootData) error {
+	payload, err := json.Marshal(rootData)
+	if err != nil {
+		return err
+	}
+
+	ctx, cancel := d.contextWithStop(ctx)
+	defer cancel()
+
+	request, err := d.newApmRequest(payload, len(rootData))
+	if err != nil {
+		return err
+	}
+
+	attempt := func(ctx context.Context) error {
+		// Give up before touching the network if the context is already done.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		request.reset(ctx)
+		resp, err := d.client.Do(request.Request)
+		if err != nil {
+			return err
+		}
+
+		if resp != nil && resp.Body != nil {
+			defer func() {
+				if closeErr := resp.Body.Close(); closeErr != nil {
+					otel.Handle(closeErr)
+				}
+			}()
+		}
+
+		status := resp.StatusCode
+		switch {
+		case status >= 200 && status <= 299:
+			// Accepted by the server; an empty body counts as success.
+			var received bytes.Buffer
+			if _, err := io.Copy(&received, resp.Body); err != nil {
+				return err
+			}
+			if received.Len() == 0 {
+				return nil
+			}
+
+			var ack RespDataT
+			if err := json.Unmarshal(received.Bytes(), &ack); err != nil {
+				return err
+			}
+			if ack.Msg != "send ok" || ack.Code != 1000 {
+				return fmt.Errorf("resp error msg:<%s> code:<%d>", ack.Msg, ack.Code)
+			}
+			return nil
+
+		case status == http.StatusTooManyRequests || status == http.StatusServiceUnavailable:
+			// Drain the body so the connection can be reused, then hand the
+			// response headers to newResponseError.
+			if _, err := io.Copy(io.Discard, resp.Body); err != nil {
+				otel.Handle(err)
+			}
+			return newResponseError(resp.Header)
+
+		default:
+			return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
+		}
+	}
+
+	return d.requestFunc(ctx, attempt)
+}
+
+// RespDataT is the JSON body that UploadApmTraces decodes from a non-empty
+// 2xx response.
+type RespDataT struct {
+	Code int    `json:"code"` // UploadApmTraces treats any value other than 1000 as an error
+	Msg  string `json:"message"` // UploadApmTraces treats any value other than "send ok" as an error
+}
+
+// newApmRequest builds the POST request that delivers body to the configured
+// endpoint. It sets the User-Agent, the configured headers, a text/plain
+// Content-Type, the routingKey header and a DataCount header holding mapLen.
+// The body is attached as-is or gzip-compressed depending on the configured
+// compression; for any other compression value no body reader is attached.
+func (d *client) newApmRequest(body []byte, mapLen int) (request, error) {
+	target := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
+	r, err := http.NewRequest(http.MethodPost, target.String(), nil)
+	if err != nil {
+		return request{Request: r}, err
+	}
+
+	r.Header.Set("User-Agent", "OTel OTLP Exporter Go/"+otlptrace.Version())
+	for name, value := range d.cfg.Headers {
+		r.Header.Set(name, value)
+	}
+	r.Header.Set("Content-Type", "text/plain;charset=utf-8")
+	r.Header.Set("routingKey", "goTopic")
+	r.Header.Set("DataCount", strconv.Itoa(mapLen))
+
+	req := request{Request: r}
+	compression := Compression(d.cfg.Compression)
+	if compression == NoCompression {
+		r.ContentLength = int64(len(body))
+		req.bodyReader = bodyReader(body)
+		return req, nil
+	}
+	if compression != GzipCompression {
+		return req, nil
+	}
+
+	// Gzip: the compressed size is not advertised up front.
+	r.ContentLength = -1
+	r.Header.Set("Content-Encoding", "gzip")
+
+	gz := gzPool.Get().(*gzip.Writer)
+	defer gzPool.Put(gz)
+
+	var compressed bytes.Buffer
+	gz.Reset(&compressed)
+	if _, err := gz.Write(body); err != nil {
+		return req, err
+	}
+	// Close must be called so that the body is fully written.
+	if err := gz.Close(); err != nil {
+		return req, err
+	}
+	req.bodyReader = bodyReader(compressed.Bytes())
+
+	return req, nil
+}

+ 8 - 0
proc/proc.go

@@ -74,3 +74,11 @@ func ListPids() ([]uint32, error) {
 	}
 	return res, nil
 }
+
+// GetProcName returns the contents of the "comm" file of process pid with
+// trailing newlines removed, or "" when the file cannot be read.
+func GetProcName(pid uint32) string {
+	raw, err := os.ReadFile(Path(pid, "comm"))
+	if err == nil {
+		return string(bytes.TrimRight(raw, "\n"))
+	}
+	return ""
+}

+ 78 - 1
tracing/tracing.go

@@ -3,7 +3,6 @@ package tracing
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
@@ -18,6 +17,8 @@ import (
 	"go.opentelemetry.io/otel/trace"
 	"inet.af/netaddr"
 	"k8s.io/klog/v2"
+	"sync"
+	"time"
 )
 
 const (
@@ -74,6 +75,9 @@ type Trace struct {
 	containerId string
 	destination netaddr.IPPort
 	commonAttrs []attribute.KeyValue
+	ctx         context.Context
+	span        trace.Span
+	lock        sync.RWMutex
 }
 
 func NewTrace(containerId string, destination netaddr.IPPort) *Trace {
@@ -189,3 +193,76 @@ func (t *Trace) RedisQuery(cmd, args string, error bool, duration time.Duration)
 		semconv.DBStatement(statement),
 	)
 }
+
+/**
+ * Trace
+ */
+
+// setContext stores ctx on the trace while holding the write lock.
+func (t *Trace) setContext(ctx context.Context) {
+	t.lock.Lock()
+	t.ctx = ctx
+	t.lock.Unlock()
+}
+
+// setSpan stores span on the trace while holding the write lock.
+func (t *Trace) setSpan(span trace.Span) {
+	t.lock.Lock()
+	t.span = span
+	t.lock.Unlock()
+}
+
+// TraceStart opens the parent "APPLICATION" span for an HTTP request via
+// createParentSpan. It does nothing when t is nil or method is empty. The
+// span is marked as an error when status is 400 or above.
+func (t *Trace) TraceStart(method, path string, status l7.Status, duration time.Duration) {
+	if t == nil || method == "" {
+		return
+	}
+	failed := status >= 400
+	attrs := []attribute.KeyValue{
+		semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
+		semconv.HTTPMethod(method),
+		attribute.String("http.uri", path),
+	}
+	t.createParentSpan("APPLICATION", duration, failed, attrs...)
+}
+
+// TraceEnd records the HTTP status code from r on the parent span and ends
+// that span at the current time.
+//
+// It does nothing when t is nil or when no parent span has been stored yet
+// (for example because TraceStart returned early). When r is nil the span is
+// still ended, just without a status code attribute.
+func (t *Trace) TraceEnd(r *l7.RequestData) {
+	if t == nil {
+		return
+	}
+	// Read the span under the same lock that setSpan writes it under.
+	t.lock.RLock()
+	span := t.span
+	t.lock.RUnlock()
+	if span == nil {
+		// No parent span was ever created, so there is nothing to end.
+		return
+	}
+	if r != nil {
+		span.SetAttributes(semconv.HTTPStatusCode(int(r.Status)))
+	}
+	span.End(trace.WithTimestamp(time.Now()))
+}
+
+// createParentSpan starts a span called name from context.Background(),
+// back-dated by duration from the current time, and stores both the span and
+// the context returned by Start on the trace. The span is not ended here.
+// attrs are applied first, then the trace's common attributes; when error is
+// true the span status is set to codes.Error.
+func (t *Trace) createParentSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
+	now := time.Now()
+	startOpts := []trace.SpanStartOption{
+		trace.WithTimestamp(now.Add(-duration)),
+		trace.WithSpanKind(trace.SpanKindClient),
+	}
+	spanCtx, parent := tracer(t.containerId).Start(context.Background(), name, startOpts...)
+	parent.SetAttributes(attrs...)
+	parent.SetAttributes(t.commonAttrs...)
+	if error {
+		parent.SetStatus(codes.Error, "")
+	}
+	t.setContext(spanCtx)
+	t.setSpan(parent)
+}
+
+// MysqlTraceQuery emits a "MYSQL" span for query through createTraceSpan,
+// tagged with the MySQL system attribute, the statement, and the IP and port
+// of destination. It does nothing when t is nil or query is empty.
+func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration, destination netaddr.IPPort) {
+	if t == nil || query == "" {
+		return
+	}
+	attrs := []attribute.KeyValue{
+		semconv.DBSystemMySQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	}
+	t.createTraceSpan("MYSQL", duration, error, attrs...)
+}
+// createTraceSpan emits a finished span called name that ends now and started
+// duration earlier. The span is started from the context stored on the trace,
+// which is the one createParentSpan saved; when no context has been stored
+// yet it is started from context.Background() instead. The trace's common
+// attributes are applied first, then attrs; when error is true the span
+// status is set to codes.Error.
+func (t *Trace) createTraceSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
+	end := time.Now()
+	start := end.Add(-duration)
+
+	// Read the parent context under the same lock that setContext writes it
+	// under, and never hand Start a nil context.
+	t.lock.RLock()
+	parent := t.ctx
+	t.lock.RUnlock()
+	if parent == nil {
+		parent = context.Background()
+	}
+
+	_, span := tracer(t.containerId).Start(parent, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(t.commonAttrs...)
+	span.SetAttributes(attrs...)
+	if error {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}