| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- package containers
- import (
- "bufio"
- "debug/elf"
- "fmt"
- "github.com/coroot/coroot-node-agent/ebpftracer"
- "github.com/coroot/coroot-node-agent/ebpftracer/l7"
- "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
- "github.com/coroot/coroot-node-agent/proc"
- "github.com/coroot/coroot-node-agent/tracing"
- "github.com/coroot/coroot-node-agent/utils"
- . "github.com/coroot/coroot-node-agent/utils/modelse"
- "github.com/pkg/errors"
- klog "github.com/sirupsen/logrus"
- "inet.af/netaddr"
- "os"
- "sort"
- "strconv"
- "strings"
- "time"
- )
- const TRACE_STATUS = 1
- func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
- trace, ok := c.traceMap[traceId]
- return trace, ok
- }
- func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
- c.traceMap[traceId] = trace
- }
- // 查询或创建trace信息
- func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
- trace, ok := c.getTrace(traceId)
- if !ok {
- //new trace
- trace = tracing.NewTraceFromEvent(string(c.id))
- //create TraceMap
- c.createTraceMap(traceId, trace)
- //create ParentSpan
- trace.CreateRootSpan(traceId)
- }
- return trace, nil
- }
- func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- ip, err := netaddr.ParseIP(hostIp)
- if err != nil {
- fmt.Println("host ip error")
- hostIp = "127.0.0.1"
- }
- addr := netaddr.IPPortFrom(ip, port)
- trace := tracing.NewTrace(string(c.id), addr)
- if trace == nil {
- return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
- }
- c.traceMap[traceId] = trace
- trace.TraceStart(method, path, r.Status, r.Duration)
- return nil
- }
- // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
- // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
- func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
- if t.AllEventReady(traceID) {
- t.SendEvent()
- klog.Infof("SendEvent %d", traceID)
- //fmt.Println(t.GetSpan())
- //fmt.Println("===============")
- delete(c.traceMap, traceID)
- }
- }
- func (c *Container) valuableTrace(traceID uint64) bool {
- return traceID != 0
- }
- func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
- c.lock.Lock()
- defer c.lock.Unlock()
- if r.Protocol == l7.ProtocolDNS {
- return c.onDNSRequest(r)
- }
- //if !c.valuableTrace(r.TraceId) {
- // return nil
- //}
- if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
- if r.TraceStart == TRACE_STATUS {
- klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
- trace, err := c.getOrInitTrace(r.TraceId)
- if err == nil {
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- ip, _ := netaddr.ParseIP(hostIp)
- //codeType := c.GetCodeTypeFromCache(pid)
- trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), pid, c.GetAppInfo())
- c.SendEvent(trace, r.TraceId)
- }
- return nil
- }
- if r.TraceEnd == TRACE_STATUS {
- klog.Infof("====ProtocolTrace end==== %d %d", pid, r.TraceId)
- trace, err := c.getOrInitTrace(r.TraceId)
- if err == nil {
- trace.TraceEndEvent(r)
- c.SendEvent(trace, r.TraceId)
- }
- return nil
- }
- }
- if r.Protocol == l7.ProtocolHTTP {
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
- apmTrace, err := c.getOrInitTrace(r.TraceId)
- //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
- if err == nil {
- apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
- c.SendEvent(apmTrace, r.TraceId)
- }
- }
- //return nil
- }
- conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
- //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
- if conn == nil {
- return nil
- }
- if timestamp != 0 && conn.Timestamp != timestamp {
- return nil
- }
- stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
- //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
- switch r.Protocol {
- case l7.ProtocolHTTP:
- stats.observe(r.Status.Http(), "", r.Duration)
- case l7.ProtocolHTTP2:
- if conn.http2Parser == nil {
- conn.http2Parser = l7.NewHttp2Parser()
- }
- requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
- for _, req := range requests {
- stats.observe(req.Status.Http(), "", req.Duration)
- //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
- }
- case l7.ProtocolPostgres:
- if r.Method != l7.MethodStatementClose {
- stats.observe(r.Status.String(), "", r.Duration)
- }
- //if conn.postgresParser == nil {
- // conn.postgresParser = l7.NewPostgresParser()
- //}
- //query := conn.postgresParser.Parse(r.Payload)
- //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
- case l7.ProtocolMysql:
- if r.Method != l7.MethodStatementClose {
- stats.observe(r.Status.String(), "", r.Duration)
- }
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- if conn.mysqlParser == nil {
- conn.mysqlParser = l7.NewMysqlParser()
- }
- query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
- //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
- //apmTrace, ok := c.getTrace(r.TraceId)
- apmTrace, err := c.getOrInitTrace(r.TraceId)
- //fmt.Println("mysql r.TraceId:", r.TraceId)
- //fmt.Println("ok:", ok)
- //fmt.Println("traceMap:", len(c.traceMap))
- if err == nil {
- //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
- apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
- c.SendEvent(apmTrace, r.TraceId)
- }
- }
- case l7.ProtocolMemcached:
- stats.observe(r.Status.String(), "", r.Duration)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- }
- //cmd, items := l7.ParseMemcached(r.Payload)
- //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
- case l7.ProtocolRedis:
- stats.observe(r.Status.String(), "", r.Duration)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- cmd, args := l7.ParseRedis(r.Payload)
- //fmt.Println("cmd", cmd)
- //fmt.Println("args", args)
- //apmTrace, ok := c.getTrace(r.TraceId)
- apmTrace, err := c.getOrInitTrace(r.TraceId)
- if err == nil {
- //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
- apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
- c.SendEvent(apmTrace, r.TraceId)
- }
- }
- //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
- case l7.ProtocolMongo:
- stats.observe(r.Status.String(), "", r.Duration)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- }
- //query := l7.ParseMongo(r.Payload)
- //trace.MongoQuery(query, r.Status.Error(), r.Duration)
- case l7.ProtocolKafka, l7.ProtocolCassandra:
- stats.observe(r.Status.String(), "", r.Duration)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- }
- case l7.ProtocolRabbitmq, l7.ProtocolNats:
- stats.observe(r.Status.String(), r.Method.String(), 0)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- }
- case l7.ProtocolDubbo2:
- stats.observe(r.Status.String(), "", r.Duration)
- if c.l7Attach && c.valuableTrace(r.TraceId) {
- }
- }
- return nil
- }
- func (c *Container) buildIDs(pid uint32) bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- p := c.processes[pid]
- if p != nil {
- p.cmdline = string(proc.GetRealCmdline(pid))
- }
- for address, val := range c.getListens() {
- if val == 1 {
- ip := address.IP()
- if ip.Is4() && !ip.IsLoopback() {
- // 获取端口号
- port := address.Port()
- //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
- c.AppInfo.Sn = ip.String()
- c.AppInfo.Sport = int(port)
- strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
- c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
- c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
- //c.AppInfo.InstanceId = c.instanceID.IntVal
- strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
- c.AppInfo.AgentId, _ = strAgentID.ToInt64()
- c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
- return true
- }
- }
- }
- return false
- }
- func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // get the associated uprobe
- uprobe, err := c.GetUprobe(event, tracer)
- if err != nil {
- fmt.Println("GetUprobeGetUprobe errer: %v", err)
- // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- if event.TraceId <= 0 {
- fmt.Println("StackProcess TraceId id 0")
- // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
- stackFun := ebpftracer.StackFunEvent{}
- stackFun.Uprobe = &uprobe
- stackFun.StackEvent = event
- apmTrace, ok := c.getTrace(event.TraceId)
- if ok {
- apmTrace.FunAdd(stackFun)
- }
- }
- func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // get the associated uprobe
- switch event.Location {
- case 0: // ret
- uprobe, err := c.GetUprobe(event, tracer)
- if err != nil {
- fmt.Println("GetUprobeGetUprobe errer: %v", err)
- // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- if event.TraceId <= 0 {
- fmt.Println("StackProcess TraceId id 0")
- // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
- return
- }
- //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
- apmTrace, err := c.getOrInitTrace(event.TraceId)
- if err == nil {
- //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
- duration := event.TimeNsEnd - event.TimeNsStart
- apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
- c.SendEvent(apmTrace, event.TraceId)
- }
- }
- }
- // ResolveAddress returns the symbol(s) and offset of the given address.
- func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
- if addr == 0 {
- // err = errors.Wrapf(SymbolNotFoundError, "0")
- return
- }
- // symbols, _, err := e.Symbols()
- if err != nil {
- return
- }
- idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
- if idx == 0 {
- // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
- return
- }
- // why diff symbol may contains the same addr?
- sym := symbols[idx-1]
- for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
- syms = append(syms, symbols[i])
- }
- for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
- syms = append(syms, symbols[i])
- }
- return syms, uint(addr - sym.Value), nil
- }
- type MemoryMap struct {
- Start, End uint64
- }
- // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
- func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
- file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
- if err != nil {
- return nil, err
- }
- defer file.Close()
- scanner := bufio.NewScanner(file)
- if scanner.Scan() {
- fields := strings.Fields(scanner.Text())
- addresses := strings.Split(fields[0], "-")
- if len(addresses) != 2 {
- return nil, errors.New("unexpected format in /proc/<pid>/maps")
- }
- start, err := strconv.ParseUint(addresses[0], 16, 64)
- if err != nil {
- return nil, err
- }
- end, err := strconv.ParseUint(addresses[1], 16, 64)
- if err != nil {
- return nil, err
- }
- return &MemoryMap{
- Start: start,
- End: end,
- }, nil
- }
- if err := scanner.Err(); err != nil {
- return nil, err
- }
- return nil, errors.New("empty /proc/<pid>/maps")
- }
- func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
- //fmt.Println("GetUprobe entory:")
- memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
- Address := event.Ip - memoryMap.Start
- // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
- for _, fun := range c.UprobesMap {
- funAddress := fun.Address + fun.AbsOffset
- // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
- if funAddress == Address {
- // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
- return fun, nil
- }
- }
- syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
- if err != nil {
- return
- }
- for _, sym := range syms {
- //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
- uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
- if ok {
- return uprobe, nil
- }
- }
- err = errors.New("uprobe not found")
- return
- }
- func (c *Container) GetAppInfo() AppInfo {
- return c.AppInfo
- }
- // 可注入前置
- func (c *Container) checkEventReady() bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.l7EventReady
- }
- func (c *Container) eventReady() {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.l7EventReady = true
- }
- // uprobe前置
- func (c *Container) checkL7AttachReady() bool {
- c.lock.Lock()
- defer c.lock.Unlock()
- return c.l7Attach
- }
- func (c *Container) l7AttachSuccess() {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.l7Attach = true
- }
- func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
- p := c.processes[pid]
- if p != nil && c.checkEventReady() {
- codeType := c.GetCodeTypeFromCache(pid)
- if codeType.IsUnknownCode() {
- klog.WithField("pid", pid).Infof("[verify] unknown language.")
- return false
- }
- cmdline := p.GetCmdline()
- if len(cmdline) == 0 {
- return false
- }
- whiteListByCode := r.getWhiteListByCodeType(codeType)
- klog.WithField("pid", pid).WithField("codeType", codeType.String()).
- Infof("[verify] white list %v", whiteListByCode)
- // 当前语言的白名单规则
- for _, setting := range whiteListByCode {
- ruleVal := setting.Filters
- if ruleVal == "" {
- continue
- }
- // 判断规则
- if strings.Contains(cmdline, ruleVal) {
- c.WhiteSettingInfo = setting
- klog.WithField("pid", pid).
- WithField("ruleVal", ruleVal).
- WithField("cmdline", cmdline).
- Infoln("[verify] check successful.")
- return true
- }
- }
- }
- return false
- }
- func (c *Container) detachUprobes(pid uint32) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // close uprobe
- if p := c.processes[pid]; p != nil {
- if len(p.uprobes) > 0 {
- klog.Infof("detachUprobes", pid)
- p.DynamicClose()
- c.l7Attach = false
- c.AppInfo = AppInfo{}
- }
- }
- }
|