Kaynağa Gözat

Merge branch 'dev-stack' into dev

Carl 1 yıl önce
ebeveyn
işleme
da15949267

+ 17 - 2
build.sh

@@ -1,6 +1,21 @@
 #!/bin/sh
 #!/bin/sh
 rm ./euspace
 rm ./euspace
 make -f Makefile2 all debug=1 pid=1121
 make -f Makefile2 all debug=1 pid=1121
-pid=`ps aux | grep ebpfdemo81 | grep -v grep | awk '{print $2}'`
+
+# pid=`ps aux | grep ebpfdemo81 | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=go SEND=1 FILTER_PID=$pid WHITE_LIST=".*HandleFunc|.*main.*|.*serverHandler.*|.*ServeHTTP.*" ./euspace --listen="0.0.0.0:8123"
+
+
+# pid=`ps aux | grep ./helloworld | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=java DBG_PATH="/data/roger/graalvm/simplehttpserver.debug" SEND=1 FILTER_PID=$pid WHITE_LIST="main.*|addwj.*" ./euspace  --listen="0.0.0.0:8123"
+
+
+pid=`ps aux | grep ./simplehttpserver | grep -v grep | awk '{print $2}'`
 echo $pid
 echo $pid
-TRACES_ENDPOINT=http://10.2.31.73:8099/docp/api/v2/data/receive SEND=1 FILTER_PID=$pid WHITE_LIST=".*HandleFunc|.*main.*|.*serverHandler.*|.*ServeHTTP.*" ./euspace  --listen="0.0.0.0:8123"
+TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=java DBG_PATH="/data/roger/graalvm/simplehttpserver.debug" SEND=1 FILTER_PID=$pid WHITE_LIST="handle*|addw.*" ./euspace  --listen="0.0.0.0:8123"
+
+# pid=`ps aux | grep CoreAoT | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=dotnet DBG_PATH="/data/roger/NET8/CoreAoT/bin/Release/net8.0/linux-x64/publish/CoreAoT.dbg" SEND=1 FILTER_PID=$pid WHITE_LIST="main.*|Addwj.*" ./euspace  --listen="0.0.0.0:8123"

+ 1 - 1
containers/container.go

@@ -1139,7 +1139,7 @@ func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) {
 	if !p.jvmUprobesChecked {
 	if !p.jvmUprobesChecked {
 		tracer.InitKProcInfo(pid, c.instanceID)
 		tracer.InitKProcInfo(pid, c.instanceID)
 		p.uprobes = append(p.uprobes, tracer.AttachJavaNioReadUprobes(pid, c.instanceID)...)
 		p.uprobes = append(p.uprobes, tracer.AttachJavaNioReadUprobes(pid, c.instanceID)...)
-		p.uprobes = append(p.uprobes, tracer.AttachJavaNetReadUprobes(pid, c.instanceID)...)
+		p.uprobes = append(p.uprobes, tracer.AttachJavaNetWriteUprobes(pid, c.instanceID)...)
 		p.jvmUprobesChecked = true
 		p.jvmUprobesChecked = true
 	}
 	}
 }
 }

+ 135 - 64
containers/container_apm.go

@@ -4,11 +4,6 @@ import (
 	"bufio"
 	"bufio"
 	"debug/elf"
 	"debug/elf"
 	"fmt"
 	"fmt"
-	"os"
-	"sort"
-	"strconv"
-	"strings"
-
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
@@ -16,6 +11,11 @@ import (
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
 	"inet.af/netaddr"
 	"inet.af/netaddr"
+	"os"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
 )
 )
 
 
 type CodeType int16
 type CodeType int16
@@ -56,6 +56,24 @@ func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
 	return trace, ok
 	return trace, ok
 }
 }
 
 
+func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
+	c.traceMap[traceId] = trace
+}
+
+// 查询或创建trace信息
+func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
+	trace, ok := c.getTrace(traceId)
+	if !ok {
+		//new trace
+		trace = tracing.NewTraceFromEvent(string(c.id))
+		//create TraceMap
+		c.createTraceMap(traceId, trace)
+		//create ParentSpan
+		trace.CreateRootSpan(traceId)
+	}
+	return trace, nil
+}
+
 func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
 func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
 	method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
 	method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
 	ip, err := netaddr.ParseIP(hostIp)
 	ip, err := netaddr.ParseIP(hostIp)
@@ -73,6 +91,23 @@ func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
 	return nil
 	return nil
 }
 }
 
 
+// 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
+// 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
+
+func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
+	if t.AllEventReady(traceID) {
+		t.SendEvent()
+		fmt.Printf("----send:%d \n", traceID)
+		//fmt.Println(t.GetSpan())
+		//fmt.Println("===============")
+		delete(c.traceMap, traceID)
+	}
+}
+
+func (c *Container) valuableTrace(traceID uint64) bool {
+	return traceID != 0
+}
+
 func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
 func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
@@ -80,37 +115,34 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		return c.onDNSRequest(r)
 		return c.onDNSRequest(r)
 	}
 	}
 
 
+	if !c.valuableTrace(r.TraceId) {
+		return nil
+	}
+
 	if r.Protocol == l7.ProtocolTrace {
 	if r.Protocol == l7.ProtocolTrace {
 		//fmt.Println("r.TraceStart:", r.TraceStart)
 		//fmt.Println("r.TraceStart:", r.TraceStart)
 		//fmt.Println("r.TraceEnd:", r.TraceEnd)
 		//fmt.Println("r.TraceEnd:", r.TraceEnd)
 
 
 		if r.TraceStart == 1 {
 		if r.TraceStart == 1 {
-			//fmt.Println("====ProtocolTrace start1====", r.TraceId)
-			err := c.InitTrace(r.TraceId, r)
-			if err != nil {
-				fmt.Println(err)
+			fmt.Println("====ProtocolTrace start====", r.TraceId)
+
+			trace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
+				ip, _ := netaddr.ParseIP(hostIp)
+				trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port))
+				c.SendEvent(trace, r.TraceId)
 			}
 			}
-			//fmt.Println("init r.TraceId:", r.TraceId)
-			//trace, _ := c.getTrace(r.TraceId)
-			//fmt.Println("init traceId", trace)
-			//stats.observe(r.Status.Http(), "", r.Duration)
-			//method, path := l7.ParseHttp(r.Payload)
-			//fmt.Println("r.Payload:", string(r.Payload))
-			//fmt.Println("method:", method)
-			//fmt.Println("path:", path)
-			//fmt.Println("====ProtocolTrace start2====")
+
 			return nil
 			return nil
 		}
 		}
 		if r.TraceEnd == 1 {
 		if r.TraceEnd == 1 {
-			//fmt.Println("r:", r)
-			//fmt.Println("r.Payload:", string(r.Payload))
-			//fmt.Println("====ProtocolTrace end2====")
-			trace, ok := c.getTrace(r.TraceId)
-			if ok {
-				trace.TraceEnd(r)
-				delete(c.traceMap, r.TraceId)
+			fmt.Println("====ProtocolTrace end====", r.TraceId, r.EventCount)
+			trace, err := c.getOrInitTrace(r.TraceId)
+			if err == nil {
+				trace.TraceEndEvent(r)
+				c.SendEvent(trace, r.TraceId)
 			}
 			}
-			//fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
 			return nil
 			return nil
 		}
 		}
 	}
 	}
@@ -119,9 +151,16 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
 		method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
 		//trace.HttpRequest(method, path, r.Status, r.Duration)
 		//trace.HttpRequest(method, path, r.Status, r.Duration)
 
 
-		apmTrace, ok := c.getTrace(r.TraceId)
-		if ok {
-			apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		//if ok {
+		//	apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
+		//}
+
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		fmt.Println("ProtocolHTTP-----", r.TraceId, err)
+		if err == nil {
+			apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
+			c.SendEvent(apmTrace, r.TraceId)
 		}
 		}
 		return nil
 		return nil
 	}
 	}
@@ -138,15 +177,15 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
 	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
 	switch r.Protocol {
 	switch r.Protocol {
 	case l7.ProtocolHTTP:
 	case l7.ProtocolHTTP:
-		fmt.Println("l7.ProtocolHTTP", r.TraceId)
-		//stats.observe(r.Status.Http(), "", r.Duration)
-		method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
-		//trace.HttpRequest(method, path, r.Status, r.Duration)
-
-		apmTrace, ok := c.getTrace(r.TraceId)
-		if ok {
-			apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
-		}
+		//fmt.Println("l7.ProtocolHTTP", r.TraceId)
+		////stats.observe(r.Status.Http(), "", r.Duration)
+		//method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
+		////trace.HttpRequest(method, path, r.Status, r.Duration)
+		//
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		//if ok {
+		//	apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
+		//}
 
 
 	case l7.ProtocolHTTP2:
 	case l7.ProtocolHTTP2:
 		if conn.http2Parser == nil {
 		if conn.http2Parser == nil {
@@ -158,14 +197,14 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
 			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
 		}
 		}
 	case l7.ProtocolPostgres:
 	case l7.ProtocolPostgres:
-		if r.Method != l7.MethodStatementClose {
-			stats.observe(r.Status.String(), "", r.Duration)
-		}
-		if conn.postgresParser == nil {
-			conn.postgresParser = l7.NewPostgresParser()
-		}
-		query := conn.postgresParser.Parse(r.Payload)
-		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
+		//if r.Method != l7.MethodStatementClose {
+		//	stats.observe(r.Status.String(), "", r.Duration)
+		//}
+		//if conn.postgresParser == nil {
+		//	conn.postgresParser = l7.NewPostgresParser()
+		//}
+		//query := conn.postgresParser.Parse(r.Payload)
+		//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolMysql:
 	case l7.ProtocolMysql:
 		//fmt.Println("mysql mysql")
 		//fmt.Println("mysql mysql")
 		//fmt.Println(conn)
 		//fmt.Println(conn)
@@ -178,39 +217,41 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 		//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
 		//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
 
 
-		apmTrace, ok := c.getTrace(r.TraceId)
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
 		//fmt.Println("mysql r.TraceId:", r.TraceId)
 		//fmt.Println("mysql r.TraceId:", r.TraceId)
 		//fmt.Println("ok:", ok)
 		//fmt.Println("ok:", ok)
 		//fmt.Println("traceMap:", len(c.traceMap))
 		//fmt.Println("traceMap:", len(c.traceMap))
-		if ok {
-			apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+		if err == nil {
+			//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
+			apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
+			c.SendEvent(apmTrace, r.TraceId)
 		}
 		}
 	case l7.ProtocolMemcached:
 	case l7.ProtocolMemcached:
-		stats.observe(r.Status.String(), "", r.Duration)
-		cmd, items := l7.ParseMemcached(r.Payload)
-		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
+		//cmd, items := l7.ParseMemcached(r.Payload)
+		//trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
 	case l7.ProtocolRedis:
 	case l7.ProtocolRedis:
-		fmt.Println("redis redis")
 		stats.observe(r.Status.String(), "", r.Duration)
 		stats.observe(r.Status.String(), "", r.Duration)
 		cmd, args := l7.ParseRedis(r.Payload)
 		cmd, args := l7.ParseRedis(r.Payload)
 		fmt.Println("cmd", cmd)
 		fmt.Println("cmd", cmd)
 		fmt.Println("args", args)
 		fmt.Println("args", args)
-		apmTrace, ok := c.getTrace(r.TraceId)
-		fmt.Println("redis r.TraceId:", r.TraceId)
-		fmt.Println("ok:", ok)
-		fmt.Println("traceMap:", len(c.traceMap))
-		if ok {
-			apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+		//apmTrace, ok := c.getTrace(r.TraceId)
+		apmTrace, err := c.getOrInitTrace(r.TraceId)
+		if err == nil {
+			//apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
+			apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
+			c.SendEvent(apmTrace, r.TraceId)
 		}
 		}
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
 		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
 	case l7.ProtocolMongo:
 	case l7.ProtocolMongo:
-		stats.observe(r.Status.String(), "", r.Duration)
-		query := l7.ParseMongo(r.Payload)
-		trace.MongoQuery(query, r.Status.Error(), r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
+		//query := l7.ParseMongo(r.Payload)
+		//trace.MongoQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolKafka, l7.ProtocolCassandra:
 	case l7.ProtocolKafka, l7.ProtocolCassandra:
-		stats.observe(r.Status.String(), "", r.Duration)
+		//stats.observe(r.Status.String(), "", r.Duration)
 	case l7.ProtocolRabbitmq, l7.ProtocolNats:
 	case l7.ProtocolRabbitmq, l7.ProtocolNats:
-		stats.observe(r.Status.String(), r.Method.String(), 0)
+		//stats.observe(r.Status.String(), r.Method.String(), 0)
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -259,6 +300,36 @@ func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer
 	}
 	}
 }
 }
 
 
