package containers import ( "bufio" "bytes" "debug/elf" "fmt" "github.com/coroot/coroot-node-agent/flags" "os" "path" "sort" "strconv" "strings" "time" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/ebpftracer/tracer" "github.com/coroot/coroot-node-agent/proc" "github.com/coroot/coroot-node-agent/tracing" "github.com/coroot/coroot-node-agent/utils" . "github.com/coroot/coroot-node-agent/utils/modelse" "github.com/pkg/errors" klog "github.com/sirupsen/logrus" "inet.af/netaddr" ) const ( TRACE_STATUS = 1 ) func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) { trace, ok := c.traceMap[traceId] return trace, ok } func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) { c.traceMap[traceId] = trace } // 查询或创建trace信息 func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) { trace, ok := c.getTrace(traceId) if !ok { //new trace trace = tracing.NewTraceFromEvent(string(c.id)) //create TraceMap c.createTraceMap(traceId, trace) //create ParentSpan trace.CreateRootSpan(traceId) } return trace, nil } func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error { method, path, hostIp, port := l7.ParseHttpHost(r.Payload) ip, err := netaddr.ParseIP(hostIp) if err != nil { fmt.Println("host ip error") hostIp = "127.0.0.1" } addr := netaddr.IPPortFrom(ip, port) trace := tracing.NewTrace(string(c.id), addr) if trace == nil { return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null") } c.traceMap[traceId] = trace trace.TraceStart(method, path, r.Status, r.Duration) return nil } // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) { if t.AllEventReady(traceID) { t.SendEvent() klog.Infof("SendEvent %d", traceID) //fmt.Println(t.GetSpan()) //fmt.Println("===============") delete(c.traceMap, traceID) } } func (c *Container) valuableTrace(traceID uint64) bool { return traceID != 0 } func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string { c.lock.Lock() defer c.lock.Unlock() if r.Protocol == l7.ProtocolDNS { ip2fqdn, _type, fqdn := c.onDNSRequest(r) if c.l7Attach && c.valuableTrace(r.TraceId) { apmTrace, err := c.getOrInitTrace(r.TraceId) if err == nil { apmTrace.DNSTraceQueryEvent(r, _type, fqdn) c.SendEvent(apmTrace, r.TraceId) } } return ip2fqdn } //if !c.valuableTrace(r.TraceId) { // return nil //} if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) { if r.TraceStart == TRACE_STATUS { klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId) trace, err := c.getOrInitTrace(r.TraceId) if err == nil { method, path, hostIp, port := l7.ParseHttpHost(r.Payload) ip, _ := netaddr.ParseIP(hostIp) //codeType := c.GetCodeTypeFromCache(pid) trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), pid, c.GetAppInfo()) c.SendEvent(trace, r.TraceId) } return nil } if r.TraceEnd == TRACE_STATUS { klog.Infof("====ProtocolTrace end==== %d %d", pid, r.TraceId) trace, err := c.getOrInitTrace(r.TraceId) if err == nil { trace.TraceEndEvent(r) c.SendEvent(trace, r.TraceId) } return nil } } if r.Protocol == l7.ProtocolHTTP { if c.l7Attach && c.valuableTrace(r.TraceId) { method, path, hostIp, port := l7.ParseHttpHost(r.Payload) apmTrace, err := c.getOrInitTrace(r.TraceId) //fmt.Println("ProtocolHTTP-----", r.TraceId, err) if err == nil { apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r) c.SendEvent(apmTrace, r.TraceId) } } //return nil } conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] //fmt.Println("l7.connectionsByPidFd", conn, pid, fd) if conn == nil { return nil } if timestamp != 0 && conn.Timestamp != timestamp { return nil } stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest) //trace := tracing.NewTrace(string(c.id), conn.ActualDest) switch r.Protocol { case l7.ProtocolHTTP: stats.observe(r.Status.Http(), "", r.Duration) case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() } requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration)) for _, req := range requests { stats.observe(req.Status.Http(), "", req.Duration) //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration) } case l7.ProtocolPostgres: if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } //if conn.postgresParser == nil { // conn.postgresParser = l7.NewPostgresParser() //} //query := conn.postgresParser.Parse(r.Payload) //trace.PostgresQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolMysql: if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } if c.l7Attach && c.valuableTrace(r.TraceId) { if conn.mysqlParser == nil { conn.mysqlParser = l7.NewMysqlParser() } query := conn.mysqlParser.Parse(r.Payload, r.StatementId) //trace.MysqlQuery(query, r.Status.Error(), r.Duration) //apmTrace, ok := c.getTrace(r.TraceId) apmTrace, err := c.getOrInitTrace(r.TraceId) fmt.Println(err) //fmt.Println("mysql r.TraceId:", r.TraceId) //fmt.Println("ok:", ok) //fmt.Println("traceMap:", len(c.traceMap)) if err == nil { //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest) apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest) c.SendEvent(apmTrace, r.TraceId) } } case l7.ProtocolDM: //统计dm的query次数 stats.observe(r.Status.String(), "", r.Duration) //是否发送数据 if c.l7Attach && c.valuableTrace(r.TraceId) { if conn.dmParser == nil { conn.dmParser = l7.NewDmParser() } query := conn.dmParser.Parse(r.Payload, r.StatementId) apmTrace, err := c.getOrInitTrace(r.TraceId) if err == nil { apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest) c.SendEvent(apmTrace, r.TraceId) } } case l7.ProtocolMemcached: stats.observe(r.Status.String(), "", r.Duration) if c.l7Attach && c.valuableTrace(r.TraceId) { } //cmd, items := l7.ParseMemcached(r.Payload) //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration) case l7.ProtocolRedis: stats.observe(r.Status.String(), "", r.Duration) if c.l7Attach && c.valuableTrace(r.TraceId) { cmd, args := l7.ParseRedis(r.Payload) //fmt.Println("cmd", cmd) //fmt.Println("args", args) //apmTrace, ok := c.getTrace(r.TraceId) apmTrace, err := c.getOrInitTrace(r.TraceId) if err == nil { //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration) apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest) c.SendEvent(apmTrace, r.TraceId) } } //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration) case l7.ProtocolMongo: stats.observe(r.Status.String(), "", r.Duration) if c.l7Attach && c.valuableTrace(r.TraceId) { } //query := l7.ParseMongo(r.Payload) //trace.MongoQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolKafka, l7.ProtocolCassandra: stats.observe(r.Status.String(), "", r.Duration) if c.l7Attach && c.valuableTrace(r.TraceId) { } case l7.ProtocolRabbitmq, l7.ProtocolNats: stats.observe(r.Status.String(), r.Method.String(), 0) if c.l7Attach && c.valuableTrace(r.TraceId) { } case l7.ProtocolDubbo2: stats.observe(r.Status.String(), "", r.Duration) if c.l7Attach && c.valuableTrace(r.TraceId) { } } return nil } func (c *Container) buildIDs(pid uint32) bool { c.lock.Lock() defer c.lock.Unlock() p := c.processes[pid] if p != nil { p.cmdline = string(proc.GetRealCmdline(pid)) } for address, val := range c.getListens() { if val == 1 { ip := address.IP() if ip.Is4() && !ip.IsLoopback() { // 获取端口号 port := address.Port() //c.instanceID.IntVal, c.instanceID.HashtVal, _ = c.AppInfo.Sn = ip.String() c.AppInfo.Sport = int(port) strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port)) c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64() c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte() //c.AppInfo.InstanceId = c.instanceID.IntVal strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid)))) c.AppInfo.AgentId, _ = strAgentID.ToInt64() c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid) return true } } } return false } func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) { c.lock.Lock() defer c.lock.Unlock() // get the associated uprobe uprobe, err := c.GetUprobe(event, tracer) if err != nil { fmt.Println("GetUprobeGetUprobe errer: %v", err) // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } if event.TraceId <= 0 { fmt.Println("StackProcess TraceId id 0") // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart) stackFun := ebpftracer.StackFunEvent{} stackFun.Uprobe = &uprobe stackFun.StackEvent = event apmTrace, ok := c.getTrace(event.TraceId) if ok { apmTrace.FunAdd(stackFun) } } func byteExtractString(nameString [100]byte) string { n := bytes.IndexByte(nameString[:], 0) if n == -1 { n = len(nameString) // 没找到零值,使用数组长度 } return string(nameString[:n]) } func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) { c.lock.Lock() defer c.lock.Unlock() // get the associated uprobe switch event.Location { case 0: // ret Funcname := "" if event.Type != uint64(CodeTypeJava) { uprobe, err := c.GetUprobe(event, tracer) if err != nil { fmt.Println("GetUprobeGetUprobe errer: %v", err) // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } Funcname = uprobe.Funcname } else { ClassName := byteExtractString(event.ClassName) MethedName := byteExtractString(event.MethedName) Funcname = ClassName + "." + MethedName } if event.TraceId <= 0 { fmt.Println("StackProcess TraceId id 0") // log.Errorf("failed to get uprobe for event %+v: %+v", event, err) return } //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart) apmTrace, err := c.getOrInitTrace(event.TraceId) if err == nil { //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid) duration := event.TimeNsEnd - event.TimeNsStart apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd) c.SendEvent(apmTrace, event.TraceId) } } } // ResolveAddress returns the symbol(s) and offset of the given address. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) { if addr == 0 { // err = errors.Wrapf(SymbolNotFoundError, "0") return } // symbols, _, err := e.Symbols() if err != nil { return } idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr }) if idx == 0 { // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr)) return } // why diff symbol may contains the same addr? sym := symbols[idx-1] for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- { syms = append(syms, symbols[i]) } for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ { syms = append(syms, symbols[i]) } return syms, uint(addr - sym.Value), nil } type MemoryMap struct { Start, End uint64 } // ReadFirstLineOfMapsFile reads the first line of /proc//maps file and return the memory map as a MemoryMap struct func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) { file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid)) if err != nil { return nil, err } defer file.Close() scanner := bufio.NewScanner(file) if scanner.Scan() { fields := strings.Fields(scanner.Text()) addresses := strings.Split(fields[0], "-") if len(addresses) != 2 { return nil, errors.New("unexpected format in /proc//maps") } start, err := strconv.ParseUint(addresses[0], 16, 64) if err != nil { return nil, err } end, err := strconv.ParseUint(addresses[1], 16, 64) if err != nil { return nil, err } return &MemoryMap{ Start: start, End: end, }, nil } if err := scanner.Err(); err != nil { return nil, err } return nil, errors.New("empty /proc//maps") } func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) { //fmt.Println("GetUprobe entory:") memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid))) Address := event.Ip - memoryMap.Start // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address) for _, fun := range c.UprobesMap { funAddress := fun.Address + fun.AbsOffset // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset) if funAddress == Address { // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address) return fun, nil } } syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols) if err != nil { return } for _, sym := range syms { //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset) uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)] if ok { return uprobe, nil } } err = errors.New("uprobe not found") return } func (c *Container) GetAppInfo() AppInfo { return c.AppInfo } // 可注入前置 func (c *Container) checkEventReady() bool { c.lock.Lock() defer c.lock.Unlock() return c.l7EventReady } func (c *Container) eventReady() { c.lock.Lock() defer c.lock.Unlock() c.l7EventReady = true } // uprobe前置 func (c *Container) checkL7AttachReady() bool { c.lock.Lock() defer c.lock.Unlock() return c.l7Attach } func (c *Container) l7AttachSuccess() { c.lock.Lock() defer c.lock.Unlock() c.l7Attach = true } func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool { p := c.processes[pid] if p != nil && c.checkEventReady() { codeType := c.GetCodeTypeFromCache(pid) if codeType.IsUnknownCode() { klog.WithField("pid", pid).Debug("[verify] unknown language.") return false } cmdline := p.GetCmdline() if len(cmdline) == 0 { return false } whiteListByCode := r.getWhiteListByCodeType(codeType) //klog.WithField("pid", pid).WithField("codeType", codeType.String()). // Infof("[verify] white list %v", utils.ToString(whiteListByCode)) // 当前语言的白名单规则 for _, setting := range whiteListByCode { ruleVal := setting.Filters if ruleVal == "" { continue } // 判断规则 if strings.Contains(cmdline, ruleVal) { c.WhiteSettingInfo = setting klog.WithField("pid", pid). WithField("codeType", codeType.String()). WithField("ruleVal", ruleVal). WithField("cmdline", cmdline). WithField("white list", utils.ToString(whiteListByCode)). Infoln("[verify] check successful.") return true } } } return false } func (c *Container) detachUprobes(pid uint32) { c.lock.Lock() defer c.lock.Unlock() // close uprobe if p := c.processes[pid]; p != nil { if len(p.uprobes) > 0 { klog.Infof("detachUprobes", pid) // 关闭应用层uprobes p.DynamicClose() // 关闭7层监控 c.l7Attach = false // 变更应用为卸载状态 c.AppInfo.AppUninstall() } } } func (c *Container) getRootfs() string { if c.metadata != nil && c.metadata.rootfs != "" { return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs) } return "" }