Просмотр исходного кода

Merge branch 'dev-trace-header-roger' into 'dev-trace-header'

Fixed #TASK_GK-2944 Go 纵向堆栈

See merge request TSB/euspace!1
roger.wang 1 год назад
Родитель
Сommit
dc8d3c42d6

+ 8 - 3
containers/container.go

@@ -143,7 +143,10 @@ type Container struct {
 
 	done chan struct{}
 
-	traceMap map[uint64]*tracing.Trace
+	traceMap      map[uint64]*tracing.Trace
+	goEventStack  map[uint64]uint64
+	goEvents      map[uint64][]ebpftracer.StackFunEvent
+	goEventStacks map[uint64]map[uint64][]ebpftracer.StackFunEvent
 }
 
 func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
@@ -179,8 +182,10 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 
 		hostConntrack: hostConntrack,
 
-		done:     make(chan struct{}),
-		traceMap: make(map[uint64]*tracing.Trace),
+		done:         make(chan struct{}),
+		traceMap:     make(map[uint64]*tracing.Trace),
+		goEventStack: map[uint64]uint64{},
+		goEvents:     map[uint64][]ebpftracer.StackFunEvent{},
 	}
 
 	for _, n := range md.networks {

+ 167 - 0
containers/container_apm.go

@@ -1,9 +1,18 @@
 package containers
 
 import (
+	"debug/elf"
 	"fmt"
+	"math/rand"
+	"sort"
+	"strconv"
+	"time"
+
+	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	"github.com/coroot/coroot-node-agent/tracing"
+	"github.com/pkg/errors"
 	"inet.af/netaddr"
 )
 
@@ -157,3 +166,161 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	}
 	return nil
 }
+
+func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	// get the associated uprobe
+	switch event.Location {
+	case 0: // ret
+		uprobe, err := c.GetUprobe(event, tracer)
+		if err != nil {
+			fmt.Println("GetUprobeGetUprobe errer: %v", err)
+			// log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
+			return
+		}
+
+		if event.TraceId <= 0 {
+			fmt.Println("StackProcess TraceId id 0")
+			// log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
+			return
+		}
+
+		fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
+		apmTrace, ok := c.getTrace(event.TraceId)
+		if ok {
+			fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
+			duration := event.TimeNsEnd - event.TimeNsStart
+			apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid))
+		}
+	case 2: // coroutine
+		fmt.Println("StackProcess 协程入口开始处理 fun:", event.TraceId, event.Goid, event.TimeNsEnd-event.TimeNsStart)
+		apmTrace, ok := c.getTrace(event.TraceId)
+		if ok {
+			fmt.Println("append FuncTraceQuery fun:", event.TraceId, "coroutine"+strconv.FormatUint(event.Goid, 10), event.Pid, event.Fpid)
+			duration := event.TimeNsEnd - event.TimeNsStart
+			apmTrace.FuncTraceQuery("coroutine"+strconv.FormatUint(event.Goid, 10), time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid))
+		}
+	}
+}
+
+// StackProcessBak is the user-space variant of the call-stack reconstruction:
+// it keeps one stack of open frames per (trace id, goroutine id) in
+// c.goEventStacks, pushes a frame on an entry event (Location 0) and, on a
+// return event (Location 1), pops the top frame and reports it as a span.
+//
+// Events that cannot be mapped to an uprobe or that carry no trace id are
+// dropped.
+func (c *Container) StackProcessBak(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	uprobe, err := c.GetUprobe(event, tracer)
+	if err != nil {
+		fmt.Printf("GetUprobeGetUprobe errer: %v\n", err)
+		return
+	}
+
+	if event.TraceId == 0 {
+		fmt.Println("StackProcess TraceId id 0")
+		return
+	}
+
+	// Create the nested maps lazily. Only the missing level is created, so the
+	// stacks of other traces are left untouched.
+	if c.goEventStacks == nil {
+		c.goEventStacks = map[uint64]map[uint64][]ebpftracer.StackFunEvent{}
+	}
+	stacks := c.goEventStacks[event.TraceId]
+	if stacks == nil {
+		stacks = map[uint64][]ebpftracer.StackFunEvent{}
+		c.goEventStacks[event.TraceId] = stacks
+	}
+	stack := stacks[event.Goid]
+
+	switch event.Location {
+	case 0: // entry
+		// Defaults for the first frame of a goroutine.
+		level := 100
+		pid := 100000 + event.Goid
+		fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, len(stack))
+		if depth := len(stack); depth > 0 {
+			top := &stack[depth-1]
+			fmt.Println("funEvent goEventStacks fun:", event.TraceId, top.Uprobe.Funcname, top.Nid, top.Level)
+			last := top.StackEvent
+			if last.Location == event.Location && last.Ip == event.Ip && last.Bp != event.CallerBp {
+				// Duplicated entry event due to stack expansion/shrinkage:
+				// refresh the open frame instead of pushing a new one.
+				fmt.Printf("GetUprobeGetUprobe duplicated entry event: %+v\n", event)
+				top.StackEvent = event
+				return
+			}
+			level = top.Level
+			pid = uint64(top.Nid)
+		}
+
+		fmt.Println("append goEventStacks fun:", event.TraceId, uprobe.Funcname, pid, level+1)
+		// The shared math/rand source is used as is; re-seeding it on every
+		// event is unnecessary.
+		stack = append(stack, ebpftracer.StackFunEvent{
+			StackEvent: event,
+			Uprobe:     &uprobe,
+			Level:      level + 1,
+			Pid:        int(pid),
+			Nid:        rand.Intn(100000000),
+		})
+		stacks[event.Goid] = stack
+		fmt.Println("append goEventStacks end:", event.TraceId, uprobe.Funcname, pid, level+1, len(stack))
+	case 1: // ret
+		depth := len(stack)
+		fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname, depth)
+		if depth > 0 {
+			frame := stack[depth-1]
+			// Always pop, even when the trace is gone; otherwise the stack
+			// would stay misaligned for every later event of this goroutine.
+			stack = stack[:depth-1]
+			fmt.Println("StackProcess 函数出口处理 fun:", event.TraceId, frame.Uprobe.Funcname, depth)
+			if apmTrace, ok := c.getTrace(event.TraceId); ok {
+				fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, frame.Pid, frame.Level, frame.Nid)
+				duration := event.TimeNsEnd - frame.StackEvent.TimeNsStart
+				apmTrace.FuncTraceQuery(frame.Uprobe.Funcname, time.Duration(duration), frame.Level, frame.Pid, frame.Nid)
+			}
+		}
+		// Drop bookkeeping for finished goroutines/traces so the maps do not
+		// grow without bound.
+		if len(stack) == 0 {
+			delete(stacks, event.Goid)
+		} else {
+			stacks[event.Goid] = stack
+		}
+	}
+
+	if len(stacks) == 0 {
+		delete(c.goEventStacks, event.TraceId)
+	}
+}
+
+// ResolveAddress returns the symbol(s) covering addr and the offset of addr
+// from the start of those symbols.
+//
+// symbols must be sorted by ascending Value. The result contains every symbol
+// that shares the greatest start address not exceeding addr (several names can
+// alias one address). A zero addr, or an addr below the first symbol, yields
+// no symbols and a nil error. The error result is currently always nil.
+func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
+	if addr == 0 {
+		return nil, 0, nil
+	}
+
+	// Index of the first symbol that starts after addr.
+	idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
+	if idx == 0 {
+		return nil, 0, nil
+	}
+
+	// Walk backwards over all symbols that share the matching start address.
+	// Symbols from idx onwards start after addr and can never match.
+	base := symbols[idx-1].Value
+	for i := idx - 1; i >= 0 && symbols[i].Value == base; i-- {
+		syms = append(syms, symbols[i])
+	}
+	return syms, uint(addr - base), nil
+}
+
+// GetUprobe returns the uprobe registered for the function that contains
+// event.Ip. The address is resolved against tracer.Symbols and the resulting
+// symbol names are looked up in tracer.UprobesMap; the first match wins.
+// An error is returned when the tracer is nil or no registered uprobe matches.
+func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
+	fmt.Println("GetUprobe entory:")
+	if tracer == nil {
+		return uprobe, errors.New("uprobe not found: tracer is nil")
+	}
+
+	syms, offset, err := c.ResolveAddress(event.Ip, tracer.Symbols)
+	if err != nil {
+		return uprobe, err
+	}
+	for _, sym := range syms {
+		fmt.Printf("GetUprobeGetUprobeGetUprobe: %s+%d\n", sym.Name, offset)
+		if found, ok := tracer.UprobesMap[sym.Name]; ok {
+			return found, nil
+		}
+	}
+	return uprobe, errors.Errorf("uprobe not found for ip 0x%x", event.Ip)
+}