+func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	// get the associated uprobe
+	switch event.Location {
+	case 0: // ret
+		uprobe, err := c.GetUprobe(event, tracer)
+		if err != nil {
+			fmt.Println("GetUprobeGetUprobe errer: %v", err)
+			// log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
+			return
+		}
+
+		if event.TraceId <= 0 {
+			fmt.Println("StackProcess TraceId id 0")
+			// log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
+			return
+		}
+
+		//fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
+		apmTrace, err := c.getOrInitTrace(event.TraceId)
+		if err == nil {
+			//fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
+			duration := event.TimeNsEnd - event.TimeNsStart
+			apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
+			c.SendEvent(apmTrace, event.TraceId)
+		}
+	}
+}
+
 // ResolveAddress returns the symbol(s) and offset of the given address.
 // ResolveAddress returns the symbol(s) and offset of the given address.
 func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
 func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
 	if addr == 0 {
 	if addr == 0 {

+ 3 - 3
containers/registry.go

@@ -255,7 +255,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
 				//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
 					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
 					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
-					//c.attachTlsUprobes(r.tracer, e.Pid)
+					c.attachTlsUprobes(r.tracer, e.Pid)
 					// c.attachJVMUprobes(r.tracer, e.Pid)
 					// c.attachJVMUprobes(r.tracer, e.Pid)
 					c.attachUprobes(r.tracer, e.Pid)
 					c.attachUprobes(r.tracer, e.Pid)
 				} else {
 				} else {
@@ -301,9 +301,9 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 				}
 
 
 				if c := r.containersByPid[uint32(e.StackEvent.Pid)]; c != nil {
 				if c := r.containersByPid[uint32(e.StackEvent.Pid)]; c != nil {
-					// fmt.Printf("e.EventTypeFunEnt: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x\n", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
+					// fmt.Printf("e.EventTypeFunEnt: TraceId:%lld, Pid:%d, Location:%d, Goid:%lld, TimeNs:%d, Ip:%x, CallerIp:%x, Bp:%x, CallerBp:%x\n", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
 					// fmt.Printf("e.EventTypeFunEnt: FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
 					// fmt.Printf("e.EventTypeFunEnt: FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
-					c.StackProcess(*e.StackEvent, r.tracer)
+					c.StackProcess2(*e.StackEvent, r.tracer)
 				} else {
 				} else {
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%x, FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%x, FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)

+ 2 - 0
ebpftracer/ebpf/include/bpf_base.h

@@ -40,6 +40,8 @@ static long (*bpf_map_delete_elem) (void *map, const void *key) = (void *)3;
 static long (*bpf_probe_read) (void *dst, __u32 size, const void *unsafe_ptr) =
 static long (*bpf_probe_read) (void *dst, __u32 size, const void *unsafe_ptr) =
     (void *)4;
     (void *)4;
 static __u64(*bpf_ktime_get_ns) (void) = (void *)5;
 static __u64(*bpf_ktime_get_ns) (void) = (void *)5;
+static __u64 (*bpf_ktime_get_boot_ns)(void) = (void *) 125;
+
 static long (*bpf_trace_printk) (const char *fmt, __u32 fmt_size, ...) =
 static long (*bpf_trace_printk) (const char *fmt, __u32 fmt_size, ...) =
     (void *)6;
     (void *)6;
 static __u32(*bpf_get_prandom_u32) (void) = (void *)7;
 static __u32(*bpf_get_prandom_u32) (void) = (void *)7;

+ 32 - 9
ebpftracer/ebpf/l7/apm_trace.c

@@ -6,14 +6,14 @@ struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(key_size, sizeof(struct apm_trace_key_t));
 	__uint(key_size, sizeof(struct apm_trace_key_t));
 	__uint(value_size, sizeof(struct apm_span_context));
 	__uint(value_size, sizeof(struct apm_span_context));
-	__uint(max_entries, 1);
+	__uint(max_entries, 32768);
 } apm_parent_span_context_map SEC(".maps");
 } apm_parent_span_context_map SEC(".maps");
 
 
 struct {
 struct {
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(type, BPF_MAP_TYPE_LRU_HASH);
 	__uint(key_size, sizeof(struct apm_trace_key_t));
 	__uint(key_size, sizeof(struct apm_trace_key_t));
 	__uint(value_size, sizeof(struct apm_span_context));
 	__uint(value_size, sizeof(struct apm_span_context));
-	__uint(max_entries, 1);
+	__uint(max_entries, 32768);
 } apm_current_span_context_map SEC(".maps");
 } apm_current_span_context_map SEC(".maps");
 
 
 struct {
 struct {
@@ -37,6 +37,13 @@ struct {
 	__uint(max_entries, 32768);
 	__uint(max_entries, 32768);
 } thread_trace_info_heap SEC(".maps");
 } thread_trace_info_heap SEC(".maps");
 
 
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__uint(key_size, sizeof(__u64));
+	__uint(value_size, sizeof(__u32));
+	__uint(max_entries, 32768);
+} trace_event_count_heap SEC(".maps");
+
 static __inline __attribute__((__always_inline__))
 static __inline __attribute__((__always_inline__))
 struct apm_trace_key_t get_apm_trace_key(__u64 timeout, bool is_socket_io) {
 struct apm_trace_key_t get_apm_trace_key(__u64 timeout, bool is_socket_io) {
 	__u64 pid_tgid = bpf_get_current_pid_tgid();
 	__u64 pid_tgid = bpf_get_current_pid_tgid();
@@ -196,12 +203,7 @@ __u64 cw_clear_trace(__u32 tgid, __u32 pid, __u32 fd) {
 static __inline __attribute__((__always_inline__))
 static __inline __attribute__((__always_inline__))
 void cw_save_current_tracking_span(struct apm_span_context *sc) {
 void cw_save_current_tracking_span(struct apm_span_context *sc) {
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
 	struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
-	long err = 0;
-	err = bpf_map_update_elem(&apm_current_span_context_map, &trace_key, sc, BPF_ANY);
-	if (err != 0) {
-		bpf_printk("Failed to update apm_current_span_context_map map: %ld", err);
-		return;
-	}
+	bpf_map_update_elem(&apm_current_span_context_map, &trace_key, sc, BPF_ANY);
 }
 }
 
 
 
 
@@ -244,6 +246,25 @@ struct apm_span_context *cw_get_current_tracking_span() {
 	return apm_sc;
 	return apm_sc;
 }
 }
 
 
+static __inline __attribute__((__always_inline__))
+__u32 cw_add_event_count(__u64 trace_id) {
+	__u32 *event_count = bpf_map_lookup_elem(&trace_event_count_heap, &trace_id);
+	if (event_count != NULL) {
+		*event_count = *event_count + 1;
+		return *event_count;
+	}
+	return 0;
+}
+
+static __inline __attribute__((__always_inline__))
+__u32 cw_get_event_count(__u64 trace_id) {
+	__u32 *event_count = bpf_map_lookup_elem(&trace_event_count_heap, &trace_id);
+	if (event_count != NULL){
+		return *event_count;
+	}
+	return 0;
+}
+
 static __inline __attribute__((__always_inline__))
 static __inline __attribute__((__always_inline__))
 struct apm_trace_info_t cw_save_trace_info(__u64 id, __u32 pid, __u64 fd) {
 struct apm_trace_info_t cw_save_trace_info(__u64 id, __u32 pid, __u64 fd) {
 	// apm trace
 	// apm trace
@@ -269,6 +290,8 @@ struct apm_trace_info_t cw_save_trace_info(__u64 id, __u32 pid, __u64 fd) {
 	bpf_map_update_elem(&trace_info_heap, &trace_info.trace_key, &trace_info, BPF_NOEXIST);
 	bpf_map_update_elem(&trace_info_heap, &trace_info.trace_key, &trace_info, BPF_NOEXIST);
 	bpf_map_update_elem(&fd_trace_info_heap, &trace_info.fd_trace_key, &trace_info, BPF_NOEXIST);
 	bpf_map_update_elem(&fd_trace_info_heap, &trace_info.fd_trace_key, &trace_info, BPF_NOEXIST);
 	bpf_map_update_elem(&thread_trace_info_heap, &trace_info.thread_trace_key, &trace_info, BPF_NOEXIST);
 	bpf_map_update_elem(&thread_trace_info_heap, &trace_info.thread_trace_key, &trace_info, BPF_NOEXIST);
+	// 事件个数
+	__u32 event_count = 0;
+	bpf_map_update_elem(&trace_event_count_heap, &trace_info.trace_id, &event_count, BPF_NOEXIST);
 	return trace_info;
 	return trace_info;
 }
 }
-

+ 20 - 4
ebpftracer/ebpf/l7/l7.c

@@ -72,8 +72,11 @@ struct l7_event {
     __u32 statement_id;
     __u32 statement_id;
     __u64 payload_size;
     __u64 payload_size;
     __u64 trace_id;
     __u64 trace_id;
+	__u64 start_at;
+	__u64 end_at;
     __u32 trace_start;
     __u32 trace_start;
     __u32 trace_end;
     __u32 trace_end;
+    __u32 event_count;
 	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
 	unsigned char assumed_app_id[APM_ASSUMED_APP_ID_SIZE];
 	unsigned char span_id[APM_SPAN_ID_SIZE];
 	unsigned char span_id[APM_SPAN_ID_SIZE];
 	unsigned char trace_id_from[APM_TRACE_ID_SIZE];
 	unsigned char trace_id_from[APM_TRACE_ID_SIZE];
@@ -199,6 +202,7 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
     __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
     __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
     if (timestamp) {
     if (timestamp) {
         if (*timestamp == 0) {
         if (*timestamp == 0) {
+//	        bpf_printk("timestamp=0");
             return;
             return;
         }
         }
         e->connection_timestamp = *timestamp;
         e->connection_timestamp = *timestamp;
@@ -207,7 +211,10 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
     }
     }
     e->fd = fd;
     e->fd = fd;
     e->pid = pid;
     e->pid = pid;
-    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+    long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	if (error ==0){
+		cw_add_event_count(e->trace_id);
+	}
 }
 }
 
 
 static inline __attribute__((__always_inline__))
 static inline __attribute__((__always_inline__))
@@ -286,7 +293,9 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     {
     {
 //        __u64 trace_id = get_apm_trace_id(pid, fd);
 //        __u64 trace_id = get_apm_trace_id(pid, fd);
         __u64 trace_id = get_fd_trace_id(pid, fd);
         __u64 trace_id = get_fd_trace_id(pid, fd);
+	    __u32 event_count = cw_get_event_count(trace_id);
         cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
         cw_bpf_debug("[Trace End in l7][Response][HTTP] pid:%d,fd:%d,trace_id:%llu", tid, fd, trace_id);
+        cw_bpf_debug("[Trace End in l7][Response][HTTP] event_count:%d", event_count);
         // 清除trace信息
         // 清除trace信息
         cw_clear_trace(pid, tid, fd);
         cw_clear_trace(pid, tid, fd);
         // 发送事件到用户空间 start
         // 发送事件到用户空间 start
@@ -302,8 +311,10 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 //            cw_bpf_debug("[Response][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
 //            cw_bpf_debug("[Response][HTTP]:is_tls:%d|tid:%d",k.is_tls,k.stream_id);
             return 0;
             return 0;
         }
         }
-
-        e->duration = bpf_ktime_get_ns() - req->ns;
+	    e->start_at = req->ns;
+	    bpf_printk("req->ns:%llu",req->ns);
+	    e->end_at = bpf_ktime_get_ns();
+        e->duration = e->end_at - e->start_at;
 //        cw_bpf_debug("[Response][HTTP]:duration->ns:%d\n",e->duration);
 //        cw_bpf_debug("[Response][HTTP]:duration->ns:%d\n",e->duration);
         e->protocol = PROTOCOL_TRACE;
         e->protocol = PROTOCOL_TRACE;
         e->status = http_status;
         e->status = http_status;
@@ -314,6 +325,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         e->trace_end = 1;
         e->trace_end = 1;
         e->trace_id = trace_id;
         e->trace_id = trace_id;
         e->payload_size = size;
         e->payload_size = size;
+        e->event_count = event_count;
         COPY_PAYLOAD(e->payload, size, payload);
         COPY_PAYLOAD(e->payload, size, payload);
 		// psc
 		// psc
 	    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
 	    struct apm_span_context *cw_psc = cw_get_parent_tracking_span();
@@ -328,6 +340,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 //		    }
 //		    }
 	    }
 	    }
         bpf_map_delete_elem(&active_l7_requests, &k);
         bpf_map_delete_elem(&active_l7_requests, &k);
+		// 清除事件计数
+	    bpf_map_delete_elem(&trace_event_count_heap, &trace_id);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         // 发送事件到用户空间 end
         // 发送事件到用户空间 end
 //        __u64 k_version = load_filter_pid();
 //        __u64 k_version = load_filter_pid();
@@ -725,7 +739,9 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (!response) {
     if (!response) {
         return 0;
         return 0;
     }
     }
-    e->duration = bpf_ktime_get_ns() - req->ns;
+	e->end_at = bpf_ktime_get_ns();
+	e->start_at = req->ns;
+    e->duration = e->end_at - e->start_at;
     send_event(ctx, e, k.pid, k.fd);
     send_event(ctx, e, k.pid, k.fd);
     return 0;
     return 0;
 }
 }

+ 3 - 3
ebpftracer/ebpf/utrace/go/include/alloc.h

@@ -103,7 +103,7 @@ static __always_inline void *write_target_data(void *data, s32 size)
     u64 end = get_area_end(start,start_from_proc,end_from_proc);
     u64 end = get_area_end(start,start_from_proc,end_from_proc);
     if (end - start < size)
     if (end - start < size)
     {
     {
-        bpf_printk("reached end of CPU memory block, going to the start again");
+        cw_bpf_debug("reached end of CPU memory block, going to the start again");
         s32 start_index = 0;
         s32 start_index = 0;
         bpf_map_delete_elem(&alloc_map, &start_index);
         bpf_map_delete_elem(&alloc_map, &start_index);
         start = get_area_start(start_from_proc, end_from_proc);
         start = get_area_start(start_from_proc, end_from_proc);
@@ -119,7 +119,7 @@ static __always_inline void *write_target_data(void *data, s32 size)
     }
     }
     u64 target_u = (u64)target;
     u64 target_u = (u64)target;
     if (target_u > end_from_proc || target_u < start_from_proc) {
     if (target_u > end_from_proc || target_u < start_from_proc) {
-        bpf_printk("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
+	    cw_bpf_debug("TARGET ADDRESS IS OUT OF BOUNDS: 0x%llx", target);
         return NULL;
         return NULL;
     }
     }
 
 
@@ -141,7 +141,7 @@ static __always_inline void *write_target_data(void *data, s32 size)
     }
     }
     else
     else
     {
     {
-        bpf_printk("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
+	    cw_bpf_debug("failed to write to userspace, error code: %d, addr: %lx, size: %d", success, target, size);
         return NULL;
         return NULL;
     }
     }
 }
 }

+ 80 - 65
ebpftracer/ebpf/utrace/go/net/stack.probe.bpf.c

@@ -21,13 +21,11 @@ struct event
 	__u64 caller_bp;
 	__u64 caller_bp;
 	__u64 time_ns_start;
 	__u64 time_ns_start;
 	__u64 time_ns_end;
 	__u64 time_ns_end;
-	__u64 nid;
-	__u64 fpid;
-	__u64 level;
 	__u8 location;
 	__u8 location;
 }__attribute__((packed));
 }__attribute__((packed));
 
 
 struct trace_stack_entry_key_t {
 struct trace_stack_entry_key_t {
+	__u64 trace_id;
 	__u64 caller_bp;
 	__u64 caller_bp;
 	__u64 bp;
 	__u64 bp;
 };
 };
