package containers import ( "debug/elf" "fmt" "math/rand" "sort" "strconv" "time" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/ebpftracer/tracer" "github.com/coroot/coroot-node-agent/tracing" "github.com/coroot/coroot-node-agent/utils" "github.com/pkg/errors" "inet.af/netaddr" ) func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) { trace, ok := c.traceMap[traceId] return trace, ok } func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error { method, path, hostIp, port := l7.ParseHttpHost(r.Payload) ip, err := netaddr.ParseIP(hostIp) if err != nil { return fmt.Errorf("host ip error") } addr := netaddr.IPPortFrom(ip, port) trace := tracing.NewTrace(string(c.id), addr) if trace == nil { return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null") } c.traceMap[traceId] = trace trace.TraceStart(method, path, r.Status, r.Duration) return nil } func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string { c.lock.Lock() defer c.lock.Unlock() if r.Protocol == l7.ProtocolDNS { return c.onDNSRequest(r) } if r.Protocol == l7.ProtocolTrace { //fmt.Println("r.TraceStart:", r.TraceStart) //fmt.Println("r.TraceEnd:", r.TraceEnd) if r.TraceStart == 1 { //fmt.Println("====ProtocolTrace start1====", r.TraceId) err := c.InitTrace(r.TraceId, r) if err != nil { fmt.Println(err) } //fmt.Println("init r.TraceId:", r.TraceId) //trace, _ := c.getTrace(r.TraceId) //fmt.Println("init traceId", trace) //stats.observe(r.Status.Http(), "", r.Duration) //method, path := l7.ParseHttp(r.Payload) //fmt.Println("r.Payload:", string(r.Payload)) //fmt.Println("method:", method) //fmt.Println("path:", path) //fmt.Println("====ProtocolTrace start2====") return nil } if r.TraceEnd == 1 { //fmt.Println("r:", r) //fmt.Println("r.Payload:", string(r.Payload)) //fmt.Println("====ProtocolTrace end2====") trace, ok := c.getTrace(r.TraceId) if ok { trace.TraceEnd(r) delete(c.traceMap, r.TraceId) } //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId) return nil } } if r.Protocol == l7.ProtocolHTTP { //stats.observe(r.Status.Http(), "", r.Duration) method, path, hostIp, port := l7.ParseHttpHost(r.Payload) //trace.HttpRequest(method, path, r.Status, r.Duration) apmTrace, ok := c.getTrace(r.TraceId) if ok { apmTrace.HttpTraceRequest(method, path, hostIp, port, r) } return nil } conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] //fmt.Println("l7.connectionsByPidFd", conn, pid, fd) if conn == nil { return nil } if timestamp != 0 && conn.Timestamp != timestamp { return nil } stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest) trace := tracing.NewTrace(string(c.id), conn.ActualDest) switch r.Protocol { case l7.ProtocolHTTP: fmt.Println("l7.ProtocolHTTP", r.TraceId) //stats.observe(r.Status.Http(), "", r.Duration) method, path, hostIp, port := l7.ParseHttpHost(r.Payload) //trace.HttpRequest(method, path, r.Status, r.Duration) apmTrace, ok := c.getTrace(r.TraceId) if ok { apmTrace.HttpTraceRequest(method, path, hostIp, port, r) } case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() } requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration)) for _, req := range requests { stats.observe(req.Status.Http(), "", req.Duration) trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration) } case l7.ProtocolPostgres: if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } if conn.postgresParser == nil { conn.postgresParser = l7.NewPostgresParser() } query := conn.postgresParser.Parse(r.Payload) trace.PostgresQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolMysql: //fmt.Println("mysql mysql") //fmt.Println(conn) if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } if conn.mysqlParser == nil { conn.mysqlParser = l7.NewMysqlParser() } query := conn.mysqlParser.Parse(r.Payload, r.StatementId) //trace.MysqlQuery(query, r.Status.Error(), r.Duration) apmTrace, ok := c.getTrace(r.TraceId) //fmt.Println("mysql r.TraceId:", r.TraceId) //fmt.Println("ok:", ok) //fmt.Println("traceMap:", len(c.traceMap)) if ok { apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest) } case l7.ProtocolMemcached: stats.observe(r.Status.String(), "", r.Duration) cmd, items := l7.ParseMemcached(r.Payload) trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration) case l7.ProtocolRedis: fmt.Println("redis redis") stats.observe(r.Status.String(), "", r.Duration) cmd, args := l7.ParseRedis(r.Payload) fmt.Println("cmd", cmd) fmt.Println("args", args) apmTrace, ok := c.getTrace(r.TraceId) fmt.Println("redis r.TraceId:", r.TraceId) fmt.Println("ok:", ok) fmt.Println("traceMap:", len(c.traceMap)) if ok { apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration) } //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration) case l7.ProtocolMongo: stats.observe(r.Status.String(), "", r.Duration) query := l7.ParseMongo(r.Payload) trace.MongoQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolKafka, l7.ProtocolCassandra: stats.observe(r.Status.String(), "", r.Duration) case l7.ProtocolRabbitmq, l7.ProtocolNats: stats.observe(r.Status.String(), r.Method.String(), 0) } return nil } func (c *Container) buildInstanceID() { c.lock.Lock() defer c.lock.Unlock() for address, val := range c.getListens() { if val == 1 { ip := address.IP() if ip.Is4() && !ip.IsLoopback() { // 获取端口号 port := address.Port() c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port)) break } } } } func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) { c.lock.Lock() defer c.lock.Unlock() // get the associated uprobe switch event.Location { case 0: // ret uprobe, err := c.GetUprobe(event, tracer) if err != nil { //fmt.Println("GetUprobeGetUprobe errer: %v", err) // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } if event.TraceId <= 0 { //fmt.Println("StackProcess TraceId id 0") // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } //fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart) apmTrace, ok := c.getTrace(event.TraceId) if ok { //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid) duration := event.TimeNsEnd - event.TimeNsStart apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid)) } case 2: // coroutine //fmt.Println("StackProcess 协程入口开始处理 fun:", event.TraceId, event.Goid, event.TimeNsEnd-event.TimeNsStart) apmTrace, ok := c.getTrace(event.TraceId) if ok { //fmt.Println("append FuncTraceQuery fun:", event.TraceId, "coroutine"+strconv.FormatUint(event.Goid, 10), event.Pid, event.Fpid) duration := event.TimeNsEnd - event.TimeNsStart apmTrace.FuncTraceQuery("coroutine"+strconv.FormatUint(event.Goid, 10), time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid)) } } } func (c *Container) StackProcessBak(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) { c.lock.Lock() defer c.lock.Unlock() // get the associated uprobe uprobe, err := c.GetUprobe(event, tracer) if err != nil { //fmt.Println("GetUprobeGetUprobe errer: %v", err) // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } if event.TraceId <= 0 { //fmt.Println("StackProcess TraceId id 0") // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } length := len(c.goEventStacks[event.TraceId]) if length <= 0 { c.goEventStacks = map[uint64]map[uint64][]ebpftracer.StackFunEvent{} c.goEventStacks[event.TraceId] = map[uint64][]ebpftracer.StackFunEvent{} c.goEventStacks[event.TraceId][event.Goid] = []ebpftracer.StackFunEvent{} } switch event.Location { case 0: // entry level := 100 pid := 100000 + event.Goid length := len(c.goEventStacks[event.TraceId][event.Goid]) //fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, length) if length > 0 { funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1] //fmt.Println("funEvent goEventStacks fun:", event.TraceId, funEvent.Uprobe.Funcname, funEvent.Nid, funEvent.Level) lastEvent := funEvent.StackEvent if lastEvent.Location == event.Location && lastEvent.Ip == event.Ip && lastEvent.Bp != event.CallerBp { // duplicated entry event due to stack expansion/shrinkage // log.Debugf("duplicated entry event: %+v", event) //fmt.Println("GetUprobeGetUprobe duplicated entry event: %+v", event) c.goEventStacks[event.TraceId][event.Goid][length-1].StackEvent = event return } level = int(funEvent.Level) pid = uint64(funEvent.Nid) } rand.Seed(time.Now().UnixNano()) // append new event //fmt.Println("append goEventStacks fun:", event.TraceId, uprobe.Funcname, pid, level+1) c.goEventStacks[event.TraceId][event.Goid] = append(c.goEventStacks[event.TraceId][event.Goid], ebpftracer.StackFunEvent{ StackEvent: event, Uprobe: &uprobe, Level: level + 1, Pid: int(pid), Nid: rand.Intn(100000000), }) length = len(c.goEventStacks[event.TraceId][event.Goid]) //fmt.Println("append goEventStacks end:", event.TraceId, uprobe.Funcname, pid, level+1, length) case 1: // ret //// fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname) length := len(c.goEventStacks[event.TraceId][event.Goid]) //fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname, length) if length > 0 { funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1] entFun := funEvent.StackEvent apmTrace, ok := c.getTrace(event.TraceId) //fmt.Println("StackProcess 函数出口处理 fun:", event.TraceId, funEvent.Uprobe.Funcname, length) if ok { //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, funEvent.Pid, funEvent.Level, funEvent.Nid) duration := event.TimeNsEnd - entFun.TimeNsStart c.goEventStacks[event.TraceId][event.Goid] = c.goEventStacks[event.TraceId][event.Goid][:length-1] apmTrace.FuncTraceQuery(funEvent.Uprobe.Funcname, time.Duration(duration), funEvent.Level, funEvent.Pid, funEvent.Nid) } } } } // ResolveAddress returns the symbol(s) and offset of the given address. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) { if addr == 0 { // err = errors.Wrapf(SymbolNotFoundError, "0") return } // symbols, _, err := e.Symbols() if err != nil { return } idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr }) if idx == 0 { // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr)) return } // why diff symbol may contains the same addr? sym := symbols[idx-1] for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- { syms = append(syms, symbols[i]) } for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ { syms = append(syms, symbols[i]) } return syms, uint(addr - sym.Value), nil } func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) { //fmt.Println("GetUprobe entory:") syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols) if err != nil { return } for _, sym := range syms { //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset) uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s", sym.Name)] if ok { return uprobe, nil } } err = errors.New("uprobe not found") return }