+ 21 - 0
containers/registry.go

@@ -12,6 +12,7 @@ import (
 	"github.com/coroot/coroot-node-agent/cgroup"
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
 	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/proc"
 	"github.com/prometheus/client_golang/prometheus"
@@ -50,6 +51,13 @@ type Registry struct {
 	processInfoCh chan<- ProcessInfo
 }
 
+var (
+	// NOTE(review): these package-level variables are not referenced by any of
+	// the visible changes; the Tracer carries its own Uprobes/UprobesMap — confirm they are still needed.
+	uprobes    []tracer.Uprobe
+	uprobesMap map[string]tracer.Uprobe
+)
+
 func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo) (*Registry, error) {
 	ns, err := proc.GetSelfNetNs()
 	if err != nil {
@@ -285,6 +293,19 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 					r.ip2fqdnLock.Unlock()
 				}
+			case ebpftracer.EventTypeFunEnt:
+				if e.StackEvent == nil {
+					continue
+				}
+
+				if c := r.containersByPid[uint32(e.StackEvent.Pid)]; c != nil {
+					fmt.Println("e.EventTypeFunEnt: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
+					fmt.Println("e.EventTypeFunEnt: FPid:%d, Nid:%d, Level:%d", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
+					c.StackProcess(*e.StackEvent, r.tracer)
+				} else {
+					fmt.Println("e.EventTypeFunEnt ErrorError: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
+					fmt.Println("e.EventTypeFunEnt ErrorError: TraceId:%d, FPid:%d, Nid:%d, Level:%d", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
+				}
 			}
 		}
 	}

+ 1 - 0
ebpftracer/ebpf/ebpf.c

@@ -65,4 +65,5 @@
 //#include "l7/openssl.c"
 #include "utrace/go/net/server.probe.bpf.c"
 #include "utrace/go/net/client.probe.bpf.c"
+#include "utrace/go/net/stack.probe.bpf.c"
 char _license[] SEC("license") = "GPL";

+ 1 - 1
ebpftracer/ebpf/l7/uprobe_base_bpf.c

@@ -132,7 +132,7 @@ int runtime_execute(struct pt_regs *ctx)// ok
     __s64 goroutine_id = GOROUTINE(ctx);
 
     bpf_map_update_elem(&goroutines_map, &pid_tgid, &goroutine_id, BPF_ANY);
-    bpf_printk("[GO] [runtime.execute] thread_id:%d|goid:%d", tid, goroutine_id);
+    // bpf_printk("[GO] [runtime.execute] thread_id:%d|goid:%d", tid, goroutine_id);
 
     return 0;
 }

+ 1 - 1
ebpftracer/ebpf/uprobe_base_bpf.c

@@ -399,7 +399,7 @@ int runtime_execute(struct pt_regs *ctx)
 
 	__s64 goid = 0;
 	bpf_probe_read(&goid, sizeof(goid), g_ptr + offset_g_goid);
-	debug("[Go] [runtime.execute] goid:%llu",goid);
+	// debug("[Go] [runtime.execute] goid:%llu",goid);
 
 	bpf_map_update_elem(&goroutines_map, &pid_tgid, &goid, BPF_ANY);
 

+ 288 - 0
ebpftracer/ebpf/utrace/go/net/stack.probe.bpf.c

@@ -0,0 +1,288 @@
+#include "arguments.h"
+#include "span_context.h"
+#include "go_context.h"
+#include "go_types.h"
+#include "uprobe.h"
+
+#define MAX_DATA_SIZE 64
+
+#define ENTPOINT 0
+#define RETPOINT 1
+
+// Record written by the stack probes when a traced function is entered/left; packed, so it is 12 x 8 bytes + 1 byte on the wire.
+struct event
+{
+	__u64 pid; // upper 32 bits of bpf_get_current_pid_tgid()
+	__u64 trace_id; // value returned by get_apm_trace_id()
+	__u64 goid; // value returned by get_current_goroutine()
+	__u64 ip; // PT_REGS_IP at the probe
+	__u64 bp; // entry: PT_REGS_SP - 8; return: PT_REGS_FP
+	__u64 caller_ip; // 8 bytes read from the address in PT_REGS_SP at entry
+	__u64 caller_bp; // PT_REGS_FP at entry
+	__u64 time_ns_start; // bpf_ktime_get_ns() at entry
+	__u64 time_ns_end; // bpf_ktime_get_ns() at return
+	__u64 nid; // node id: random value for functions, goid for the goroutine record
+	__u64 fpid; // nid of the parent node
+	__u64 level; // depth of the node (stack depth + 2 for functions, 2 for the goroutine record)
+	__u8 location; // ENTPOINT, RETPOINT, or 2 for the goroutine record
+}__attribute__((packed));
+
+// Key of trace_stack_entry_index: one entry per (trace, goroutine).
+// trace_id is 64 bits wide like the value returned by get_apm_trace_id(), so
+// ids are not truncated and the struct contains no padding bytes that could
+// make otherwise equal hash keys differ.
+struct trace_stack_entry_index_key_t {
+	__u64 trace_id;
+	__u64 goid;
+};
+
+// Value of trace_stack_entry_index: the current stack depth (fun_key of the
+// innermost open frame, 0 when no frame is open).
+struct trace_stack_entry_index_value_t {
+	__u64 fun_key;
+};
+
+// Key of trace_stack_entry: one entry per open frame of a (trace, goroutine).
+struct trace_stack_entry_key_t {
+	__u64 trace_id;
+	__u64 goid;
+	__u64 fun_key;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} event_queue SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__uint(key_size, sizeof(struct trace_stack_entry_key_t));
+	__uint(value_size, sizeof(struct event));
+	__uint(max_entries, 32768);
+} trace_stack_entry SEC(".maps");
+
+struct {
+	__uint(type, BPF_MAP_TYPE_LRU_HASH);
+	__uint(key_size, sizeof(struct trace_stack_entry_index_key_t));
+	__uint(value_size, sizeof(__u64));
+	__uint(max_entries, 32768);
+} trace_stack_entry_index SEC(".maps");
+
+// struct bpf_map_def SEC("maps") event_queue = {
+// 	.type = BPF_MAP_TYPE_QUEUE,
+// 	.key_size = 0,
+// 	.value_size = sizeof(struct event),
+// 	.max_entries = 10000,
+// };
+
+
+struct bpf_map_def SEC("maps") event_stack = {
+	.type = BPF_MAP_TYPE_PERCPU_ARRAY,
+	.key_size = sizeof(__u32),
+	.value_size = sizeof(struct event),
+	.max_entries = 1,
+};
+
+struct bpf_map_def SEC("maps") should_trace_goid = {
+	.type = BPF_MAP_TYPE_HASH,
+	.key_size = sizeof(__u64),
+	.value_size = sizeof(bool),
+	.max_entries = 10000,
+};
+
+// ent runs at the entry of every traced Go function.
+//
+// It records an open frame for the current (trace, goroutine) in
+// trace_stack_entry and bumps the stack depth in trace_stack_entry_index.
+// Nothing is sent to user space here; the frame is emitted by the return probe.
+SEC("uprobe/ent")
+int ent(struct pt_regs *ctx)
+{
+	bpf_printk("[Go] [uprobe/ent] enter");
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u64 pid = pid_tgid >> 32;
+
+	__u64 trace_id = get_apm_trace_id(pid, pid_tgid);
+
+	// Per-CPU scratch slot in which the frame record is assembled.
+	__u32 key = 0;
+	struct event *e = bpf_map_lookup_elem(&event_stack, &key);
+	if (!e)
+	{
+		bpf_printk("[Go] [uprobe/ent]: cannot get scratch event");
+		return 0;
+	}
+	__builtin_memset(e, 0, sizeof(*e));
+
+	e->goid = get_current_goroutine();
+	e->ip = PT_REGS_IP(ctx); // address of the function being entered
+
+	// Remember the goroutine so the return probe will handle it.
+	if (!bpf_map_lookup_elem(&should_trace_goid, &e->goid))
+	{
+		bool should_trace = true;
+		bpf_map_update_elem(&should_trace_goid, &e->goid, &should_trace, BPF_ANY);
+	}
+
+	e->pid = pid;
+	e->trace_id = trace_id;
+	e->location = ENTPOINT; // marks the record as created at function entry
+	e->time_ns_start = bpf_ktime_get_ns();
+	// The probe fires before the callee has set up its frame: the caller's
+	// rbp has not been pushed yet. On a 64-bit machine that push takes 8
+	// bytes, so the callee's frame base will be SP - 8 (stack grows down).
+	e->bp = PT_REGS_SP(ctx) - 8;
+	e->caller_bp = PT_REGS_FP(ctx); // frame base of the caller
+
+	// At entry SP points at the return address pushed by the caller.
+	void *ra = (void *)PT_REGS_SP(ctx);
+	bpf_probe_read_user(&e->caller_ip, sizeof(e->caller_ip), ra);
+
+	bpf_printk("[Go] [uprobe/ent]: goid:%llu ip:%llx time_ns_start:%llu", e->goid, e->ip, e->time_ns_start);
+	bpf_printk("[Go] [uprobe/ent]: bp:%llx caller_bp:%llx caller_ip:%llx", e->bp, e->caller_bp, e->caller_ip);
+
+	// Keys are zeroed with memset so that padding bytes, if any, never make
+	// two logically equal keys hash differently.
+	struct trace_stack_entry_index_key_t trace_index_key;
+	__builtin_memset(&trace_index_key, 0, sizeof(trace_index_key));
+	trace_index_key.goid = e->goid;
+	trace_index_key.trace_id = trace_id;
+
+	__u64 fun_key = 1;      // depth of the new frame; 1 = outermost
+	__u64 fpid = e->goid;   // outermost frames hang off the goroutine node
+	__u64 *fun_key_p = bpf_map_lookup_elem(&trace_stack_entry_index, &trace_index_key);
+	if (fun_key_p) {
+		__u64 parent_key = *fun_key_p;
+		bpf_printk("[Go] [uprobe/ent]: get fun_key: %llu", parent_key);
+		// Depth 0 means the stack is empty: there is no parent to look up.
+		if (parent_key > 0) {
+			struct trace_stack_entry_key_t trace_key_parent;
+			__builtin_memset(&trace_key_parent, 0, sizeof(trace_key_parent));
+			trace_key_parent.goid = e->goid;
+			trace_key_parent.trace_id = trace_id;
+			trace_key_parent.fun_key = parent_key;
+
+			struct event *event_parent = bpf_map_lookup_elem(&trace_stack_entry, &trace_key_parent);
+			if (!event_parent) {
+				bpf_printk("[Go] [uprobe/ent]: get event_parent Error Error: %llu", parent_key);
+			} else {
+				fpid = event_parent->nid;
+			}
+		}
+		fun_key = parent_key + 1;
+	}
+
+	struct trace_stack_entry_key_t trace_key;
+	__builtin_memset(&trace_key, 0, sizeof(trace_key));
+	trace_key.goid = e->goid;
+	trace_key.trace_id = trace_id;
+	trace_key.fun_key = fun_key;
+
+	// Complete the record in the scratch slot and store it as the open frame.
+	e->nid = bpf_get_prandom_u32() & 0xFF; // NOTE(review): only 256 distinct node ids — confirm this is enough to avoid collisions within a trace
+	e->fpid = fpid;
+	e->level = fun_key + 2; // level 1 is the application, level 2 the goroutine
+
+	bpf_printk("[Go] [uprobe/ent]: uodate fun_key: %llu", fun_key);
+	bpf_map_update_elem(&trace_stack_entry, &trace_key, e, BPF_ANY);
+	bpf_map_update_elem(&trace_stack_entry_index, &trace_index_key, &fun_key, BPF_ANY);
+
+	bpf_printk("[Go] [uprobe/ent] end");
+	return 0;
+}
+
+// ret runs at every return instruction of a traced Go function.
+//
+// It closes the innermost open frame of the current (trace, goroutine), sends
+// it to user space through event_queue and decrements the stack depth. When
+// the outermost frame is closed, a goroutine record (location 2) is sent
+// first.
+SEC("uprobe/ret")
+int ret(struct pt_regs *ctx)
+{
+	bpf_printk("[Go] [uprobe/ret] enter");
+	__u64 pid_tgid = bpf_get_current_pid_tgid();
+	__u64 pid = pid_tgid >> 32;
+
+	__u64 trace_id = get_apm_trace_id(pid, pid_tgid);
+
+	// Per-CPU scratch slot that will hold the closed frame for output.
+	__u32 key = 0;
+	struct event *e = bpf_map_lookup_elem(&event_stack, &key);
+	if (!e)
+	{
+		bpf_printk("[Go] [uprobe/ret]: conot get event");
+		return 0;
+	}
+
+	__u64 goid = get_current_goroutine();
+	// NOTE(review): entries of should_trace_goid are never removed, so the map
+	// can fill up — confirm whether finished goroutines should be deleted.
+	if (!bpf_map_lookup_elem(&should_trace_goid, &goid))
+	{
+		bpf_printk("[Go] [uprobe/ret]: conot get event: %llu", goid);
+		return 0;
+	}
+
+	__u64 now = bpf_ktime_get_ns();
+	__u64 ret_ip = PT_REGS_IP(ctx);
+	__u64 ret_bp = PT_REGS_FP(ctx);
+	bpf_printk("[Go] [uprobe/ret]: goid:%llu ip:%llx bp:%llx", goid, ret_ip, ret_bp);
+
+	// Keys are zeroed with memset so that padding bytes, if any, never make
+	// two logically equal keys hash differently.
+	struct trace_stack_entry_index_key_t trace_index_key;
+	__builtin_memset(&trace_index_key, 0, sizeof(trace_index_key));
+	trace_index_key.goid = goid;
+	trace_index_key.trace_id = trace_id;
+
+	__u64 *fun_key_p = bpf_map_lookup_elem(&trace_stack_entry_index, &trace_index_key);
+	__u64 fun_key = 1;
+	if (fun_key_p) {
+		bpf_printk("[Go] [uprobe/ret]: get fun_key: %llu", *fun_key_p);
+		fun_key = *fun_key_p;
+	} else {
+		bpf_printk("[Go] [uprobe/ret]:ErrorErrorErrorError Not get fun_key");
+	}
+
+	struct trace_stack_entry_key_t trace_key;
+	__builtin_memset(&trace_key, 0, sizeof(trace_key));
+	trace_key.goid = goid;
+	trace_key.trace_id = trace_id;
+	trace_key.fun_key = fun_key;
+
+	struct event *event_p = bpf_map_lookup_elem(&trace_stack_entry, &trace_key);
+	if (!event_p) {
+		bpf_printk("[Go] [uprobe/ret]:ErrorErrorErrorError Not get funEntry");
+		return 0;
+	}
+
+	// Copy the frame out of the map BEFORE deleting it: once deleted, an LRU
+	// map may hand the slot to another key, so event_p must not be read later.
+	__builtin_memcpy(e, event_p, sizeof(*e));
+	e->time_ns_end = now;
+
+	bpf_printk("[Go] [uprobe/ret]: ent:event: location:%d,ip:%llx,time_ns_start:%llu\n", e->location, e->ip, e->time_ns_start);
+	bpf_printk("[Go] [uprobe/ret]: ent:event: bp:%llx,caller_bp:%llx,caller_ip:%llx\n", e->bp, e->caller_bp, e->caller_ip);
+	bpf_printk("[Go] [uprobe/ret]: ent:event: nid:%llu,fpid:%llu,level:%llu\n", e->nid, e->fpid, e->level);
+
+	bpf_map_delete_elem(&trace_stack_entry, &trace_key);
+	__u64 fun_key_new = fun_key - 1;
+	bpf_map_update_elem(&trace_stack_entry_index, &trace_index_key, &fun_key_new, BPF_ANY);
+
+	// The outermost function finished: also push the goroutine record. It
+	// spans the same interval as the outermost frame.
+	if (fun_key == 1) {
+		struct event event_coroutine;
+		__builtin_memset(&event_coroutine, 0, sizeof(event_coroutine));
+		event_coroutine.pid = pid;
+		event_coroutine.trace_id = trace_id;
+		event_coroutine.goid = goid;
+		event_coroutine.ip = ret_ip;
+		event_coroutine.bp = ret_bp;
+		event_coroutine.time_ns_start = e->time_ns_start;
+		event_coroutine.time_ns_end = now;
+		event_coroutine.location = 2;
+		event_coroutine.nid = goid;
+		event_coroutine.fpid = 1;
+		event_coroutine.level = 2;
+		bpf_printk("[Go] [uprobe/ret]: ent:event_coroutine push: nid:%llu,fpid:%llu,level:%llu\n", event_coroutine.nid, event_coroutine.fpid, event_coroutine.level);
+		bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, &event_coroutine, sizeof(event_coroutine));
+	}
+	bpf_printk("[Go] [uprobe/ret]: ent:event_ret push: nid:%llu,fpid:%llu,level:%llu\n", e->nid, e->fpid, e->level);
+	bpf_perf_event_output(ctx, &event_queue, BPF_F_CURRENT_CPU, e, sizeof(*e));
+
+	bpf_printk("[Go] [uprobe/ret] end");
+	return 0;
+}
+
+
+
+