@@ -36,7 +34,7 @@ struct {
     __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
     __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
     __uint(key_size, sizeof(int));
     __uint(key_size, sizeof(int));
     __uint(value_size, sizeof(int));
     __uint(value_size, sizeof(int));
-	__uint(max_entries, 32768);
+	__uint(max_entries, 655351);
 } event_queue SEC(".maps");
 } event_queue SEC(".maps");
 
 
 struct {
 struct {
@@ -105,35 +103,41 @@ int ent(struct pt_regs *ctx)
 
 
 	cw_bpf_debug("[Go] [uprobe/ent]: goid: %llu", e->goid);
 	cw_bpf_debug("[Go] [uprobe/ent]: goid: %llu", e->goid);
 
 
-	cw_bpf_debug("[Go] [uprobe/ent]: event: location:%x,ip:%lx,time_ns_start:%lld\n", e->location, e->ip, e->time_ns_start);
+	cw_bpf_debug("[Go] [uprobe/ent]: event: location:%x,ip:%lx,time_ns_start:%llu\n", e->location, e->ip, e->time_ns_start);
 	cw_bpf_debug("[Go] [uprobe/ent]: event: bp:%x,caller_bp:%x,caller_ip:%x\n", e->bp,e->caller_bp,e->caller_ip);
 	cw_bpf_debug("[Go] [uprobe/ent]: event: bp:%x,caller_bp:%x,caller_ip:%x\n", e->bp,e->caller_bp,e->caller_ip);
 
 
-	// struct trace_stack_entry_key_t trace_key = {};
-	// trace_key.caller_bp = e->caller_bp % 0x800 + (e->goid << 12);
-	// trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	struct trace_stack_entry_key_t trace_key = {};
+	trace_key.caller_bp = e->caller_bp % 0x800 + (e->goid << 12);
+	trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	trace_key.trace_id = trace_id;
 
 
 	// trace_key.caller_bp = e->caller_bp;
 	// trace_key.caller_bp = e->caller_bp;
 	// trace_key.bp = e->bp;
 	// trace_key.bp = e->bp;
 	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
 	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
-	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x", trace_key.caller_bp, trace_key.bp);
-
-	// struct event event_current = {};
-	// event_current.bp = e->bp,
-	// event_current.caller_bp = e->caller_bp,
-	// event_current.caller_ip = e->caller_ip,
-	// event_current.pid = e->pid,
-	// event_current.trace_id = e->trace_id,
-	// event_current.goid = e->goid,
-	// event_current.ip = e->ip,
-	// event_current.time_ns_start = e->time_ns_start,
-	// event_current.location = e->location,
-	// event_current.nid = ((e->ip % 0x1000) << 24) + e->bp % 0x800 + (e->goid << 12),
-	// event_current.fpid = e->caller_bp % 0x800 + (e->goid << 12),
-
-	// bpf_map_update_elem(&trace_stack_entry, &trace_key, &event_current, BPF_ANY);
+	bpf_printk("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x, %lld", trace_key.caller_bp, trace_key.bp, trace_key.trace_id);
+
+	struct event *event_p = bpf_map_lookup_elem(&trace_stack_entry, &trace_key);
+
+	if (event_p) {
+		cw_bpf_debug("[Go] [uprobe/ent]:Error get alike funEntry %x, %x, %x", event_p->ip, event_p->caller_bp, event_p->bp);
+		// return 0;
+	}
+
+	struct event event_current = {};
+	event_current.bp = e->bp,
+	event_current.caller_bp = e->caller_bp,
+	event_current.caller_ip = e->caller_ip,
+	event_current.pid = e->pid,
+	event_current.trace_id = e->trace_id,
+	event_current.goid = e->goid,
+	event_current.ip = e->ip,
+	event_current.time_ns_start = e->time_ns_start,
+	event_current.location = e->location,
+
+	bpf_map_update_elem(&trace_stack_entry, &trace_key, &event_current, BPF_ANY);
 
 
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
-	bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	// bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
 
 	cw_bpf_debug("[Go] [uprobe/ent] end");
 	cw_bpf_debug("[Go] [uprobe/ent] end");
 	return 1;
 	return 1;
@@ -167,46 +171,58 @@ int ret(struct pt_regs *ctx)
 	cw_bpf_debug("[Go] [uprobe/ret]: yes");
 	cw_bpf_debug("[Go] [uprobe/ret]: yes");
 	e->pid = pid;
 	e->pid = pid;
 	e->trace_id = trace_id;
 	e->trace_id = trace_id;
-	e->location = RETPOINT;
+	e->location = ENTPOINT;
 	e->ip = PT_REGS_IP(ctx);
 	e->ip = PT_REGS_IP(ctx);
 	e->time_ns_end = bpf_ktime_get_ns();
 	e->time_ns_end = bpf_ktime_get_ns();
 	e->bp = PT_REGS_SP(ctx) - 8;
 	e->bp = PT_REGS_SP(ctx) - 8;
 	e->caller_bp = PT_REGS_FP(ctx);
 	e->caller_bp = PT_REGS_FP(ctx);
 
 
+	// __u64 caller_bp = PT_REGS_FP(ctx);
 	cw_bpf_debug("[Go] [uprobe/ret]: e->ip:%lx,bp:%x,caller_bp:%x", e->ip, e->bp, e->caller_bp);
 	cw_bpf_debug("[Go] [uprobe/ret]: e->ip:%lx,bp:%x,caller_bp:%x", e->ip, e->bp, e->caller_bp);
 	cw_bpf_debug("[Go] [uprobe/ret]: sp:%x, goid: %llu, goid:0x:%x", PT_REGS_SP(ctx), e->goid, e->goid);
 	cw_bpf_debug("[Go] [uprobe/ret]: sp:%x, goid: %llu, goid:0x:%x", PT_REGS_SP(ctx), e->goid, e->goid);
 
 
-	cw_bpf_debug("[Go] [uprobe/ret]: event: location:%x,ip:%x,time_ns_end:%lld\n", e->location, e->ip, e->time_ns_end);
+	cw_bpf_debug("[Go] [uprobe/ret]: event: location:%x,ip:%x,time_ns_end:%llu\n", e->location, e->ip, e->time_ns_end);
 
 
-	// struct trace_stack_entry_key_t trace_key = {};
-	// trace_key.caller_bp = caller_bp % 0x800 + (e->goid << 12);
-	// trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	struct trace_stack_entry_key_t trace_key = {};
+	trace_key.caller_bp = e->caller_bp % 0x800 + (e->goid << 12);
+	trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	trace_key.trace_id = trace_id;
 
 
 	// trace_key.caller_bp = caller_bp;
 	// trace_key.caller_bp = caller_bp;
 	// trace_key.bp = e->bp;
 	// trace_key.bp = e->bp;
 
 
-	// cw_bpf_debug("[Go] [uprobe/ret]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x", trace_key.caller_bp, trace_key.bp);
+	cw_bpf_debug("[Go] [uprobe/ret]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x", trace_key.caller_bp, trace_key.bp);
 	// cw_bpf_debug("[Go] [uprobe/ret]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
 	// cw_bpf_debug("[Go] [uprobe/ret]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
 
 
-	// struct event *event_p = bpf_map_lookup_elem(&trace_stack_entry, &trace_key);
+	struct event *event_p = bpf_map_lookup_elem(&trace_stack_entry, &trace_key);
 
 
-	// if (!event_p) {
-	// 	cw_bpf_debug("[Go] [uprobe/ret]:ErrorErrorErrorError Not get funEntry");
-	// 	return 0;
-	// }
-
-	// event_p->time_ns_end = e->time_ns_end;
+	if (!event_p) {
+		cw_bpf_debug("[Go] [uprobe/ret]:ErrorErrorErrorError Not get funEntry");
+		return 0;
+	}
 
 
-	// cw_bpf_debug("[Go] [uprobe/ret]: ent:event: location:%d,ip:%x,time_ns_start:%lld\n", event_p->location, event_p->ip, event_p->time_ns_start);
-	// cw_bpf_debug("[Go] [uprobe/ret]: ent:event: bp:%x,caller_bp:%x,caller_ip:%x\n", event_p->bp,event_p->caller_bp,event_p->caller_ip);
-	// cw_bpf_debug("[Go] [uprobe/ret]: ent:event: nid:%llx,fpid:%llx,level:%d\n", event_p->nid,event_p->fpid,event_p->level);
+	event_p->time_ns_end = e->time_ns_end;
 
 
-	// bpf_map_delete_elem(&trace_stack_entry, &trace_key);
+	cw_bpf_debug("[Go] [uprobe/ret]: ent:event: location:%d,ip:%x,time_ns_start:%llu\n", event_p->location, event_p->ip, event_p->time_ns_start);
+	cw_bpf_debug("[Go] [uprobe/ret]: ent:event: bp:%x,caller_bp:%x,caller_ip:%x\n", event_p->bp,event_p->caller_bp,event_p->caller_ip);
 
 
+	e->time_ns_start = event_p->time_ns_start;
+	// e->caller_ip = event_p->caller_ip;
+	// e->ip = event_p->ip;
+	
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
-	// cw_bpf_debug("[Go] [uprobe/ret]: ent:event_ret push: nid:%llx,fpid:%llx,level:%d\n", event_p->nid,event_p->fpid,event_p->level);
-	bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
-
+	cw_bpf_debug("[Go] [uprobe/ret]: ent:event_ret push: event_p->ip:%llx,ip:%llx\n", event_p->ip, e->ip);
+	long err =  bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	if (err == 0) {
+		__u32 count = cw_add_event_count(trace_id);
+		if (event_p->trace_id != trace_id) {
+			cw_bpf_debug("[pref] fuck err<%d> ip->%x->%x", err, PT_REGS_IP(ctx), e->ip);
+		}
+		cw_bpf_debug("[pref] err<%d> count->%d trace_id->%llu", err, count, trace_id);
+	} else {
+		cw_bpf_debug("[pref] err<%d> trace_id->%d", err, trace_id);
+	}
+	bpf_map_delete_elem(&trace_stack_entry, &trace_key);
 	cw_bpf_debug("[Go] [uprobe/ret] end");
 	cw_bpf_debug("[Go] [uprobe/ret] end");
 	return 1;
 	return 1;
 }
 }
@@ -267,32 +283,31 @@ int dotnetent(struct pt_regs *ctx)
 	cw_bpf_debug("[Go] [uprobe/ent]: event: location:%x,ip:%lx,time_ns_start:%lld\n", e->location, e->ip, e->time_ns_start);
 	cw_bpf_debug("[Go] [uprobe/ent]: event: location:%x,ip:%lx,time_ns_start:%lld\n", e->location, e->ip, e->time_ns_start);
 	cw_bpf_debug("[Go] [uprobe/ent]: event: bp:%x,caller_bp:%x,caller_ip:%x\n", e->bp,e->caller_bp,e->caller_ip);
 	cw_bpf_debug("[Go] [uprobe/ent]: event: bp:%x,caller_bp:%x,caller_ip:%x\n", e->bp,e->caller_bp,e->caller_ip);
 
 
-	// struct trace_stack_entry_key_t trace_key = {};
-	// trace_key.caller_bp = e->caller_bp % 0x800 + (e->goid << 12);
-	// trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	struct trace_stack_entry_key_t trace_key = {};
+	trace_key.caller_bp = e->caller_bp % 0x800 + (e->goid << 12);
+	trace_key.bp = e->bp % 0x800 + (e->goid << 12);
+	trace_key.trace_id = trace_id;
 
 
 	// trace_key.caller_bp = e->caller_bp;
 	// trace_key.caller_bp = e->caller_bp;
 	// trace_key.bp = e->bp;
 	// trace_key.bp = e->bp;
 	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
 	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: bp: %x", trace_key.bp);
-	// cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x", trace_key.caller_bp, trace_key.bp);
-
-	// struct event event_current = {};
-	// event_current.bp = e->bp,
-	// event_current.caller_bp = e->caller_bp,
-	// event_current.caller_ip = e->caller_ip,
-	// event_current.pid = e->pid,
-	// event_current.trace_id = e->trace_id,
-	// event_current.goid = e->goid,
-	// event_current.ip = e->ip,
-	// event_current.time_ns_start = e->time_ns_start,
-	// event_current.location = e->location,
-	// event_current.nid = ((e->ip % 0x1000) << 24) + e->bp % 0x800 + (e->goid << 12),
-	// event_current.fpid = e->caller_bp % 0x800 + (e->goid << 12),
-
-	// bpf_map_update_elem(&trace_stack_entry, &trace_key, &event_current, BPF_ANY);
+	cw_bpf_debug("[Go] [uprobe/ent]: trace_keytrace_keytrace_key: caller_bp: %x, bp: %x", trace_key.caller_bp, trace_key.bp);
+
+	struct event event_current = {};
+	event_current.bp = e->bp,
+	event_current.caller_bp = e->caller_bp,
+	event_current.caller_ip = e->caller_ip,
+	event_current.pid = e->pid,
+	event_current.trace_id = e->trace_id,
+	event_current.goid = e->goid,
+	event_current.ip = e->ip,
+	event_current.time_ns_start = e->time_ns_start,
+	event_current.location = e->location,
+
+	bpf_map_update_elem(&trace_stack_entry, &trace_key, &event_current, BPF_ANY);
 
 
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
 	// return bpf_map_push_elem(&event_queue, e, BPF_EXIST);
-	bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
+	// bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
 
 
 	cw_bpf_debug("[Go] [uprobe/ent] end");
 	cw_bpf_debug("[Go] [uprobe/ent] end");
 	return 1;
 	return 1;

+ 48 - 48
ebpftracer/ebpf/utrace/java/net/client.probe.bpf.c

@@ -5,30 +5,30 @@
 static __inline int updataSocket2(struct sock_t *map_data_res, char * payload,int len,void * jbytechar_ptr,void * len_from_rbp_ptr) {
 static __inline int updataSocket2(struct sock_t *map_data_res, char * payload,int len,void * jbytechar_ptr,void * len_from_rbp_ptr) {
 
 
 	long res = bpf_probe_write_user((void *) jbytechar_ptr,  &map_data_res->payload, sizeof(map_data_res->payload));
 	long res = bpf_probe_write_user((void *) jbytechar_ptr,  &map_data_res->payload, sizeof(map_data_res->payload));
-	bpf_printk("sizeof(map_data_res->payload) %d\n", sizeof(map_data_res->payload));
-	bpf_printk("sizeof(payload) %d\n", sizeof(payload));
-	bpf_printk("&payload 0x%lx\n", &payload);
+	cw_bpf_debug("sizeof(map_data_res->payload) %d\n", sizeof(map_data_res->payload));
+	cw_bpf_debug("sizeof(payload) %d\n", sizeof(payload));
+	cw_bpf_debug("&payload 0x%lx\n", &payload);
 
 
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
 	}
 	}
 
 
 	unsigned long new_val;
 	unsigned long new_val;
 	new_val = len;
 	new_val = len;
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val, sizeof(new_val));
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val, sizeof(new_val));
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
 	}
 	}
 
 
-	bpf_printk("len %d\n", len);
+	cw_bpf_debug("len %d\n", len);
 
 
-//	bpf_printk("payload %s\n", payload);
+//	cw_bpf_debug("payload %s\n", payload);
 //	for (int i = 270; i < len; ++i) {
 //	for (int i = 270; i < len; ++i) {
-//		bpf_printk("data[%d]=%c", i,payload[i]);
+//		cw_bpf_debug("data[%d]=%c", i,payload[i]);
 //		if(payload[i]=='\0'){
 //		if(payload[i]=='\0'){
 //			break;
 //			break;
 //		}
 //		}
@@ -40,25 +40,25 @@ static __inline int updataSocket(struct sock_t *map_data_res,void * jbytechar_pt
 
 
 	long res = bpf_probe_write_user((void *) jbytechar_ptr, &map_data_res->payload, sizeof(map_data_res->payload));
 	long res = bpf_probe_write_user((void *) jbytechar_ptr, &map_data_res->payload, sizeof(map_data_res->payload));
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
 	}
 	}
 
 
 	unsigned long new_val;
 	unsigned long new_val;
 	new_val = map_data_res->size;
 	new_val = map_data_res->size;
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val, sizeof(new_val));
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val, sizeof(new_val));
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
 	}
 	}
 
 
-	bpf_printk("Successfully %d\n", map_data_res->size);
+	cw_bpf_debug("Successfully %d\n", map_data_res->size);
 
 
-	bpf_printk("Successfully %s\n", map_data_res->payload);
+	cw_bpf_debug("Successfully %s\n", map_data_res->payload);
 //	for (int i = 270; i < 290; ++i) {
 //	for (int i = 270; i < 290; ++i) {
-//		bpf_printk("data[%d]=%c", i,map_data_res->payload[i]);
+//		cw_bpf_debug("data[%d]=%c", i,map_data_res->payload[i]);
 //		if(map_data_res->payload[i]=='\0'){
 //		if(map_data_res->payload[i]=='\0'){
 //			break;
 //			break;
 //		}
 //		}
@@ -72,7 +72,7 @@ static __inline struct sock_t* buildHeader(struct sock_t *map_data) {
 	struct sock_t *map_data_res = bpf_map_lookup_elem(&socket_res, &key);
 	struct sock_t *map_data_res = bpf_map_lookup_elem(&socket_res, &key);
 
 
 	if (!map_data_res) {
 	if (!map_data_res) {
-		bpf_printk("Failed to lookup socket_heap");
+		cw_bpf_debug("Failed to lookup socket_heap");
 		return NULL;
 		return NULL;
 	}
 	}
 
 
@@ -121,10 +121,10 @@ static __inline struct sock_t* buildHeader(struct sock_t *map_data) {
 
 
 	char header[CW_STREAM_HEADER_LEN];
 	char header[CW_STREAM_HEADER_LEN];
 	span_context_to_cw_string_stream(cw_span_context, header, '1');
 	span_context_to_cw_string_stream(cw_span_context, header, '1');
-	bpf_printk("Successfully HEADER  %s\n", header);
+	cw_bpf_debug("Successfully HEADER  %s\n", header);
 
 
 //	bpf_probe_read(map_data_res->payload, map_data->header_offset_idx, map_data->payload);
 //	bpf_probe_read(map_data_res->payload, map_data->header_offset_idx, map_data->payload);
-//	bpf_printk("Successfully %s\n", data);
+//	cw_bpf_debug("Successfully %s\n", data);
 //	return 1;
 //	return 1;
 #pragma unroll
 #pragma unroll
 	for (int i = 0; i < MAX_LEN; i++) {
 	for (int i = 0; i < MAX_LEN; i++) {
@@ -145,7 +145,7 @@ static __inline struct sock_t* buildHeader(struct sock_t *map_data) {
 				map_data_res->payload[tmp_len] = map_data->payload[i];
 				map_data_res->payload[tmp_len] = map_data->payload[i];
 				if (map_data->payload[i] == '\0')
 				if (map_data->payload[i] == '\0')
 					break;
 					break;
-//				bpf_printk("map_data->payload: %c->i=%d, offindex %d", map_data_res->payload[i], i, map_data->header_offset_idx);
+//				cw_bpf_debug("map_data->payload: %c->i=%d, offindex %d", map_data_res->payload[i], i, map_data->header_offset_idx);
 			}
 			}
 		}
 		}
 	}
 	}
@@ -158,7 +158,7 @@ static __inline int insertHeader(struct sock_t *map_data,void * jbytechar_ptr,vo
 
 
 	struct sock_t *map_data_res= buildHeader(map_data);
 	struct sock_t *map_data_res= buildHeader(map_data);
 	if (map_data_res == NULL) {
 	if (map_data_res == NULL) {
-		bpf_printk("Failed to lookup socket_heap");
+		cw_bpf_debug("Failed to lookup socket_heap");
 		return -1;
 		return -1;
 	}
 	}
 
 
@@ -177,7 +177,7 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 		return 0;
 		return 0;
 	}
 	}
 
 
-	bpf_printk("--------");
+	cw_bpf_debug("--------");
 
 
 
 
 //	void *len_from_rsp_ptr = (void *) PT_REGS_RSP(ctx) - 65640 +8 ;
 //	void *len_from_rsp_ptr = (void *) PT_REGS_RSP(ctx) - 65640 +8 ;
@@ -186,32 +186,32 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 
 
 //	long res;
 //	long res;
 	void *len_ptr = 0;
 	void *len_ptr = 0;
-	bpf_printk("address: len_from_rbp_ptr<0x%lx>\n", len_from_rbp_ptr);
-//	bpf_printk("len_from_rsp_ptr address: 0x%lx\n", len_from_rsp_ptr);
+	cw_bpf_debug("address: len_from_rbp_ptr<0x%lx>\n", len_from_rbp_ptr);
+//	cw_bpf_debug("len_from_rsp_ptr address: 0x%lx\n", len_from_rsp_ptr);
 
 
 	bpf_probe_read(&len_ptr, sizeof(len_ptr), (void *) len_from_rbp_ptr);
 	bpf_probe_read(&len_ptr, sizeof(len_ptr), (void *) len_from_rbp_ptr);
-	bpf_printk("[len_ptr] before addr<0x%lx>, %d \n", len_from_rbp_ptr, len_ptr);
+	cw_bpf_debug("[len_ptr] before addr<0x%lx>, %d \n", len_from_rbp_ptr, len_ptr);
 
 
 	bpf_probe_read(&len_ptr, sizeof(len_ptr), (void *) len_from_rbp_ptr);
 	bpf_probe_read(&len_ptr, sizeof(len_ptr), (void *) len_from_rbp_ptr);
-	bpf_printk("[len_ptr]after  addr<0x%lx>, %d \n", len_from_rbp_ptr, len_ptr);
+	cw_bpf_debug("[len_ptr]after  addr<0x%lx>, %d \n", len_from_rbp_ptr, len_ptr);
 
 
 	// 获取jbytearray
 	// 获取jbytearray
 	void *jbytearray_ptr = (void *) PT_REGS_PARM4(ctx);
 	void *jbytearray_ptr = (void *) PT_REGS_PARM4(ctx);
-	bpf_printk("jbytechar_ptr_from_rcx <0x%lx>", jbytearray_ptr);
+	cw_bpf_debug("jbytechar_ptr_from_rcx <0x%lx>", jbytearray_ptr);
 
 
 	unsigned long jbytechar_head_ptr;
 	unsigned long jbytechar_head_ptr;
 	// 读取一级指针
 	// 读取一级指针
 	long ret = bpf_probe_read_user(&jbytechar_head_ptr, sizeof(unsigned long), (void *) jbytearray_ptr);
 	long ret = bpf_probe_read_user(&jbytechar_head_ptr, sizeof(unsigned long), (void *) jbytearray_ptr);
-	bpf_printk("[jbytechar_head_ptr] <0x%lx>", jbytechar_head_ptr);
+	cw_bpf_debug("[jbytechar_head_ptr] <0x%lx>", jbytechar_head_ptr);
 
 
 	if (ret != 0) {
 	if (ret != 0) {
-		bpf_printk("Failed to read first level ptr: %d\n", ret);
+		cw_bpf_debug("Failed to read first level ptr: %d\n", ret);
 		return 0;
 		return 0;
 	}
 	}
 
 
 	// 检查一级指针是否有效
 	// 检查一级指针是否有效
 	if (!jbytechar_head_ptr) {
 	if (!jbytechar_head_ptr) {
-		bpf_printk("First level pointer is null.\n");
+		cw_bpf_debug("First level pointer is null.\n");
 		return 0;
 		return 0;
 	}
 	}
 
 
@@ -219,17 +219,17 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 	int key = 0;
 	int key = 0;
 	struct sock_t *map_data = bpf_map_lookup_elem(&socket_heap, &key);
 	struct sock_t *map_data = bpf_map_lookup_elem(&socket_heap, &key);
 	if (!map_data) {
 	if (!map_data) {
-		bpf_printk("Failed to lookup socket_heap\n");
+		cw_bpf_debug("Failed to lookup socket_heap\n");
 		return 1;
 		return 1;
 	}
 	}
 //	__builtin_memset(map_data, 0, sizeof(struct sock_t));
 //	__builtin_memset(map_data, 0, sizeof(struct sock_t));
 	// 读取用户空间数据到 map_data->payload
 	// 读取用户空间数据到 map_data->payload
 	void *jbytechar_ptr = (void *) (jbytechar_head_ptr + 16);
 	void *jbytechar_ptr = (void *) (jbytechar_head_ptr + 16);
-	bpf_printk("[jbytechar_ptr] <0x%lx>", jbytechar_ptr);
+	cw_bpf_debug("[jbytechar_ptr] <0x%lx>", jbytechar_ptr);
 	long err = bpf_probe_read_user_str(map_data->payload, sizeof(map_data->payload), jbytechar_ptr);
 	long err = bpf_probe_read_user_str(map_data->payload, sizeof(map_data->payload), jbytechar_ptr);
 
 
 	if (err < 0) {
 	if (err < 0) {
-		bpf_printk("bpf_probe_read_user failed with return code: %d\n", err);
+		cw_bpf_debug("bpf_probe_read_user failed with return code: %d\n", err);
 		return 0;
 		return 0;
 	}
 	}
 	map_data->size = data_count;
 	map_data->size = data_count;
@@ -265,18 +265,18 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 		return -1;
 		return -1;
 	// 读取完整http
 	// 读取完整http
 	long aaa = bpf_probe_read_user_str(data, MAX_BUFFER_SIZE, jbytechar_ptr);
 	long aaa = bpf_probe_read_user_str(data, MAX_BUFFER_SIZE, jbytechar_ptr);
-	bpf_printk("data %s\n", data);
+	cw_bpf_debug("data %s\n", data);
 
 
 	char *data2;
 	char *data2;
 	int key2 = 1;
 	int key2 = 1;
 	data2 = bpf_map_lookup_elem(&large_array_map, &key2);
 	data2 = bpf_map_lookup_elem(&large_array_map, &key2);
 	if (!data2) {
 	if (!data2) {
-		bpf_printk("data2 %s\n", data2);
+		cw_bpf_debug("data2 %s\n", data2);
 		return -1;
 		return -1;
 	}
 	}
 
 
 	long end_str_len = bpf_probe_read_user_str(data2, MAX_BUFFER_SIZE, (void *) (jbytechar_ptr + map_data->header_offset_idx));
 	long end_str_len = bpf_probe_read_user_str(data2, MAX_BUFFER_SIZE, (void *) (jbytechar_ptr + map_data->header_offset_idx));
-	bpf_printk("end_str_len %d\n", end_str_len);
+	cw_bpf_debug("end_str_len %d\n", end_str_len);
 
 
 	// 挪动数据以腾出空间插入子串
 	// 挪动数据以腾出空间插入子串
 	int insert_pos = map_data->header_offset_idx;
 	int insert_pos = map_data->header_offset_idx;
@@ -287,7 +287,7 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 	}
 	}
 
 
 	int insert_pos2 = insert_pos + sizeof(subs) - 1;
 	int insert_pos2 = insert_pos + sizeof(subs) - 1;
-	bpf_printk("sizeof(data2) %d\n", sizeof(data2));
+	cw_bpf_debug("sizeof(data2) %d\n", sizeof(data2));
 
 
 
 
 	int copy_size = 375;
 	int copy_size = 375;
@@ -298,8 +298,8 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 //		for (int i = 0; i < max_chunk; i++) {
 //		for (int i = 0; i < max_chunk; i++) {
 //			if (i <= chunk) {
 //			if (i <= chunk) {
 //				insert_pos2 = insert_pos + sizeof(subs) - 1 + i*copy_size;
 //				insert_pos2 = insert_pos + sizeof(subs) - 1 + i*copy_size;