+ 203 - 0
ebpftracer/stack.go

@@ -0,0 +1,203 @@
+package ebpftracer
+
+import (
+	"context"
+	"debug/elf"
+	debugelf "debug/elf"
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"regexp"
+	"sort"
+	"strconv"
+
+	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/link"
+	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
+	"github.com/coroot/coroot-node-agent/proc"
+	"golang.org/x/sync/semaphore"
+)
+
+func (t *Tracer) stack() error {
+	// uprobes := []tracer.Uprobe{}
+	if t.disableL7Tracing {
+		return nil
+	}
+
+	ENV_PID := os.Getenv("FILTER_PID")
+	if ENV_PID == "" {
+		return nil
+	}
+
+	MatchString := ".*HandleFunc|.*main.*|testfun.*|.*serverHandler.*|.*ServeHTTP.*"
+
+	pid, _ := strconv.ParseInt(ENV_PID, 10, 32)
+	path := proc.Path(uint32(pid), "exe")
+
+	t.Uprobes, _ = t.getUprobes(path, MatchString)
+	t.UprobesMap = map[string]tracer.Uprobe{}
+	fmt.Println("UprobesMap::: init")
+	for _, up := range t.Uprobes {
+		fmt.Println("UprobesMap:::", up.Funcname)
+		t.UprobesMap[fmt.Sprintf("%s", up.Funcname)] = up
+	}
+	links := t.attachUprobes(path, t.Uprobes)
+	t.links = append(t.links, links...)
+
+	// defer t.detachUprobes(links)
+
+	return nil
+}
+
+// getUprobes scans the ELF binary at path and returns the uprobes to attach
+// for every function symbol whose name matches the regular expression
+// MatchString: one entry probe per function plus one return probe for each
+// return instruction reported by getReturnOffsets.
+//
+// As a side effect the symbol table, sorted by address, is stored in
+// t.Symbols. An error is returned when the pattern does not compile or the
+// binary cannot be opened or parsed, or has no readable .text section.
+func (t *Tracer) getUprobes(path string, MatchString string) ([]tracer.Uprobe, error) {
+	// Compile once instead of once per symbol.
+	pattern, err := regexp.Compile(MatchString)
+	if err != nil {
+		return nil, fmt.Errorf("compiling function pattern %q: %w", MatchString, err)
+	}
+
+	binFile, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer binFile.Close()
+
+	// Parse the ELF file and load its symbol table.
+	elfFile, err := debugelf.NewFile(binFile)
+	if err != nil {
+		return nil, fmt.Errorf("parsing ELF file %s: %w", path, err)
+	}
+	symbols, err := elfFile.Symbols()
+	if err != nil {
+		return nil, fmt.Errorf("reading symbols of %s: %w", path, err)
+	}
+	sort.Slice(symbols, func(i, j int) bool { return symbols[i].Value < symbols[j].Value })
+	t.Symbols = symbols
+
+	textSection := elfFile.Section(".text")
+	if textSection == nil {
+		return nil, fmt.Errorf("no text section in %s", path)
+	}
+	textSectionData, err := textSection.Data()
+	if err != nil {
+		return nil, fmt.Errorf("reading text section of %s: %w", path, err)
+	}
+	textSectionLen := uint64(len(textSectionData))
+
+	uprobes := []tracer.Uprobe{}
+	for _, symbol := range symbols {
+		if debugelf.ST_TYPE(symbol.Info) != debugelf.STT_FUNC {
+			continue
+		}
+		if !pattern.MatchString(symbol.Name) {
+			continue
+		}
+
+		// Translate the virtual address into a file offset using the
+		// executable PT_LOAD segment that contains it.
+		address := symbol.Value
+		for _, p := range elfFile.Progs {
+			if p.Type != elf.PT_LOAD || (p.Flags&elf.PF_X) == 0 {
+				continue
+			}
+			if p.Vaddr <= symbol.Value && symbol.Value < (p.Vaddr+p.Memsz) {
+				address = symbol.Value - p.Vaddr + p.Off
+				break
+			}
+		}
+
+		// Probe at the function entry.
+		uprobes = append(uprobes, tracer.Uprobe{
+			Funcname:  symbol.Name,
+			Location:  tracer.AtEntry,
+			Address:   address,
+			AbsOffset: 0,
+			RelOffset: 0,
+			Wanted:    true,
+		})
+
+		// Return probes need the function's machine code; skip them when the
+		// body does not lie completely inside .text (the unsigned arithmetic
+		// would otherwise wrap around and the slice expression would panic).
+		if symbol.Value < textSection.Addr {
+			log.Printf("no return probes for %s: symbol starts before .text", symbol.Name)
+			continue
+		}
+		sStart := symbol.Value - textSection.Addr
+		sEnd := sStart + symbol.Size
+		if sEnd < sStart || sEnd > textSectionLen {
+			log.Printf("no return probes for %s: body exceeds .text", symbol.Name)
+			continue
+		}
+		returnOffsets := getReturnOffsets(elfFile.Machine, textSectionData[sStart:sEnd])
+
+		for _, offset := range returnOffsets {
+			uprobes = append(uprobes, tracer.Uprobe{
+				Funcname:  symbol.Name,
+				Location:  tracer.AtRet,
+				Address:   address,
+				AbsOffset: uint64(offset),
+				RelOffset: 0,
+			})
+		}
+	}
+	return uprobes, nil
+}
+
+// attachUprobes attaches the "ent" / "ret" eBPF programs to the given probe
+// locations in the executable at path and returns the links that were
+// created. Probes that cannot be attached are logged and skipped; nil is
+// returned when the executable cannot be opened.
+func (t *Tracer) attachUprobes(path string, uprobes []tracer.Uprobe) []link.Link {
+	var links []link.Link
+
+	ex, err := link.OpenExecutable(path)
+	if err != nil {
+		fmt.Println("attachingERROR", err)
+		return nil
+	}
+	fmt.Println("AttachAttachAttach", path)
+
+	for i, up := range uprobes {
+		fmt.Printf("attaching %d -> %d -> %s -> 0x%x -> 0x%x\n", i, len(uprobes), up.Funcname, up.AbsOffset, up.Address)
+		var prog *ebpf.Program
+		switch up.Location {
+		case tracer.AtEntry:
+			prog = t.uprobes["ent"]
+		case tracer.AtRet:
+			prog = t.uprobes["ret"]
+		}
+		if prog == nil {
+			// Unsupported location or program missing from the collection.
+			fmt.Println("attachingERROR", "no eBPF program for", up.Funcname)
+			continue
+		}
+		// Address is used instead of a symbol lookup; Offset is added to it.
+		l, err := ex.Uprobe(up.Funcname, prog, &link.UprobeOptions{Address: up.Address, Offset: up.AbsOffset})
+		if err != nil {
+			fmt.Println("attachingERROR", err)
+			continue
+		}
+		links = append(links, l)
+	}
+	return links
+}
+
+func (t *Tracer) detachUprobes(links []link.Link) {
+	sem := semaphore.NewWeighted(10)
+	for i, closer := range links {
+		fmt.Printf("detaching %d/%d\r", i+1, len(links))
+		sem.Acquire(context.Background(), 1)
+		go func(closer io.Closer) {
+			defer sem.Release(1)
+			closer.Close()
+		}(closer)
+	}
+	fmt.Println()
+}