-//				bpf_printk("insert_pos2:%d\n", insert_pos2);
-//				bpf_printk("chunk i:%i %d\n", i, chunk);
+//				cw_bpf_debug("insert_pos2:%d\n", insert_pos2);
+//				cw_bpf_debug("chunk i:%i %d\n", i, chunk);
 //				if (insert_pos2 >= 0 && insert_pos2 <= MAX_BUFFER_SIZE - copy_size) {
 //				if (insert_pos2 >= 0 && insert_pos2 <= MAX_BUFFER_SIZE - copy_size) {
 //					__builtin_memcpy(data + insert_pos2, data2 + i*copy_size, copy_size);
 //					__builtin_memcpy(data + insert_pos2, data2 + i*copy_size, copy_size);
 //				}
 //				}
@@ -309,13 +309,13 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 
 
 	if (end_str_len <= copy_size) {
 	if (end_str_len <= copy_size) {
 		if (insert_pos2 >= 0 && insert_pos2 <= MAX_BUFFER_SIZE - copy_size){
 		if (insert_pos2 >= 0 && insert_pos2 <= MAX_BUFFER_SIZE - copy_size){
-			bpf_printk("nochunk %d\n", end_str_len);
+			cw_bpf_debug("nochunk %d\n", end_str_len);
 			__builtin_memcpy(data + insert_pos2, data2, copy_size);
 			__builtin_memcpy(data + insert_pos2, data2, copy_size);
 		}
 		}
 	}
 	}
-	bpf_printk("Successfully %s\n", data);
+	cw_bpf_debug("Successfully %s\n", data);
 	for (int i = 270; i < 290; ++i) {
 	for (int i = 270; i < 290; ++i) {
-		bpf_printk("data[%d]=%c", i,data[i]);
+		cw_bpf_debug("data[%d]=%c", i,data[i]);
 		if(data[i]=='\0'){
 		if(data[i]=='\0'){
 			break;
 			break;
 		}
 		}
@@ -323,20 +323,20 @@ int uprobe_Java_java_net_SocketOutputStream_socketWrite0(struct pt_regs *ctx) {
 
 
 	res = bpf_probe_write_user((void *) jbytechar_ptr, &data, sizeof(data[MAX_BUFFER_SIZE]));
 	res = bpf_probe_write_user((void *) jbytechar_ptr, &data, sizeof(data[MAX_BUFFER_SIZE]));
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", jbytechar_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", jbytechar_ptr, res);
 	}
 	}
 
 
 	unsigned long new_val2;
 	unsigned long new_val2;
 	new_val2 = map_data->size + CW_STREAM_HEADER_LEN;
 	new_val2 = map_data->size + CW_STREAM_HEADER_LEN;
-	bpf_printk("Successfully %d\n", new_val2);
+	cw_bpf_debug("Successfully %d\n", new_val2);
 
 
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val2, sizeof(new_val2));
 	res = bpf_probe_write_user((void *) len_from_rbp_ptr, &new_val2, sizeof(new_val2));
 	if (res == 0) {
 	if (res == 0) {
-		bpf_printk("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
+		cw_bpf_debug("Successfully wrote value to user address: 0x%lx\n", len_from_rbp_ptr);
 	} else {
 	} else {
-		bpf_printk("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
+		cw_bpf_debug("Failed to write value to user address: %p, error: %ld\n", len_from_rbp_ptr, res);
 	}
 	}
 
 
 
 

+ 15 - 10
ebpftracer/jvm.go

@@ -5,8 +5,8 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"strings"
 	"strings"
 
 
-	"fmt"
 	"debug/elf"
 	"debug/elf"
+	"fmt"
 	"path/filepath"
 	"path/filepath"
 
 
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils"
@@ -28,8 +28,8 @@ func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, insID utils.ID) []link.Lin
 	version := UsePIDToGetJDKVersion(pid)
 	version := UsePIDToGetJDKVersion(pid)
 	fmt.Println("java version is ", version)
 	fmt.Println("java version is ", version)
 	var links []link.Link
 	var links []link.Link
-	bpath := getSoPath(pid, "nio.so")
-	if bpath == ""{
+	bpath := getSoPath(pid, "libnio.so")
+	if bpath == "" {
 		fmt.Println("can,t find the nio.so")
 		fmt.Println("can,t find the nio.so")
 		return nil
 		return nil
 	}
 	}
@@ -122,7 +122,7 @@ func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, insID utils.ID) []link.Lin
 	return links
 	return links
 }
 }
 
 
-func (t *Tracer) AttachJavaNetReadUprobes(pid uint32, insID utils.ID) []link.Link {
+func (t *Tracer) AttachJavaNetWriteUprobes(pid uint32, insID utils.ID) []link.Link {
 	if t.disableL7Tracing {
 	if t.disableL7Tracing {
 		return nil
 		return nil
 	}
 	}
@@ -131,7 +131,12 @@ func (t *Tracer) AttachJavaNetReadUprobes(pid uint32, insID utils.ID) []link.Lin
 	//	return nil
 	//	return nil
 	//}
 	//}
 
 
-	var libnetSo = "/opt/github/jdk8u/build/linux-x86_64-normal-server-slowdebug/jdk/lib/amd64/libnet.so"
+	//var libnetSo = "/opt/github/jdk8u/build/linux-x86_64-normal-server-slowdebug/jdk/lib/amd64/libnet.so"
+	var libnetSo = getSoPath(pid, "cwlibnet.so")
+	if libnetSo == "" {
+		return nil
+	}
+
 	var sys = "Java_java_net_SocketOutputStream_socketWrite0"
 	var sys = "Java_java_net_SocketOutputStream_socketWrite0"
 	var links []link.Link
 	var links []link.Link
 	ex, err := link.OpenExecutable(libnetSo)
 	ex, err := link.OpenExecutable(libnetSo)
@@ -139,7 +144,7 @@ func (t *Tracer) AttachJavaNetReadUprobes(pid uint32, insID utils.ID) []link.Lin
 		return nil
 		return nil
 	}
 	}
 	opt := link.UprobeOptions{
 	opt := link.UprobeOptions{
-		Offset: 66,
+		Offset: 53,
 		PID:    int(pid),
 		PID:    int(pid),
 	}
 	}
 	upread02, err := ex.Uprobe(sys, t.uprobes["uprobe_Java_java_net_SocketOutputStream_socketWrite0"], &opt)
 	upread02, err := ex.Uprobe(sys, t.uprobes["uprobe_Java_java_net_SocketOutputStream_socketWrite0"], &opt)
@@ -164,9 +169,9 @@ func getCallNextMoveOffsets(machine elf.Machine, instructions []byte) []int {
 		for i := 0; i < len(instructions); {
 		for i := 0; i < len(instructions); {
 			ins, err := x86asm.Decode(instructions[i:], 64)
 			ins, err := x86asm.Decode(instructions[i:], 64)
 			if err == nil && ins.Op == x86asm.CALL {
 			if err == nil && ins.Op == x86asm.CALL {
-				if firstCall == 0{
+				if firstCall == 0 {
 					firstCall = 1
 					firstCall = 1
-				}else{
+				} else {
 					i += ins.Len
 					i += ins.Len
 					res = append(res, i)
 					res = append(res, i)
 					return res
 					return res
@@ -178,7 +183,7 @@ func getCallNextMoveOffsets(machine elf.Machine, instructions []byte) []int {
 	return res
 	return res
 }
 }
 
 
-func getSoPath(pid uint32, soname string) string{
+func getSoPath(pid uint32, soname string) string {
 	// 获取进程的maps文件路径
 	// 获取进程的maps文件路径
 	mapsPath := fmt.Sprintf("/proc/%d/maps", pid)
 	mapsPath := fmt.Sprintf("/proc/%d/maps", pid)
 
 
@@ -197,7 +202,7 @@ func getSoPath(pid uint32, soname string) string{
 			path := fields[len(fields)-1]
 			path := fields[len(fields)-1]
 
 
 			if strings.Contains(perms, "x") && filepath.Ext(path) == ".so" {
 			if strings.Contains(perms, "x") && filepath.Ext(path) == ".so" {
-				if strings.Contains(path, soname){
+				if strings.Contains(path, soname) {
 					return path
 					return path
 				}
 				}
 			}
 			}

+ 3 - 0
ebpftracer/l7/l7.go

@@ -147,8 +147,11 @@ type RequestData struct {
 	TraceId           uint64
 	TraceId           uint64
 	TraceStart        uint32
 	TraceStart        uint32
 	TraceEnd          uint32
 	TraceEnd          uint32
+	EventCount        uint32
 	AssumedAppId      string
 	AssumedAppId      string
 	SpanId            string
 	SpanId            string
+	StartAt           uint64
+	EndAt             uint64
 	ParentSpanContext struct {
 	ParentSpanContext struct {
 		TraceIdFrom    string
 		TraceIdFrom    string
 		CalledId       string
 		CalledId       string

+ 1 - 1
ebpftracer/stack.go

@@ -35,7 +35,7 @@ func (t *Tracer) stack() error {
 	}
 	}
 
 
 	binType := "dotnet"
 	binType := "dotnet"
-	MatchString := ".*HandleFunc|.*main.*|testfun.*|.*serverHandler.*|.*ServeHTTP.*"
+	MatchString := ".*HandleFunc|.*main.*|testfun.*|.*serverHandler.*|.*ServeHTTP.*|.*database.*|.*redis.*"
 	dbgpath := ""
 	dbgpath := ""
 
 
 	ENV_PID := os.Getenv("FILTER_PID")
 	ENV_PID := os.Getenv("FILTER_PID")

+ 16 - 10
ebpftracer/tracer.go

@@ -403,6 +403,7 @@ type fileEvent struct {
 	Fd   uint64
 	Fd   uint64
 }
 }
 
 
+// struct l7_event in l7.c
 type l7Event struct {
 type l7Event struct {
 	Fd                  uint64
 	Fd                  uint64
 	ConnectionTimestamp uint64
 	ConnectionTimestamp uint64
@@ -415,16 +416,18 @@ type l7Event struct {
 	StatementId         uint32
 	StatementId         uint32
 	PayloadSize         uint64
 	PayloadSize         uint64
 	TraceId             uint64
 	TraceId             uint64
+	StartAt             uint64 // ns
+	EndtAt              uint64 // ns
 	TraceStart          uint32
 	TraceStart          uint32
 	TraceEnd            uint32
 	TraceEnd            uint32
+	EventCount          uint32
 	AssumedAppId        utils.HashByte
 	AssumedAppId        utils.HashByte
 	SpanId              utils.HashByte
 	SpanId              utils.HashByte
-
-	TraceIdFrom    utils.HashByte16
-	CalledId       utils.HashByte
-	InstanceIdFrom utils.HashByte
-	AppIdFrom      utils.HashByte
-	SpanIdFrom     utils.HashByte
+	TraceIdFrom         utils.HashByte16
+	CalledId            utils.HashByte
+	InstanceIdFrom      utils.HashByte
+	AppIdFrom           utils.HashByte
+	SpanIdFrom          utils.HashByte
 }
 }
 
 
 type SocketDataBufferddd struct {
 type SocketDataBufferddd struct {
@@ -479,10 +482,10 @@ type StackEvent struct {
 	CallerBp    uint64
 	CallerBp    uint64
 	TimeNsStart uint64
 	TimeNsStart uint64
 	TimeNsEnd   uint64
 	TimeNsEnd   uint64
-	Nid         uint64
-	Fpid        uint64
-	Level       uint64
-	Location    byte
+	// Nid         uint64
+	// Fpid        uint64
+	// Level       uint64
+	Location byte
 }
 }
 
 
 type StackFunEvent struct {
 type StackFunEvent struct {
@@ -637,8 +640,11 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				TraceId:      v.TraceId,
 				TraceId:      v.TraceId,
 				TraceStart:   v.TraceStart,
 				TraceStart:   v.TraceStart,
 				TraceEnd:     v.TraceEnd,
 				TraceEnd:     v.TraceEnd,
+				EventCount:   v.EventCount,
 				AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
 				AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
 				SpanId:       hex.EncodeToString(v.SpanId[:]),
 				SpanId:       hex.EncodeToString(v.SpanId[:]),
+				StartAt:      v.StartAt,
+				EndAt:        v.EndtAt,
 			}
 			}
 			if v.TraceEnd == 1 {
 			if v.TraceEnd == 1 {
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
 				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])

+ 378 - 22
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -129,37 +129,76 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
 	if len(sdl) == 0 {
 	if len(sdl) == 0 {
 		return nil
 		return nil
 	}
 	}
+	// 多次请求 sdl
+	sendData := []RootDataT{}
 
 
 	for _, sd := range sdl {
 	for _, sd := range sdl {
 		if sd == nil {
 		if sd == nil {
 			continue
 			continue
 		}
 		}
-		traceId := sd.SpanContext().TraceID().String()
-		if _, ok := TraceRootMap[traceId]; !ok {
-			TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
+		//traceId := sd.SpanContext().TraceID().String()
+		fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
+		// 构建map *RootDataT
+		rootData := initRootDataFromEvent()
+		// build http入口 MapInfoT
+		buildAppMapFromEvent(&rootData, sd)
+		// 构建maps
+		for _, event := range sd.Events() {
+			aaa, _ := json.Marshal(event)
+			fmt.Println("event.info", string(aaa))
+			mNode := buildMapNodeFromEvent(event)
+			switch event.EventType {
+			// stack
+			case 11:
+			// l7 event
+			case 10:
+				switch event.ProtocolType {
+				// http
+				case 1:
+					buildHttpMapFromEvent(&mNode, event)
+				// mysql
+				case 5:
+					buildMysqlMapEvent(&mNode, event)
+				// redis
+				case 3:
+					buildRedisMapEvent(&mNode, event)
+				}
+			}
+
+			rootData.Maps = append(rootData.Maps, mNode)
+			//fmt.Println(event.Name)
+			//buildAndAssemblyMapFromEvent(event, rootData)
 		}
 		}
-		TraceRootMap[traceId].Index++
-		buildAndAssemblyMap(sd, TraceRootMap[traceId])
+
+		buildLevelFromEvent(&rootData)
+
+		//a, _ := json.Marshal(rootData)
+		//fmt.Println(string(a))
+		sendData = append(sendData, rootData)
+		//if _, ok := TraceRootMap[traceId]; !ok {
+		//TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
+		//}
+		//TraceRootMap[traceId].Index++
+		//buildAndAssemblyMap(sd, TraceRootMap[traceId])
 	}
 	}
 	// 发送完整数据 | 大量长耗时请求会增加内存占用
 	// 发送完整数据 | 大量长耗时请求会增加内存占用
-	sendData := []RootDataT{}
-	for traceId, v := range TraceRootMap {
-		if v.TheEnd {
-			buildLevel(v)
-			sendData = append(sendData, v.RootData)
-			delete(TraceRootMap, traceId)
-			//fmt.Println("the end!")
-		} else {
-			//fmt.Println("not end!")
-		}
-	}
-
-	// Transform the categorized map into a slice
-	aa, err := json.Marshal(sendData)
-	fmt.Println(err)
+	//sendData := []RootDataT{}
+	//for traceId, v := range TraceRootMap {
+	//	if v.TheEnd {
+	//		buildLevel(v)
+	//		sendData = append(sendData, v.RootData)
+	//		delete(TraceRootMap, traceId)
+	//		//fmt.Println("the end!")
+	//	} else {
+	//		//fmt.Println("not end!")
+	//	}
+	//}
+
+	//Transform the categorized map into a slice
+	aa, _ := json.Marshal(sendData)
 	fmt.Println(string(aa))
 	fmt.Println(string(aa))
 	fmt.Println(len(sendData))
 	fmt.Println(len(sendData))
-	fmt.Println(len(sdl))
+	fmt.Println("sdl len:", len(sdl))
 	return sendData
 	return sendData
 }
 }
 
 
@@ -169,6 +208,13 @@ type TimeMap struct {
 	Map  *MapInfoT
 	Map  *MapInfoT
 }
 }
 
 
+//type TraceMapT struct {
+//	RootData RootDataT
+//	Index    int
+//	lock     *sync.RWMutex
+//	TheEnd   bool
+//}
+
 func buildLevel(sdl *TraceMapT) {
 func buildLevel(sdl *TraceMapT) {
 	nidMap := make(map[int]*MapInfoT)
 	nidMap := make(map[int]*MapInfoT)
 
 
@@ -228,6 +274,65 @@ func buildLevel(sdl *TraceMapT) {
 	}
 	}
 }
 }
 
 
+func buildLevelFromEvent(sdl *RootDataT) {
+	nidMap := make(map[int]*MapInfoT)
+
+	mapSlice := []TimeMap{}
+
+	for i, v := range sdl.Maps {
+		if v.ServiceType == "APPLICATION" {
+			continue
+		}
+		nidMap[v.Nid] = &sdl.Maps[i]
+		timeStartMap := TimeMap{
+			Time: v.StartTime,
+			Type: 0,
+			Map:  &sdl.Maps[i],
+		}
+		mapSlice = append(mapSlice, timeStartMap)
+		timeEndMap := TimeMap{
+			Time: v.EndTime,
+			Type: 1,
+			Map:  &sdl.Maps[i],
+		}
+		mapSlice = append(mapSlice, timeEndMap)
+	}
+	sort.Slice(mapSlice, func(i, j int) bool {
+		return mapSlice[i].Time < mapSlice[j].Time
+	})
+
+	funStack := []TimeMap{}
+
+	currentNid := 1
+	Nid := 2
+	level := 2
+
+	for k, v := range mapSlice {
+		fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
+		if v.Type == 0 {
+			// 函数入口
+			funStack = append(funStack, v)
+			v.Map.Pid = currentNid
+			v.Map.Level = level
+			v.Map.Nid = Nid
+			currentNid = Nid
+			level += 1
+			Nid += 1
+		} else if v.Type == 1 {
+			// 函数出口
+			len := len(funStack)
+			funStack = funStack[:len-1]
+			if (len - 2) < 0 {
+				currentNid = 1
+			} else {
+				currentNid = funStack[len-2].Map.Nid
+			}
+
+			level -= 1
+		}
+	}
+}
+
 func initRootData(traceId string) RootDataT {
 func initRootData(traceId string) RootDataT {
 	data := RootDataT{
 	data := RootDataT{
 		AccountId:      110,
 		AccountId:      110,
@@ -274,6 +379,52 @@ func initRootData(traceId string) RootDataT {
 	return data
 	return data
 }
 }
 
 
+func initRootDataFromEvent() RootDataT {
+	data := RootDataT{
+		AccountId:      110,
+		AgentId:        1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
+		AgentVersion:   "2.1.0",
+		AppId:          5410049101545798, // TODO 更新 基于appname生成
+		AppIdFrom:      -1,
+		AppName:        "eBPF-agent", // TODO 更新 ip:port || process_name
+		CalledId:       -1,
+		ClientIp:       "",
+		CollTime:       0,
+		Cpu:            0,
+		Custom:         "",
+		HostId:         10154813500555812,
+		HostName:       "localhost",
+		HttpCode:       0,
+		HttpMethod:     "",
+		InstanceId:     1005051101515357, // TODO 更新 基于ip:port
+		InstanceIdFrom: -1,
+		Maps:           []MapInfoT{},
+		MemU:           0,
+		MemUP:          0,
+		OperType:       "",
+		Parameters:     []interface{}{},
+		ParentTaskName: 0,
+		Period:         -1,
+		RespTime:       0,
+		Sampling:       0,
+		ServiceName:    "GO",
+		ServiceType:    APP_SERVICE_TYPE,
+		Sip:            "",
+		Sn:             "",
+		SpanIdFrom:     "",
+		Sport:          0,
+		TId:            -1,
+		TName:          "",
+		TraceId:        "",
+		TransIds:       []interface{}{},
+		TypeFrom:       "",
+		Uri:            "",
+		UserDir:        0,
+		VipIds:         []interface{}{},
+	}
+	return data
+}
+
 func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
 func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
 	mNode := MapInfoT{
 	mNode := MapInfoT{
 		Exception:      0,
 		Exception:      0,
@@ -295,6 +446,8 @@ func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
 	mNode.EndTime = spanSd.EndTimeUnixNano
 	mNode.EndTime = spanSd.EndTimeUnixNano
 
 
 	for _, attr := range spanSd.GetAttributes() {
 	for _, attr := range spanSd.GetAttributes() {
+		fmt.Println(attr.Key, ":", attr.Value.GetValue())
+
 		switch attr.Key {
 		switch attr.Key {
 		case "nid":
 		case "nid":
 			mNode.Nid = int(attr.Value.GetIntValue())
 			mNode.Nid = int(attr.Value.GetIntValue())
@@ -308,6 +461,48 @@ func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
 	return mNode, spanSd.Name
 	return mNode, spanSd.Name
 }
 }
 
 
+func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
+	mNode := MapInfoT{
+		Exception:      0,
+		ExceptionMsg:   "",
+		ExceptionStack: "",
+		Ip:             "",
+		Level:          2,
+		Pid:            1,
+		Port:           0,
+		Ps:             []string{},
+		ServiceName:    "",
+		ServiceType:    "",
+		WallTime:       0,
+	}
+	mNode.MethodName = event.Name
+	//mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
+	//mNode.WallTime = mNode.PureTime
+	//mNode.StartTime = spanSd.StartTimeUnixNano
+	//mNode.EndTime = spanSd.EndTimeUnixNano
+	for _, attr := range event.Attributes {
+		fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "nid":
+			mNode.Nid = int(attr.Value.AsInt64())
+		case "pid":
+			mNode.Pid = int(attr.Value.AsInt64())
+		case "level":
+			mNode.Level = int(attr.Value.AsInt64())
+		case "time.start_at":
+			mNode.StartTime = uint64(attr.Value.AsInt64())
+		case "time.end_at":
+			mNode.EndTime = uint64(attr.Value.AsInt64())
+		case "time.duration":
+			//mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
+			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
+		}
+
+	}
+
+	return mNode
+}
+
 // 构建拼装
 // 构建拼装
 func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
 func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
 	mNode, mapType := initMapNode(span(sd))
 	mNode, mapType := initMapNode(span(sd))
@@ -329,6 +524,23 @@ func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
 	return mNode
 	return mNode
 }
 }
 
 
+//func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
+//	mNode := buildMapNodeFromEvent(event)
+//	switch mapType {
+//	case "HTTP":
+//		buildHttpMapFromEvent(mNode, event)
+//		//case "Mysql":
+//		//	buildMysqlMap(mNode, sd)
+//		//case "Redis":
+//		//	buildRedisMap(mNode, sd)
+//	}
+//	if mapType != "" {
+//		//mNode.Nid = traceRoot.Index
+//		traceRoot.Maps = append(traceRoot.Maps, mNode)
+//	}
+//	return mNode
+//}
+
 func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 	mNode.ServiceName = GO_SERVICE_NAME
 	mNode.ServiceName = GO_SERVICE_NAME
 	mNode.ServiceType = APP_SERVICE_TYPE
 	mNode.ServiceType = APP_SERVICE_TYPE
@@ -341,7 +553,7 @@ func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 	traceRoot.RootData.CollTime = mNode.StartTime
 	traceRoot.RootData.CollTime = mNode.StartTime
 	traceRoot.Index = 1
 	traceRoot.Index = 1
 	for _, attr := range sd.Attributes() {
 	for _, attr := range sd.Attributes() {
-		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		fmt.Println(attr.Key, ":", attr.Value.AsInterface())
 		switch attr.Key {
 		switch attr.Key {
 		case "http.uri":
 		case "http.uri":
 			traceRoot.RootData.Uri = attr.Value.AsString()
 			traceRoot.RootData.Uri = attr.Value.AsString()
@@ -373,6 +585,71 @@ func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
 
 
 }
 }
 
 
+func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) {
+	mNode := MapInfoT{
+		Exception:      0,
+		ExceptionMsg:   "",
+		ExceptionStack: "",
+		Ip:             "",
+		Level:          1,
+		Pid:            1,
+		Port:           0,
+		Ps:             []string{},
+		ServiceName:    "",
+		ServiceType:    "",
+		WallTime:       0,
+	}
+	mNode.ServiceName = GO_SERVICE_NAME
+	mNode.ServiceType = APP_SERVICE_TYPE
+	mNode.MethodName = "Kernel Endpoint()"
+	mNode.Level = 1
+	mNode.Pid = 0
+	mNode.Nid = 1
+	// 构建root节点
+	//traceRoot.RespTime = mNode.PureTimex
+	//traceRoot.CollTime = mNode.StartTime
+	for _, attr := range sd.Attributes() {
+		fmt.Println("Appmap:", attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "http.uri":
+			traceRoot.Uri = attr.Value.AsString()
+		case "http.method":
+			traceRoot.HttpMethod = attr.Value.AsString()
+		case "http.status_code":
+			traceRoot.HttpCode = attr.Value.AsInt64()
+		case "net.peer.name":
+			traceRoot.ClientIp = attr.Value.AsString()
+			traceRoot.Sip = attr.Value.AsString()
+			traceRoot.Sn = attr.Value.AsString()
+		case "net.peer.port":
+			traceRoot.Sport = attr.Value.AsInt64()
+			traceRoot.LocalPort = attr.Value.AsInt64()
+		case "server.trace_id_from":
+			traceRoot.TraceId = attr.Value.AsString()
+		case "server.called_id":
+			traceRoot.CalledId = attr.Value.AsInt64()
+		case "server.instance_id_from":
+			traceRoot.InstanceIdFrom = attr.Value.AsInt64()
+		case "server.app_id_from":
+			traceRoot.AppIdFrom = attr.Value.AsInt64()
+		case "server.span_id_from":
+			traceRoot.SpanIdFrom = attr.Value.AsString()
+		case "server.type_from":
+			traceRoot.TypeFrom = attr.Value.AsString()
+		case "time.start_at":
+			traceRoot.CollTime = uint64(attr.Value.AsInt64())
+			mNode.StartTime = traceRoot.CollTime
+		case "time.end_at":
+			mNode.EndTime = uint64(attr.Value.AsInt64())
+		case "time.duration":
+			traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
+			//mNode.PureTime = traceRoot.RespTime
+			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
+		}
+	}
+	traceRoot.Maps = append(traceRoot.Maps, mNode)
+}
+
 func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
 func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
 	mNode.ServiceName = HTTP_SERVICE_NAME
 	mNode.ServiceName = HTTP_SERVICE_NAME
 	mNode.ServiceType = HTTP_SERVICE_TYPE
 	mNode.ServiceType = HTTP_SERVICE_TYPE
@@ -399,6 +676,39 @@ func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
 	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
 }
 }
 
 
+func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = HTTP_SERVICE_NAME
+	mNode.ServiceType = HTTP_SERVICE_TYPE
+	mNode.Schema = "http"
+	mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
+	//var descAddr string
+	for _, attr := range event.Attributes {
+		fmt.Println("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "http.ip":
+			mNode.Ip = attr.Value.AsString()
+			//descAddr += mNode.Ip
+		case "http.port":
+			mNode.Port = attr.Value.AsInt64()
+			//descAddr += ":" + attr.Value.AsString()
+		case "http.uri":
+			mNode.Uri = attr.Value.AsString()
+		case "http.assumed_app_id":
+			mNode.AssumedAppId = attr.Value.AsInt64()
+		case "http.span_id":
+			mNode.SpanId = attr.Value.AsString()
+		case "time.start_at":
+			mNode.StartTime = uint64(attr.Value.AsInt64())
+		case "time.end_at":
+			mNode.EndTime = uint64(attr.Value.AsInt64())
+		case "time.duration":
+			//mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
+			mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
+		}
+	}
+	//mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
+}
+
 func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
 func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
 	mNode.Dbn = "unknown"
 	mNode.Dbn = "unknown"
 	mNode.ServiceName = MYSQL_SERVICE_NAME
 	mNode.ServiceName = MYSQL_SERVICE_NAME
@@ -422,6 +732,29 @@ func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
 	}
 	}
 }
 }
 
 
+func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.Dbn = "unknown"
+	mNode.ServiceName = MYSQL_SERVICE_NAME
+	mNode.ServiceType = SQL_SERVICE_TYPE
+	mNode.MethodName = "database/sql.Query()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		}
+	}
+}
+
 func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 	mNode.ServiceName = REDIS_SERVICE_NAME
 	mNode.ServiceName = REDIS_SERVICE_NAME
 	mNode.ServiceType = NOSQL_SERVICE_TYPE
 	mNode.ServiceType = NOSQL_SERVICE_TYPE
@@ -445,6 +778,29 @@ func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
 	}
 	}
 }
 }
 
 