+ 64 - 16
ebpftracer/tracer.go

@@ -2,9 +2,16 @@ package ebpftracer
 
 import (
 	"bytes"
+	debugelf "debug/elf"
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"os"
+	"runtime"
+	"strconv"
+	"strings"
+	"time"
+
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/perf"
@@ -16,11 +23,6 @@ import (
 	"golang.org/x/sys/unix"
 	"inet.af/netaddr"
 	"k8s.io/klog/v2"
-	"os"
-	"runtime"
-	"strconv"
-	"strings"
-	"time"
 )
 
 /*
@@ -90,20 +92,23 @@ const (
 	EventTypeFileOpen        EventType = 8
 	EventTypeTCPRetransmit   EventType = 9
 	EventTypeL7Request       EventType = 10
+	EventTypeFunEnt          EventType = 11
+	EventTypeFunRet          EventType = 12
 
 	EventReasonNone    EventReason = 0
 	EventReasonOOMKill EventReason = 1
 )
 
 type Event struct {
-	Type      EventType
-	Reason    EventReason
-	Pid       uint32
-	SrcAddr   netaddr.IPPort
-	DstAddr   netaddr.IPPort
-	Fd        uint64
-	Timestamp uint64
-	L7Request *l7.RequestData
+	Type       EventType
+	Reason     EventReason
+	Pid        uint32
+	SrcAddr    netaddr.IPPort
+	DstAddr    netaddr.IPPort
+	Fd         uint64
+	Timestamp  uint64
+	L7Request  *l7.RequestData
+	StackEvent *StackEvent
 }
 
 type perfMapType uint8
@@ -114,6 +119,7 @@ const (
 	perfMapTypeFileEvents   perfMapType = 3
 	perfMapTypeL7Events     perfMapType = 4
 	perfMapTypeSocketEvents perfMapType = 5
+	perfMapTypeEventQueue   perfMapType = 6
 )
 
 type Tracer struct {
@@ -124,6 +130,9 @@ type Tracer struct {
 	readers    map[string]*perf.Reader
 	links      []link.Link
 	uprobes    map[string]*ebpf.Program
+	Symbols    []debugelf.Symbol
+	Uprobes    []tracer.Uprobe
+	UprobesMap map[string]tracer.Uprobe
 }
 
 func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
@@ -143,6 +152,9 @@ func (t *Tracer) Run(events chan<- Event) error {
 	if err := t.ebpf(events); err != nil {
 		return err
 	}
+	if err := t.stack(); err != nil {
+		return err
+	}
 	if err := t.init(events); err != nil {
 		return err
 	}
@@ -246,7 +258,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	if err != nil {
 		var verr *ebpf.VerifierError
 		if errors.As(err, &verr) {
-			klog.Errorf("%+v", verr)
+			klog.Errorf("----%+v", verr)
 		}
 		return fmt.Errorf("failed to load collection: %w", err)
 	}
@@ -260,6 +272,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
 		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
 		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
+		{name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
 	}
 	fmt.Println(len(collectionSpec.Programs))
 	fmt.Println(len(c.Programs))
@@ -270,7 +283,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
 	fmt.Println("perfMaps start --")
 	for _, pm := range perfMaps {
-		fmt.Println(pm.name)
+		fmt.Println("pm.namepm.name: ", pm.name)
 		m, ok := t.collection.Maps[pm.name]
 		if ok {
 			r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
@@ -287,7 +300,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 
 	for _, programSpec := range collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
-		fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
+		fmt.Println("programSpecprogramSpec:--:", programSpec.Name, programSpec.SectionName, programSpec.Type)
 		if t.disableL7Tracing {
 			switch programSpec.Name {
 			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
@@ -445,6 +458,30 @@ type SocketDatadddd struct {
 	Data           [BURST_DATA_BUF_SIZE]byte
 }
 
+type StackEvent struct { // decoded with binary.Read from event_queue samples; field order follows the eBPF `struct event`
+	Pid         uint64 // process id reported by the probe
+	TraceId     uint64 // APM trace the call belongs to
+	Goid        uint64 // goroutine id
+	Ip          uint64 // instruction pointer at the probe
+	Bp          uint64 // frame base recorded by the probe
+	CallerIp    uint64 // return address into the caller
+	CallerBp    uint64 // frame base of the caller
+	TimeNsStart uint64 // entry timestamp, nanoseconds
+	TimeNsEnd   uint64 // return timestamp, nanoseconds
+	Nid         uint64 // node id of this call
+	Fpid        uint64 // node id of the parent
+	Level       uint64 // depth of the node
+	Location    byte   // 0 = function record, 2 = goroutine record (see Container.StackProcess)
+}
+
+type StackFunEvent struct { // one open frame kept in Container.goEventStacks
+	StackEvent StackEvent     // entry event that opened the frame
+	Uprobe     *tracer.Uprobe // uprobe resolved for the frame's function
+	Level      int            // parent's level + 1
+	Pid        int            // node id of the parent frame
+	Nid        int            // node id of this frame
+}
+
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
 	for {
 		rec, err := r.Read()
@@ -616,6 +653,17 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				Fd:        v.Fd,
 				Timestamp: v.Timestamp,
 			}
+		case perfMapTypeEventQueue:
+			v := &StackEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+
+			event = Event{
+				Type:       EventTypeFunEnt,
+				StackEvent: v,
+			}
 		default:
 			continue
 		}

+ 21 - 0
ebpftracer/tracer/uprobe.go

@@ -0,0 +1,21 @@
+package tracer
+
+type UprobeLocation int // where inside the target a probe is placed
+
+const (
+	AtEntry UprobeLocation = iota // function entry; attached to the "ent" program
+	AtRet // return instruction; attached to the "ret" program
+	AtGoroutineExit // NOTE(review): not used by any visible code
+	AtGoroutineNewproc1 // NOTE(review): not used by any visible code
+	AtGoroutineNewproc1Ret // NOTE(review): not used by any visible code
+)
+
+type Uprobe struct { // one probe location inside a traced binary
+	Funcname  string
+	Address   uint64         // function entry; getUprobes stores the file offset when an executable segment contains the symbol
+	AbsOffset uint64         // getUprobes: 0 for entry probes, the return offset from getReturnOffsets for return probes; passed as the uprobe Offset
+	RelOffset uint64         // relative to the function entry; always 0 in getUprobes
+	Location  UprobeLocation // location of the probe
+	// FetchArgs (fetch arguments) is not supported yet.
+	Wanted bool // set to true for entry probes by getUprobes
+}

+ 17 - 4
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -3,12 +3,13 @@ package otlptrace
 import (
 	"crypto/md5"
 	"fmt"
-	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
-	tracesdk "go.opentelemetry.io/otel/sdk/trace"
-	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 	"strconv"
 	"strings"
 	"sync"
+
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
 )
 
 const (
@@ -206,10 +207,22 @@ func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
 		ServiceType:    "",
 		WallTime:       0,
 	}
+	mNode.MethodName = spanSd.Name
 	mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
 	mNode.WallTime = mNode.PureTime
 	mNode.StartTime = spanSd.StartTimeUnixNano / 1e6
 
+	for _, attr := range spanSd.GetAttributes() {
+		switch attr.Key {
+		case "nid":
+			mNode.Nid = int(attr.Value.GetIntValue())
+		case "pid":
+			mNode.Pid = int(attr.Value.GetIntValue())
+		case "level":
+			mNode.Level = int(attr.Value.GetIntValue())
+		}
+	}
+
 	return mNode, spanSd.Name
 }
 
@@ -228,7 +241,7 @@ func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
 		buildRedisMap(&mNode, sd)
 	}
 	if mapType != "" {
-		mNode.Nid = traceRoot.Index
+		// mNode.Nid = traceRoot.Index
 		traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
 	}
 	return mNode

+ 13 - 1
tracing/apm_tracing.go

@@ -3,13 +3,14 @@ package tracing
 import (
 	"context"
 	"fmt"
+	"time"
+
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
 	"go.opentelemetry.io/otel/trace"
 	"inet.af/netaddr"
-	"time"
 )
 
 /**
@@ -114,3 +115,14 @@ func (t *Trace) HttpTraceRequest(method, path, ip string, port uint16, status l7
 		attribute.Int("http.port", int(port)),
 	)
 }
+
+// FuncTraceQuery records a span named funcname with the given duration and
+// tags it with the call-tree attributes "level", "pid" (parent node id) and
+// "nid" (node id). Calls on a nil trace or with an empty name are ignored.
+func (t *Trace) FuncTraceQuery(funcname string, duration time.Duration, level int, pid int, nid int) {
+	if t != nil && funcname != "" {
+		t.createTraceSpan(
+			funcname,
+			duration,
+			false,
+			attribute.Int("level", level),
+			attribute.Int("pid", pid),
+			attribute.Int("nid", nid),
+		)
+	}
+}