+func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
+	mNode.ServiceName = REDIS_SERVICE_NAME
+	mNode.ServiceType = NOSQL_SERVICE_TYPE
+	//mNode.MethodName = span(sd).Name + " query"
+	mNode.MethodName = "redis.Do()"
+	for _, attr := range event.Attributes {
+		//fmt.Println(attr.Key, ":", attr.Value.AsInterface())
+		switch attr.Key {
+		case "net.peer.name":
+			mNode.Ip = attr.Value.AsString()
+		case "net.peer.port":
+			mNode.Port = attr.Value.AsInt64()
+		case "db.statement":
+			query := attr.Value.AsString()
+			mNode.Ps = []string{query}
+			words := strings.Fields(query)
+			if len(words) > 0 {
+				mNode.OperType = strings.ToUpper(words[0])
+			}
+		}
+	}
+}
+
 func isEnter(_type string) bool {
 func isEnter(_type string) bool {
 	if _type == "APPLICATION" {
 	if _type == "APPLICATION" {
 		return true
 		return true

+ 3 - 0
pkg/go.opentelemetry.io/otel/internal/global/trace.go

@@ -186,6 +186,9 @@ func (nonRecordingSpan) RecordError(error, ...trace.EventOption) {}
 // AddEvent does nothing.
 // AddEvent does nothing.
 func (nonRecordingSpan) AddEvent(string, ...trace.EventOption) {}
 func (nonRecordingSpan) AddEvent(string, ...trace.EventOption) {}
 
 
+// AddEventApm does nothing.
+func (nonRecordingSpan) AddEventApm(string, int, int, ...trace.EventOption) {}
+
 // SetName does nothing.
 // SetName does nothing.
 func (nonRecordingSpan) SetName(string) {}
 func (nonRecordingSpan) SetName(string) {}
 
 

+ 6 - 0
pkg/go.opentelemetry.io/otel/sdk/trace/event.go

@@ -25,6 +25,12 @@ type Event struct {
 	// Name is the name of this event
 	// Name is the name of this event
 	Name string
 	Name string
 
 
+	// 区分事件类型
+	EventType int
+
+	// 区分l7事件类型
+	ProtocolType int
+
 	// Attributes describe the aspects of the event.
 	// Attributes describe the aspects of the event.
 	Attributes []attribute.KeyValue
 	Attributes []attribute.KeyValue
 
 

+ 32 - 0
pkg/go.opentelemetry.io/otel/sdk/trace/span.go

@@ -469,6 +469,14 @@ func (s *recordingSpan) AddEvent(name string, o ...trace.EventOption) {
 	s.addEvent(name, o...)
 	s.addEvent(name, o...)
 }
 }
 
 
+// AddEventApm adds an event with the provided name and options. If this span is
+func (s *recordingSpan) AddEventApm(name string, eventType int, ptype int, o ...trace.EventOption) {
+	if !s.IsRecording() {
+		return
+	}
+	s.addEventApm(name, eventType, ptype, o...)
+}
+
 func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
 func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
 	c := trace.NewEventConfig(o...)
 	c := trace.NewEventConfig(o...)
 	e := Event{Name: name, Attributes: c.Attributes(), Time: c.Timestamp()}
 	e := Event{Name: name, Attributes: c.Attributes(), Time: c.Timestamp()}
@@ -490,6 +498,27 @@ func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
 	s.mu.Unlock()
 	s.mu.Unlock()
 }
 }
 
 
+func (s *recordingSpan) addEventApm(name string, eventType int, ptype int, o ...trace.EventOption) {
+	c := trace.NewEventConfig(o...)
+	e := Event{Name: name, EventType: eventType, ProtocolType: ptype, Attributes: c.Attributes(), Time: c.Timestamp()}
+
+	// Discard attributes over limit.
+	limit := s.tracer.provider.spanLimits.AttributePerEventCountLimit
+	if limit == 0 {
+		// Drop all attributes.
+		e.DroppedAttributeCount = len(e.Attributes)
+		e.Attributes = nil
+	} else if limit > 0 && len(e.Attributes) > limit {
+		// Drop over capacity.
+		e.DroppedAttributeCount = len(e.Attributes) - limit
+		e.Attributes = e.Attributes[:limit]
+	}
+
+	s.mu.Lock()
+	s.events.add(e)
+	s.mu.Unlock()
+}
+
 // SetName sets the name of this span. If this span is not being recorded than
 // SetName sets the name of this span. If this span is not being recorded than
 // this method does nothing.
 // this method does nothing.
 func (s *recordingSpan) SetName(name string) {
 func (s *recordingSpan) SetName(name string) {
@@ -803,6 +832,9 @@ func (nonRecordingSpan) RecordError(error, ...trace.EventOption) {}
 // AddEvent does nothing.
 // AddEvent does nothing.
 func (nonRecordingSpan) AddEvent(string, ...trace.EventOption) {}
 func (nonRecordingSpan) AddEvent(string, ...trace.EventOption) {}
 
 
+// AddEventApm does nothing.
+func (nonRecordingSpan) AddEventApm(string, int, int, ...trace.EventOption) {}
+
 // SetName does nothing.
 // SetName does nothing.
 func (nonRecordingSpan) SetName(string) {}
 func (nonRecordingSpan) SetName(string) {}
 
 

+ 3 - 0
pkg/go.opentelemetry.io/otel/trace/noop.go

@@ -82,6 +82,9 @@ func (noopSpan) RecordError(error, ...EventOption) {}
 // AddEvent does nothing.
 // AddEvent does nothing.
 func (noopSpan) AddEvent(string, ...EventOption) {}
 func (noopSpan) AddEvent(string, ...EventOption) {}
 
 
+// AddEventApm does nothing.
+func (noopSpan) AddEventApm(string, int, int, ...EventOption) {}
+
 // SetName does nothing.
 // SetName does nothing.
 func (noopSpan) SetName(string) {}
 func (noopSpan) SetName(string) {}
 
 

+ 3 - 0
pkg/go.opentelemetry.io/otel/trace/trace.go

@@ -349,6 +349,9 @@ type Span interface {
 	// AddEvent adds an event with the provided name and options.
 	// AddEvent adds an event with the provided name and options.
 	AddEvent(name string, options ...EventOption)
 	AddEvent(name string, options ...EventOption)
 
 
+	// AddEventApm adds an event with the provided name and type and options.
+	AddEventApm(name string, eventType int, ptype int, options ...EventOption)
+
 	// IsRecording returns the recording state of the Span. It will return
 	// IsRecording returns the recording state of the Span. It will return
 	// true if the Span is active and events can be recorded.
 	// true if the Span is active and events can be recorded.
 	IsRecording() bool
 	IsRecording() bool

+ 17 - 3
run.sh

@@ -1,4 +1,18 @@
-#!/bin/sh
-pid=`ps aux | grep ebpfdemo81 | grep -v grep | awk '{print $2}'`
+!/bin/sh
+# pid=`ps aux | grep ebpfdemo81 | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=go SEND=1 FILTER_PID=$pid WHITE_LIST=".*HandleFunc|.*main.*|.*serverHandler.*|.*ServeHTTP.*" ./euspace --listen="0.0.0.0:8123"
+
+
+# pid=`ps aux | grep ./helloworld | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=java DBG_PATH="/data/roger/graalvm/simplehttpserver.debug" SEND=1 FILTER_PID=$pid WHITE_LIST="main.*|addwj.*" ./euspace  --listen="0.0.0.0:8123"
+
+
+pid=`ps aux | grep ./simplehttpserver | grep -v grep | awk '{print $2}'`
 echo $pid
 echo $pid
-TRACES_ENDPOINT=http://10.2.31.73:8099/docp/api/v2/data/receive FILTER_PID=$pid WHITE_LIST=".*HandleFunc|.*main.*|.*serverHandler.*|.*ServeHTTP.*" ./euspace --listen="0.0.0.0:8123"
+TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=java DBG_PATH="/data/roger/graalvm/simplehttpserver.debug" SEND=1 FILTER_PID=$pid WHITE_LIST="handle*|addw.*" ./euspace  --listen="0.0.0.0:8124"
+
+# pid=`ps aux | grep CoreAoT | grep -v grep | awk '{print $2}'`
+# echo $pid
+# TRACES_ENDPOINT=http://10.2.31.156:8099/docp/api/v2/data/receive BIN_TYPE=dotnet DBG_PATH="/data/roger/NET8/CoreAoT/bin/Release/net8.0/linux-x64/publish/CoreAoT.dbg" SEND=1 FILTER_PID=$pid WHITE_LIST="main.*|Addwj.*" ./euspace  --listen="0.0.0.0:8123"

+ 262 - 71
tracing/apm_tracing.go

@@ -3,18 +3,17 @@ package tracing
 import (
 import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
-	"sort"
-	"time"
-
-	"strconv"
-
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"github.com/coroot/coroot-node-agent/utils"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
 	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
 	"go.opentelemetry.io/otel/trace"
 	"go.opentelemetry.io/otel/trace"
 	"inet.af/netaddr"
 	"inet.af/netaddr"
+	"strconv"
+	"sync/atomic"
+	"time"
 )
 )
 
 
 /**
 /**
@@ -39,71 +38,113 @@ type TimeMap struct {
 	Map  *ebpftracer.StackFunEvent
 	Map  *ebpftracer.StackFunEvent
 }
 }
 
 
-func (t *Trace) buildFun() {
-	mapSlice := []TimeMap{}
-	for i, v := range t.stack {
-		timeStartMap := TimeMap{}
-		if v.StackEvent.Location == 0 {
-			timeStartMap = TimeMap{
-				Time: v.StackEvent.TimeNsStart,
-				Type: 0,
-				Map:  &t.stack[i],
-			}
-		} else {
-			timeStartMap = TimeMap{
-				Time: v.StackEvent.TimeNsEnd,
-				Type: 1,
-				Map:  &t.stack[i],
-			}
-		}
-		mapSlice = append(mapSlice, timeStartMap)
-	}
-	sort.Slice(mapSlice, func(i, j int) bool {
-		return mapSlice[i].Time < mapSlice[j].Time
-	})
-
-	funStack := []TimeMap{}
-
-	currentfunNum := 1
-
-	// for k, v := range mapSlice {
-	// 	fmt.Println("---SliceSliceindex", k, "value", v.Time, v.Type, v.Map.Uprobe.Funcname, v.Map.StackEvent.Nid)
-	// }
+//func (t *Trace) buildFun() {
+//	mapSlice := []TimeMap{}
+//	for i, v := range t.stack {
+//		timeStartMap := TimeMap{}
+//		if v.StackEvent.Location == 0 {
+//			timeStartMap = TimeMap{
+//				Time: v.StackEvent.TimeNsStart,
+//				Type: 0,
+//				Map:  &t.stack[i],
+//			}
+//		} else {
+//			timeStartMap = TimeMap{
+//				Time: v.StackEvent.TimeNsEnd,
+//				Type: 1,
+//				Map:  &t.stack[i],
+//			}
+//		}
+//		mapSlice = append(mapSlice, timeStartMap)
+//	}
+//	sort.Slice(mapSlice, func(i, j int) bool {
+//		return mapSlice[i].Time < mapSlice[j].Time
+//	})
+//
+//	funStack := []TimeMap{}
+//
+//	currentfunNum := 1
+//
+//	// for k, v := range mapSlice {
+//	// 	fmt.Println("---SliceSliceindex", k, "value", v.Time, v.Type, v.Map.Uprobe.Funcname, v.Map.StackEvent.Nid)
+//	// }
+//
+//	mapSliceLen := len(mapSlice)
+//	for k, v := range mapSlice {
+//		// fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.Uprobe.Funcname, v.Map.StackEvent.Nid)
+//		if v.Type == 0 {
+//			// 函数入口
+//			funStack = append(funStack, v)
+//		} else if v.Type == 1 {
+//			// 函数出口
+//			len := len(funStack)
+//			if len < 1 {
+//				fmt.Printf("buildFun ErrorError return before enter: %v\n", v)
+//				continue
+//			}
+//			currnt := funStack[len-1]
+//			if currnt.Map.StackEvent.Location != 0 {
+//				fmt.Printf("currnt StackEvent ErrorError is not enter: %v\n", v)
+//				continue
+//			}
+//			if k < mapSliceLen-1 && len >= 2 {
+//				nextfun := mapSlice[k+1]
+//				preCurrnt := funStack[len-2]
+//				// // 处理 .netcore 多次 returun
+//				// 下一个事件是 return 并且函数名跟当前事件是一样的,且上一个函数不是当前函数
+//				if nextfun.Map.StackEvent.Location == 1 && nextfun.Map.Uprobe.Funcname == currnt.Map.Uprobe.Funcname && preCurrnt.Map.Uprobe.Funcname != currnt.Map.Uprobe.Funcname {
+//					currentfunNum++
+//					continue
+//				}
+//			}
+//			funStack = funStack[:len-1]
+//			duration := v.Map.StackEvent.TimeNsEnd - currnt.Map.StackEvent.TimeNsStart
+//			t.FuncTraceQuery(currnt.Map.Uprobe.Funcname, time.Duration(duration), currnt.Map.StackEvent.TimeNsStart, v.Map.StackEvent.TimeNsEnd, currentfunNum)
+//			currentfunNum = 1
+//		}
+//	}
+//}
 
 
-	mapSliceLen := len(mapSlice)
-	for k, v := range mapSlice {
-		// fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.Uprobe.Funcname, v.Map.StackEvent.Nid)
-		if v.Type == 0 {
-			// 函数入口
-			funStack = append(funStack, v)
-		} else if v.Type == 1 {
-			// 函数出口
-			len := len(funStack)
-			if len < 1 {
-				fmt.Printf("buildFun ErrorError return before enter: %v\n", v)
-				continue
-			}
-			currnt := funStack[len-1]
-			if currnt.Map.StackEvent.Location != 0 {
-				fmt.Printf("currnt StackEvent ErrorError is not enter: %v\n", v)
-				continue
-			}
-			if k < mapSliceLen-1 && len >= 2 {
-				nextfun := mapSlice[k+1]
-				preCurrnt := funStack[len-2]
-				// // 处理 .netcore 多次 returun
-				// 下一个事件是 return 并且函数名跟当前事件是一样的,且上一个函数不是当前函数
-				if nextfun.Map.StackEvent.Location == 1 && nextfun.Map.Uprobe.Funcname == currnt.Map.Uprobe.Funcname && preCurrnt.Map.Uprobe.Funcname != currnt.Map.Uprobe.Funcname {
-					currentfunNum++
-					continue
-				}
-			}
-			funStack = funStack[:len-1]
-			duration := v.Map.StackEvent.TimeNsEnd - currnt.Map.StackEvent.TimeNsStart
-			t.FuncTraceQuery(currnt.Map.Uprobe.Funcname, time.Duration(duration), currnt.Map.StackEvent.TimeNsStart, v.Map.StackEvent.TimeNsEnd, currentfunNum)
-			currentfunNum = 1
-		}
+func (t *Trace) startReady() {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	t.startEventReady = true
+}
+
+func (t *Trace) endReadyEvent(needCount uint32) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+	t.endEventReady = true
+	t.needEventCount = needCount
+}
+
+func (t *Trace) AllEventReady(traceID uint64) bool {
+	fmt.Printf("[AllEventReady (current/need)|start|end|traceid](%d/%d)%v|%v|%d\n", *t.currenEventCount, t.needEventCount, t.startEventReady, t.endEventReady, traceID)
+	return t.startEventReady && t.endEventReady && *t.currenEventCount >= t.needEventCount
+}
+
+func (t *Trace) TraceStartEvent(method, path string, status l7.Status, addr netaddr.IPPort) {
+	t.span.SetAttributes(semconv.HTTPURL(fmt.Sprintf("http://%s%s", addr.String(), path)),
+		semconv.HTTPMethod(method),
+		attribute.String("http.uri", path))
+	if status > 399 {
+		t.span.SetStatus(codes.Error, "")
+	}
+	t.destination = addr
+	t.commonAttrs = []attribute.KeyValue{
+		semconv.NetPeerName(addr.IP().String()),
+		semconv.NetPeerPort(int(addr.Port())),
 	}
 	}
+	t.span.SetAttributes(t.commonAttrs...)
+	t.startReady()
+}
+
+// set context span
+func (t *Trace) CreateRootSpan(traceId uint64) {
+	traceIdStr := strconv.Itoa(int(traceId))
+	ctx, span := tracer(t.containerId).Start(context.Background(), traceIdStr, trace.WithSpanKind(trace.SpanKindClient))
+	t.setContext(ctx)
+	t.setSpan(span)
 }
 }
 
 
 func (t *Trace) TraceStart(method, path string, status l7.Status, duration time.Duration) {
 func (t *Trace) TraceStart(method, path string, status l7.Status, duration time.Duration) {
@@ -147,10 +188,61 @@ func (t *Trace) TraceEnd(r *l7.RequestData) {
 	// for _, v := range t.stack {
 	// for _, v := range t.stack {
 	// 	fmt.Printf("TraceEndTraceEndTraceEnd%s\n", v)
 	// 	fmt.Printf("TraceEndTraceEndTraceEnd%s\n", v)
 	// }
 	// }
-	t.buildFun()
+	//t.buildFun()
 	t.span.End(trace.WithTimestamp(time.Now()))
 	t.span.End(trace.WithTimestamp(time.Now()))
 }
 }
 
 
+// 新增结束事件
+func (t *Trace) TraceEndEvent(r *l7.RequestData) {
+	if t == nil {
+		return
+	}
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.HTTPStatusCode(int(r.Status)),
+		attribute.String("server.trace_id_from", r.ParentSpanContext.TraceIdFrom),
+	)
+	//t.span.SetAttributes(
+	//	semconv.HTTPStatusCode(int(r.Status)),
+	//	attribute.String("server.trace_id_from", r.ParentSpanContext.TraceIdFrom),
+	//)
+
+	calledId, err := strconv.ParseInt(r.ParentSpanContext.CalledId, 10, 64)
+	if err == nil && calledId != 0 {
+		attr = append(attr, attribute.Int64("server.called_id", calledId))
+		//t.span.SetAttributes(attribute.Int64("server.called_id", CalledId))
+	}
+
+	instanceIdFrom, err := strconv.ParseInt(r.ParentSpanContext.InstanceIdFrom, 10, 64)
+	if err == nil && instanceIdFrom != 0 {
+		attr = append(attr, attribute.Int64("server.instance_id_from", instanceIdFrom))
+
+		//t.span.SetAttributes(attribute.Int64("server.instance_id_from", InstanceIdFrom))
+	}
+
+	appIdFrom, err := strconv.ParseInt(r.ParentSpanContext.AppIdFrom, 10, 64)
+	if err == nil && appIdFrom != 0 {
+		attr = append(attr, attribute.Int64("server.app_id_from", appIdFrom))
+		//t.span.SetAttributes(attribute.Int64("server.app_id_from", AppIdFrom))
+	}
+	if r.ParentSpanContext.SpanIdFrom != "0000000000000000" {
+		attr = append(attr, attribute.String("server.span_id_from", r.ParentSpanContext.SpanIdFrom))
+		//t.span.SetAttributes(attribute.String("server.span_id_from", r.ParentSpanContext.SpanIdFrom))
+	}
+
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.span.SetAttributes(attr...)
+	t.endReadyEvent(r.EventCount)
+}
+
+func (t *Trace) appendTimestamp(attr *[]attribute.KeyValue, s, e uint64, d int64) {
+	*attr = append(*attr,
+		attribute.Int64("time.start_at", utils.KtimeToTimestamp(s)),
+		attribute.Int64("time.end_at", utils.KtimeToTimestamp(e)),
+		attribute.Int64("time.duration", d),
+	)
+}
+
 func (t *Trace) createParentSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
 func (t *Trace) createParentSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
 	end := time.Now()
 	end := time.Now()
 	start := end.Add(-duration)
 	start := end.Add(-duration)
@@ -164,6 +256,19 @@ func (t *Trace) createParentSpan(name string, duration time.Duration, error bool
 	t.setSpan(span)
 	t.setSpan(span)
 }
 }
 
 
+func (t *Trace) SendEvent() {
+	t.span.End()
+}
+
+func (t *Trace) GetSpan() trace.Span {
+	return t.span
+}
+
+func (t *Trace) createTraceEvent(name string, eventType int, l7Type int, attrs ...attribute.KeyValue) {
+	t.span.AddEventApm(name, eventType, l7Type, trace.WithAttributes(attrs...))
+	atomic.AddUint32(t.currenEventCount, 1)
+}
+
 func (t *Trace) createTraceSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
 func (t *Trace) createTraceSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
 	end := time.Now()
 	end := time.Now()
 	start := end.Add(-duration)
 	start := end.Add(-duration)
@@ -189,6 +294,22 @@ func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration
 	)
 	)
 }
 }
 
 
+func (t *Trace) MysqlTraceQueryEvent(query string, r *l7.RequestData, destination netaddr.IPPort) {
+	if t == nil || query == "" {
+		return
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemMySQL,
+		semconv.DBStatement(query),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolMysql), attr...)
+}
+
 func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
 func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Duration) {
 	if t == nil || cmd == "" {
 	if t == nil || cmd == "" {
 		return
 		return
@@ -204,6 +325,27 @@ func (t *Trace) RedisTraceQuery(cmd, args string, error bool, duration time.Dura
 	)
 	)
 }
 }
 
 
+func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destination netaddr.IPPort) {
+	if t == nil || cmd == "" {
+		return
+	}
+	statement := cmd
+	if args != "" {
+		statement += " " + args
+	}
+
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.DBSystemRedis,
+		semconv.DBOperation(cmd),
+		semconv.DBStatement(statement),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolRedis), attr...)
+}
+
 func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.RequestData) {
 func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.RequestData) {
 	if t == nil || method == "" {
 	if t == nil || method == "" {
 		return
 		return
@@ -226,11 +368,38 @@ func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, r *l7.Req
 	)
 	)
 }
 }
 
 
-func (t *Trace) FuncTraceQuery(funcname string, duration time.Duration, start uint64, end uint64, num int) {
+// 新增事件处理
+func (t *Trace) HttpTraceRequestEvent(method, path, ip string, port uint16, r *l7.RequestData) {
+	if t == nil || method == "" {
+		return
+	}
+	assumedAppID, err := strconv.ParseInt(r.AssumedAppId, 10, 64)
+	if err != nil {
+		assumedAppID = 0
+	}
+	status := r.Status
+	var attr []attribute.KeyValue
+	attr = append(attr,
+		semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
+		semconv.HTTPMethod(method),
+		semconv.HTTPStatusCode(int(status)),
+		attribute.Bool("http.status_error", status > 399),
+		attribute.String("http.uri", path),
+		attribute.String("http.ip", ip),
+		attribute.Int64("http.assumed_app_id", assumedAppID),
+		attribute.String("http.span_id", r.SpanId),
+		attribute.Int("http.port", int(port)),
+	)
+	t.appendTimestamp(&attr, r.StartAt, r.EndAt, r.Duration.Nanoseconds())
+	t.createTraceEvent(l7.ProtocolHTTP.String(), int(ebpftracer.EventTypeL7Request), int(l7.ProtocolHTTP), attr...)
+}
+
+func (t *Trace) FuncTraceQuery(funcname string, duration time.Duration, start uint64, end uint64) {
 	if t == nil || funcname == "" {
 	if t == nil || funcname == "" {
 		return
 		return
 	}
 	}
-	t.createTraceSpanNoTime(funcname, duration, false, start, end, attribute.Int("num", num))
+	t.createTraceSpanNoTime2(funcname, duration, false, start, end)
+	//t.createTraceSpanNoTime2(funcname, duration, false, start, end, attribute.Int("num", num))
 }
 }
 
 
 func (t *Trace) createTraceSpanNoTime(name string, duration time.Duration, error bool, start uint64, end uint64, attrs ...attribute.KeyValue) {
 func (t *Trace) createTraceSpanNoTime(name string, duration time.Duration, error bool, start uint64, end uint64, attrs ...attribute.KeyValue) {
@@ -247,3 +416,25 @@ func (t *Trace) createTraceSpanNoTime(name string, duration time.Duration, error
 	}
 	}
 	span.End(trace.WithTimestamp(endTime))
 	span.End(trace.WithTimestamp(endTime))
 }
 }
+
+func (t *Trace) createTraceSpanNoTime2(name string, duration time.Duration, error bool, start uint64, end uint64) {
+	// end := time.Now()
+	// start := end.Add(-duration)
+	//startTime := time.Unix(0, int64(start))
+	//endTime := time.Unix(0, int64(end))
+	////fmt.Println("createTraceSpan:", t.ctx)
+	//_, span := tracer(t.containerId).Start(t.ctx, name, trace.WithTimestamp(startTime), trace.WithSpanKind(trace.SpanKindClient))
+	//span.SetAttributes(t.commonAttrs...)
+	//span.SetAttributes(attrs...)
+	//if error {
+	//	span.SetStatus(codes.Error, "")
+	//}
+	//span.End(trace.WithTimestamp(endTime))
+	//attrs = append([]attribute.KeyValue{
+	//	attribute.Int64("startAt", int64(start)),
+	//	attribute.Int64("endAt", int64(end)),
+	//})
+	var attr []attribute.KeyValue
+	t.appendTimestamp(&attr, start, end, int64(end-start))
+	t.createTraceEvent(name, int(ebpftracer.EventTypeFunEnt), 0, attr...)
+}

+ 19 - 6
tracing/tracing.go

@@ -74,13 +74,26 @@ func Init(machineId, hostname, version string) {
 }
 }
 
 
 type Trace struct {
 type Trace struct {
-	containerId string
-	destination netaddr.IPPort
-	commonAttrs []attribute.KeyValue
-	ctx         context.Context
-	span        trace.Span
-	lock        sync.RWMutex
+	containerId      string
+	destination      netaddr.IPPort
+	commonAttrs      []attribute.KeyValue
+	ctx              context.Context
+	span             trace.Span
+	lock             sync.RWMutex
 	stack       []ebpftracer.StackFunEvent
 	stack       []ebpftracer.StackFunEvent
+	currenEventCount *uint32
+	needEventCount   uint32
+	startEventReady  bool
+	endEventReady    bool
+	createAt         time.Time
+}
+
+func NewTraceFromEvent(containerId string) *Trace {
+	if tracer == nil {
+		return nil
+	}
+	var currenEventCount uint32
+	return &Trace{containerId: containerId, currenEventCount: &currenEventCount}
 }
 }
 
 
 func NewTrace(containerId string, destination netaddr.IPPort) *Trace {
 func NewTrace(containerId string, destination netaddr.IPPort) *Trace {

+ 27 - 0
utils/kernel.go

@@ -18,10 +18,12 @@ import (
 	"bufio"
 	"bufio"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"golang.org/x/sys/unix"
 	"os"
 	"os"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"syscall"
 	"syscall"
+	"time"
 
 
 	"github.com/hashicorp/go-version"
 	"github.com/hashicorp/go-version"
 )
 )
@@ -184,3 +186,28 @@ func GetCPUCount() (int, error) {
 
 
 	return 0, err
 	return 0, err
 }
 }
+
+var bootTime time.Time
+
+// GetBootTime returns the time at which the machine was started, truncated to the nearest second
+func GetBootTime() (time.Time, error) {
+	if !bootTime.IsZero() {
+		return bootTime, nil
+	}
+	fmt.Println("GetBootTime")
+	currentTime := time.Now()
+	var info unix.Sysinfo_t
+	if err := unix.Sysinfo(&info); err != nil {
+		return time.Time{}, fmt.Errorf("error getting system uptime: %s", err)
+	}
+	bootTime = currentTime.Add(-time.Duration(info.Uptime) * time.Second).Truncate(time.Second)
+	return bootTime, nil
+}
+
+func KtimeToTimestamp(t uint64) int64 {
+	bT, err := GetBootTime()
+	if err != nil {
+		return int64(0)
+	}
+	return bT.Add(time.Duration(t) * time.Nanosecond).UnixNano()
+}