package containers import ( debugelf "debug/elf" "fmt" "os" "sort" "strconv" "strings" "sync" "time" "github.com/coroot/coroot-node-agent/utils" . "github.com/coroot/coroot-node-agent/utils/modelse" "github.com/coroot/coroot-node-agent/cgroup" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/ebpftracer/tracer" "github.com/coroot/coroot-node-agent/flags" "github.com/coroot/coroot-node-agent/logs" "github.com/coroot/coroot-node-agent/node" "github.com/coroot/coroot-node-agent/pinger" "github.com/coroot/coroot-node-agent/proc" "github.com/coroot/coroot-node-agent/tracing" "github.com/coroot/logparser" "github.com/prometheus/client_golang/prometheus" klog "github.com/sirupsen/logrus" "github.com/vishvananda/netns" "golang.org/x/exp/maps" "inet.af/netaddr" ) var ( gcInterval = 10 * time.Second pingTimeout = 300 * time.Millisecond ) type ContainerID string type ContainerNetwork struct { NetworkID string } type ContainerMetadata struct { name string labels map[string]string volumes map[string]string logPath string image string logDecoder logparser.Decoder hostListens map[string][]netaddr.IPPort networks map[string]ContainerNetwork env map[string]string systemdTriggeredBy string rootfs string } type Delays struct { cpu time.Duration disk time.Duration } type LogParser struct { parser *logparser.Parser stop func() } func (p *LogParser) Stop() { if p.stop != nil { p.stop() } p.parser.Stop() } type AddrPair struct { src netaddr.IPPort dst netaddr.IPPort } type ActiveConnection struct { Dest netaddr.IPPort Src netaddr.IPPort ActualDest netaddr.IPPort Pid uint32 Fd uint64 Timestamp uint64 Closed time.Time BytesSent uint64 BytesReceived uint64 http2Parser *l7.Http2Parser postgresParser *l7.PostgresParser mysqlParser *l7.MysqlParser dmParser *l7.DmParser } type ActiveAccept struct { Dest netaddr.IPPort Src netaddr.IPPort Pid uint32 Fd uint64 Timestamp uint64 Closed time.Time BytesSent uint64 BytesReceived uint64 } type ListenDetails struct { ClosedAt time.Time NsIPs []netaddr.IP } type PidFd struct { Pid uint32 Fd uint64 } type K8sContainer struct { ns string podName string podId string workload string containerName string pid string } type ConnectionStats struct { Count uint64 TotalTime time.Duration Retransmissions uint64 BytesSent uint64 PerBytesSent uint64 BytesReceived uint64 PerBytesReceived uint64 Src netaddr.IPPort ConEstTime time.Duration FirstReadTime uint64 FirstWriteTime uint64 NewReadTime uint64 } type AcceptStats struct { BytesSent uint64 BytesReceived uint64 } type Container struct { id ContainerID cgroup *cgroup.Cgroup metadata *ContainerMetadata K8sContainer processes map[uint32]*Process startedAt time.Time zombieAt time.Time restarts int delays Delays delaysByPid map[uint32]Delays delaysLock sync.Mutex listens map[netaddr.IPPort]map[uint32]*ListenDetails connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count connectsFailed map[netaddr.IPPort]int64 // dst -> count connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time connectionsActive map[AddrPair]*ActiveConnection connectionsByPidFd map[PidFd]*ActiveConnection acceptsSuccessful map[AddrPair]*AcceptStats acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time acceptsActive map[AddrPair]*ActiveAccept acceptsByPidFd map[PidFd]*ActiveAccept l7Stats L7Stats dnsStats *L7Metrics oomKills int pythonThreadLockWaitTime time.Duration mounts map[string]proc.MountInfo logParsers map[string]*LogParser hostConntrack *Conntrack nsConntrack *Conntrack lbConntracks []*Conntrack registry *Registry lock sync.RWMutex done chan struct{} traceMap map[uint64]*tracing.Trace //instanceID utils.ID Symbols []debugelf.Symbol Uprobes []tracer.Uprobe UprobesMap map[string]tracer.Uprobe l7EventReady bool l7Attach bool // 白名单详情 WhiteSettingInfo WhiteSettingInfo // 应用详情 AppInfo AppInfo } func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) { netNs, err := proc.GetNetNs(pid) if err != nil { return nil, err } defer netNs.Close() c := &Container{ id: id, cgroup: cg, metadata: md, processes: map[uint32]*Process{}, delaysByPid: map[uint32]Delays{}, listens: map[netaddr.IPPort]map[uint32]*ListenDetails{}, connectsSuccessful: map[AddrPair]*ConnectionStats{}, connectsFailed: map[netaddr.IPPort]int64{}, connectLastAttempt: map[netaddr.IPPort]time.Time{}, connectionsActive: map[AddrPair]*ActiveConnection{}, connectionsByPidFd: map[PidFd]*ActiveConnection{}, acceptsSuccessful: map[AddrPair]*AcceptStats{}, acceptLastAttempt: map[netaddr.IPPort]time.Time{}, acceptsActive: map[AddrPair]*ActiveAccept{}, acceptsByPidFd: map[PidFd]*ActiveAccept{}, l7Stats: L7Stats{}, dnsStats: &L7Metrics{}, mounts: map[string]proc.MountInfo{}, logParsers: map[string]*LogParser{}, hostConntrack: hostConntrack, done: make(chan struct{}), traceMap: make(map[uint64]*tracing.Trace), registry: registry, } for _, n := range md.networks { if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() { if ct, err := NewConntrack(nsHandle); err != nil { klog.Warningln(err) } else { c.lbConntracks = append(c.lbConntracks, ct) } _ = nsHandle.Close() } } c.runLogParser("") // go func() { // ticker := time.NewTicker(gcInterval) // defer ticker.Stop() // for { // select { // case <-c.done: // return // case t := <-ticker.C: // c.gc(t) // } // } // }() return c, nil } func (c *Container) Close() { for _, p := range c.logParsers { p.Stop() } for _, ct := range c.lbConntracks { _ = ct.Close() } if c.nsConntrack != nil { _ = c.nsConntrack.Close() } close(c.done) } func (c *Container) Dead(now time.Time) bool { return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval } func (c *Container) Describe(ch chan<- *prometheus.Desc) { // some fixed metric description is required here to register/unregister the collector correctly ch <- prometheus.NewDesc("container", "", nil, nil) } func (c *Container) Collect(ch chan<- prometheus.Metric) { c.registry.updateTrafficStatsIfNecessary() c.lock.RLock() defer c.lock.RUnlock() if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" { ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy) } ch <- counter(metrics.Restarts, float64(c.restarts)) if cpu, err := c.cgroup.CpuStat(); err == nil { if cpu.LimitCores > 0 { ch <- gauge(metrics.CPULimit, cpu.LimitCores) } ch <- counter(metrics.CPUUsage, cpu.UsageSeconds) ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds) } if taskstatsClient != nil { c.updateDelays() ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second)) ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second)) } if s, err := c.cgroup.MemoryStat(); err == nil { ch <- gauge(metrics.MemoryRss, float64(s.RSS)) ch <- gauge(metrics.MemoryCache, float64(s.Cache)) if s.Limit > 0 { ch <- gauge(metrics.MemoryLimit, float64(s.Limit)) } } if c.oomKills > 0 { ch <- counter(metrics.OOMKills, float64(c.oomKills)) } if disks, err := node.GetDisks(); err == nil { ioStat, _ := c.cgroup.IOStat() for majorMinor, mounts := range c.getMounts() { dev := disks.GetParentBlockDevice(majorMinor) if dev == nil { continue } for mountPoint, fsStat := range mounts { dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]} ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...) ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...) ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...) if io, ok := ioStat[majorMinor]; ok { ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...) ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...) ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...) ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...) } } } } for addr, open := range c.getListens() { ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "") } for proxy, addrs := range c.getProxiedListens() { for addr := range addrs { ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy) } } strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10) strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10) strAppName := c.AppInfo.AppName for d, stats := range c.connectsSuccessful { ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String()) ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String()) if stats.Retransmissions > 0 { ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String()) } ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String()) klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime) stats.PerBytesReceived = 0 stats.PerBytesSent = 0 } // for d, stats := range c.acceptsSuccessful { // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String()) // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String()) // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String()) // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived) // } for dst, count := range c.connectsFailed { ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, strAppName, dst.String()) } connections := map[AddrPair]int{} for addrPair, conn := range c.connectionsActive { if !conn.Closed.IsZero() { continue } connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++ } for d, count := range connections { ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String()) } for source, p := range c.logParsers { for _, c := range p.parser.GetCounters() { ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample) } } appTypes := map[string]struct{}{} seenJvms := map[string]bool{} seenDotNetApps := map[string]bool{} pids := maps.Keys(c.processes) sort.Slice(pids, func(i, j int) bool { return pids[i] < pids[j] }) for _, pid := range pids { process := c.processes[pid] cmdline := proc.GetCmdline(pid) if len(cmdline) == 0 { continue } appType := guessApplicationType(cmdline) if appType != "" { appTypes[appType] = struct{}{} } if process.isGolangApp { appTypes["golang"] = struct{}{} } switch { case isJvm(cmdline): jvm, jMetrics := jvmMetrics(pid) if len(jMetrics) > 0 && !seenJvms[jvm] { seenJvms[jvm] = true for _, m := range jMetrics { ch <- m } } case process.dotNetMonitor != nil: appTypes["dotnet"] = struct{}{} appName := process.dotNetMonitor.AppName() if !seenDotNetApps[appName] { seenDotNetApps[appName] = true process.dotNetMonitor.Collect(ch) } } } for appType := range appTypes { ch <- gauge(metrics.ApplicationType, 1, appType) } if c.pythonThreadLockWaitTime > 0 { ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds()) } if c.dnsStats.Requests != nil { c.dnsStats.Requests.Collect(ch) } if c.dnsStats.Latency != nil { c.dnsStats.Latency.Collect(ch) } c.l7Stats.collect(ch) if !*flags.DisablePinger { for ip, rtt := range c.ping() { ch <- gauge(metrics.NetLatency, rtt, ip.String()) } } c.gc(time.Now()) } func (c *Container) onProcessStart(pid uint32) *Process { c.lock.Lock() defer c.lock.Unlock() stats, err := TaskstatsPID(pid) if err != nil { klog.WithError(err).Errorf("Failed onProcessStart [%d]", pid) return nil } c.zombieAt = time.Time{} p := NewProcess(pid, stats, c.registry.tracer) if p == nil { return nil } c.processes[pid] = p if c.startedAt.IsZero() { c.startedAt = stats.BeginTime } else { min := stats.BeginTime for _, p := range c.processes { if p.StartedAt.Before(min) { min = p.StartedAt } } if min.After(c.startedAt) { c.restarts++ c.startedAt = min } } return p } func (c *Container) onProcessExit(pid uint32, oomKill bool) { c.lock.Lock() defer c.lock.Unlock() if p := c.processes[pid]; p != nil { p.Close() } delete(c.processes, pid) if len(c.processes) == 0 { c.zombieAt = time.Now() } delete(c.delaysByPid, pid) if oomKill { c.oomKills++ } } func (c *Container) onFileOpen(pid uint32, fd uint64) { mntId, logPath := resolveFd(pid, fd) func() { if mntId == "" { return } c.lock.Lock() _, ok := c.mounts[mntId] c.lock.Unlock() if ok { return } byMountId := proc.GetMountInfo(pid) if byMountId == nil { return } if mi, ok := byMountId[mntId]; ok { c.lock.Lock() c.mounts[mntId] = mi c.lock.Unlock() } }() if logPath != "" { c.lock.Lock() c.runLogParser(logPath) c.lock.Unlock() } } // set func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) { klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr) if common.PortFilter.ShouldBeSkipped(addr.Port()) { return } if !safe { c.lock.Lock() defer c.lock.Unlock() } if _, ok := c.listens[addr]; !ok { c.listens[addr] = map[uint32]*ListenDetails{} } details := &ListenDetails{} c.listens[addr][pid] = details if addr.IP().IsUnspecified() { ns, err := proc.GetNetNs(pid) if err != nil { if !common.IsNotExist(err) { klog.Warningln(err) } return } defer ns.Close() ips, err := proc.GetNsIps(ns) if err != nil { klog.Warningln(err) return } klog.Infof("got IPs %s for %s", ips, ns.UniqueId()) details.NsIPs = ips } } func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) { klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr) c.lock.Lock() defer c.lock.Unlock() if _, byAddr := c.listens[addr]; byAddr { if _, byPid := c.listens[addr][pid]; byPid { if details := c.listens[addr][pid]; details != nil { details.ClosedAt = time.Now() } } } } func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) { klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP()) // if common.PortFilter.ShouldBeSkipped(dst.Port()) { // return // } p := c.processes[pid] if p == nil { return } if dst.IP().IsLoopback() && !p.isHostNs() { return } // actualDst, err := c.getActualDestination(p, src, dst) // if err != nil { // if !common.IsNotExist(err) { // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err) // } // return // } // switch { // case actualDst == nil: // actualDst = &dst // case actualDst.IP().IsLoopback() && !p.isHostNs(): // return // } // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) { // return // } c.lock.Lock() defer c.lock.Unlock() if !failed { key := AddrPair{src: dst, dst: src} stats := c.acceptsSuccessful[key] if stats == nil { stats = &AcceptStats{} c.acceptsSuccessful[key] = stats } acceptCon := &ActiveAccept{ Dest: src, Src: dst, Pid: pid, Fd: fd, Timestamp: timestamp, } c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon } c.acceptLastAttempt[dst] = time.Now() } func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) { if common.PortFilter.ShouldBeSkipped(dst.Port()) { return } p := c.processes[pid] if p == nil { return } if dst.IP().IsLoopback() && !p.isHostNs() { return } actualDst, err := c.getActualDestination(p, src, dst) if err != nil { if !common.IsNotExist(err) { klog.Warningf("cannot open NetNs for pid %d: %s", pid, err) } return } switch { case actualDst == nil: actualDst = &dst case actualDst.IP().IsLoopback() && !p.isHostNs(): return } if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) { return } c.lock.Lock() defer c.lock.Unlock() if failed { c.connectsFailed[dst]++ } else { key := AddrPair{src: dst, dst: *actualDst} stats := c.connectsSuccessful[key] if stats == nil { stats = &ConnectionStats{} c.connectsSuccessful[key] = stats } stats.Count++ stats.TotalTime += duration stats.Src = src stats.ConEstTime = duration connection := &ActiveConnection{ Dest: dst, Src: src, ActualDest: *actualDst, Pid: pid, Fd: fd, Timestamp: timestamp, } c.connectionsActive[AddrPair{src: src, dst: dst}] = connection c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection } c.connectLastAttempt[dst] = time.Now() } func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) { if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil { return actualDst, nil } for _, lb := range c.lbConntracks { if actualDst := lb.GetActualDestination(src, dst); actualDst != nil { return actualDst, nil } } actualDst := c.hostConntrack.GetActualDestination(src, dst) if actualDst != nil { return actualDst, nil } if !p.isHostNs() { if c.nsConntrack == nil { netNs, err := proc.GetNetNs(p.Pid) if err != nil { return nil, err } defer netNs.Close() c.nsConntrack, err = NewConntrack(netNs) if err != nil { return nil, err } } return c.nsConntrack.GetActualDestination(src, dst), nil } return nil, nil } func (c *Container) onConnectionClose(e ebpftracer.Event) { c.lock.Lock() conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}] c.lock.Unlock() if conn != nil { if conn.Closed.IsZero() { if e.TrafficStats != nil { c.lock.Lock() c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime) c.lock.Unlock() } conn.Closed = time.Now() } } } func (c *Container) onAcceptClose(e ebpftracer.Event) { c.lock.Lock() conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}] c.lock.Unlock() if conn != nil { if conn.Closed.IsZero() { if e.TrafficStats != nil { c.lock.Lock() c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived) c.lock.Unlock() } conn.Closed = time.Now() } } } func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) { if u == nil { return } c.lock.Lock() defer c.lock.Unlock() c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0) } func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) { if ac == nil { return } key := AddrPair{src: ac.Dest, dst: ac.ActualDest} stats := c.connectsSuccessful[key] if stats == nil { stats = &ConnectionStats{} c.connectsSuccessful[key] = stats } if sent > ac.BytesSent { stats.BytesSent += sent - ac.BytesSent stats.PerBytesSent = sent - ac.BytesSent } if received > ac.BytesReceived { stats.BytesReceived += received - ac.BytesReceived stats.PerBytesReceived = received - ac.BytesReceived } if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 { stats.FirstReadTime = firstreadtime stats.FirstWriteTime = firstwritetime stats.NewReadTime = newreadtime } ac.BytesSent = sent ac.BytesReceived = received } func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) { if ac == nil { return } klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac) key := AddrPair{src: ac.Src, dst: ac.Dest} stats := c.acceptsSuccessful[key] if stats == nil { stats = &AcceptStats{} c.acceptsSuccessful[key] = stats } if sent > ac.BytesSent { stats.BytesSent += sent - ac.BytesSent } if received > ac.BytesReceived { stats.BytesReceived += received - ac.BytesReceived } ac.BytesSent = sent ac.BytesReceived = received } func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) { status := r.Status.DNS() if status == "" { return nil, "", "" } t, fqdn, ips := l7.ParseDns(r.Payload) if t == "" { return nil, "", "" } if c.dnsStats.Requests == nil { dnsReq := L7Requests[l7.ProtocolDNS] c.dnsStats.Requests = prometheus.NewCounterVec( prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help}, []string{"request_type", "domain", "status"}, ) } if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil { m.Inc() } if r.Duration != 0 { if c.dnsStats.Latency == nil { dnsLatency := L7Latency[l7.ProtocolDNS] c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help}) } c.dnsStats.Latency.Observe(r.Duration.Seconds()) } ip2fqdn := map[netaddr.IP]string{} if fqdn != "" { for _, ip := range ips { ip2fqdn[ip] = fqdn } } return ip2fqdn, t, fqdn } func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string { c.lock.Lock() defer c.lock.Unlock() if r.Protocol == l7.ProtocolDNS { //return c.onDNSRequest(r) } conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] if conn == nil { return nil } if timestamp != 0 && conn.Timestamp != timestamp { return nil } stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest) trace := tracing.NewTrace(string(c.id), conn.ActualDest) switch r.Protocol { case l7.ProtocolHTTP: stats.observe(r.Status.Http(), "", r.Duration) method, path := l7.ParseHttp(r.Payload) trace.HttpRequest(method, path, r.Status, r.Duration) case l7.ProtocolHTTP2: if conn.http2Parser == nil { conn.http2Parser = l7.NewHttp2Parser() } requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration)) for _, req := range requests { stats.observe(req.Status.Http(), "", req.Duration) trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration) } case l7.ProtocolPostgres: if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } if conn.postgresParser == nil { conn.postgresParser = l7.NewPostgresParser() } query := conn.postgresParser.Parse(r.Payload) trace.PostgresQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolMysql: if r.Method != l7.MethodStatementClose { stats.observe(r.Status.String(), "", r.Duration) } if conn.mysqlParser == nil { conn.mysqlParser = l7.NewMysqlParser() } query := conn.mysqlParser.Parse(r.Payload, r.StatementId) trace.MysqlQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolMemcached: stats.observe(r.Status.String(), "", r.Duration) cmd, items := l7.ParseMemcached(r.Payload) trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration) case l7.ProtocolRedis: stats.observe(r.Status.String(), "", r.Duration) cmd, args := l7.ParseRedis(r.Payload) trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration) case l7.ProtocolMongo: stats.observe(r.Status.String(), "", r.Duration) query := l7.ParseMongo(r.Payload) trace.MongoQuery(query, r.Status.Error(), r.Duration) case l7.ProtocolKafka, l7.ProtocolCassandra: stats.observe(r.Status.String(), "", r.Duration) case l7.ProtocolRabbitmq, l7.ProtocolNats: stats.observe(r.Status.String(), r.Method.String(), 0) case l7.ProtocolDubbo2: stats.observe(r.Status.String(), "", r.Duration) } return nil } func (c *Container) onRetransmission(srcDst AddrPair) bool { c.lock.Lock() defer c.lock.Unlock() conn, ok := c.connectionsActive[srcDst] if !ok { return false } key := AddrPair{src: srcDst.dst, dst: conn.ActualDest} stats := c.connectsSuccessful[key] if stats == nil { stats = &ConnectionStats{} c.connectsSuccessful[key] = stats } stats.Retransmissions++ return true } func (c *Container) updateDelays() { c.delaysLock.Lock() defer c.delaysLock.Unlock() for pid := range c.processes { stats, err := TaskstatsTGID(pid) if err != nil { continue } d := c.delaysByPid[pid] c.delays.cpu += stats.CPUDelay - d.cpu c.delays.disk += stats.BlockIODelay - d.disk d.cpu = stats.CPUDelay d.disk = stats.BlockIODelay c.delaysByPid[pid] = d } } func (c *Container) getMounts() map[string]map[string]*proc.FSStat { if len(c.mounts) == 0 { return nil } res := map[string]map[string]*proc.FSStat{} for _, mi := range c.mounts { var stat *proc.FSStat for pid := range c.processes { s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint)) if err == nil { stat = &s break } } if stat == nil { continue } if _, ok := res[mi.MajorMinor]; !ok { res[mi.MajorMinor] = map[string]*proc.FSStat{} } res[mi.MajorMinor][mi.MountPoint] = stat } return res } func (c *Container) getListens() map[netaddr.IPPort]int { res := map[netaddr.IPPort]int{} for addr, byPid := range c.listens { open := 0 isHostNs := false ips := map[netaddr.IP]bool{} for pid, details := range byPid { p := c.processes[pid] if p == nil { continue } if p.isHostNs() { isHostNs = true } if details.ClosedAt.IsZero() { open = 1 } for _, ip := range details.NsIPs { ips[ip] = true } } if !addr.IP().IsUnspecified() { ips = map[netaddr.IP]bool{addr.IP(): true} } for ip := range ips { if ip.IsLoopback() && !isHostNs { continue } res[netaddr.IPPortFrom(ip, addr.Port())] = open } } return res } func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} { if len(c.metadata.hostListens) == 0 { return nil } hasUnspecified := false for _, addrs := range c.metadata.hostListens { for _, addr := range addrs { if addr.IP().IsUnspecified() { hasUnspecified = true break } } } var hostIps []netaddr.IP if hasUnspecified { if ns, err := proc.GetHostNetNs(); err != nil { klog.Warningln(err) } else { ips, err := proc.GetNsIps(ns) _ = ns.Close() if err != nil { klog.Warningln(err) } else { hostIps = ips } } } res := map[string]map[netaddr.IPPort]struct{}{} for proxy, addrs := range c.metadata.hostListens { res[proxy] = map[netaddr.IPPort]struct{}{} for _, addr := range addrs { if addr.IP().IsUnspecified() { for _, ip := range hostIps { if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() { res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{} } } } else { res[proxy][addr] = struct{}{} } } } return res } func (c *Container) ping() map[netaddr.IP]float64 { netNs := netns.None() for pid := range c.processes { if pid == agentPid { netNs = selfNetNs break } ns, err := proc.GetNetNs(pid) if err != nil { if !common.IsNotExist(err) { klog.Warningln(err) } continue } netNs = ns defer netNs.Close() break } if !netNs.IsOpen() { return nil } ips := map[netaddr.IP]struct{}{} for d := range c.connectsSuccessful { ips[d.dst.IP()] = struct{}{} } for dst := range c.connectsFailed { ips[dst.IP()] = struct{}{} } if len(ips) == 0 { return nil } targets := make([]netaddr.IP, 0, len(ips)) for ip := range ips { if ip.IsLoopback() { continue } if !ip.Is4() { // pinger doesn't support IPv6 yet continue } targets = append(targets, ip) } rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout) if err != nil { klog.Warningln(err) return nil } return rtt } func (c *Container) runLogParser(logPath string) { if *flags.DisableLogParsing { return } containerId := string(c.id) if logPath != "" { if c.logParsers[logPath] != nil { return } ch := make(chan logparser.LogEntry) parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId)) reader, err := logs.NewTailReader(proc.HostPath(logPath), ch) if err != nil { klog.Warningln(err) parser.Stop() return } klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath) c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop} return } switch c.cgroup.ContainerType { case cgroup.ContainerTypeSystemdService: ch := make(chan logparser.LogEntry) if err := JournaldSubscribe(c.cgroup, ch); err != nil { klog.Warningln(err) return } parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId)) stop := func() { JournaldUnsubscribe(c.cgroup) } klog.Infoln("started journald logparser", "cg", c.cgroup.Id) c.logParsers["journald"] = &LogParser{parser: parser, stop: stop} case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio: if c.metadata.logPath == "" { return } if parser := c.logParsers["stdout/stderr"]; parser != nil { parser.Stop() delete(c.logParsers, "stdout/stderr") } ch := make(chan logparser.LogEntry) parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId)) reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch) if err != nil { klog.Warningln(err) parser.Stop() return } klog.Infoln("started container logparser", "cg", c.cgroup.Id) c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop} } } func (c *Container) gc(now time.Time) { // c.lock.Lock() // defer c.lock.Unlock() established := map[AddrPair]struct{}{} establishedDst := map[netaddr.IPPort]struct{}{} listens := map[netaddr.IPPort]string{} seenNamespaces := map[string]bool{} fdMap := map[uint64]struct{}{} for _, p := range c.processes { if seenNamespaces[p.NetNsId()] { continue } sockets, err := proc.GetSockets(p.Pid) if err != nil { continue } fds, err := proc.ReadFds(p.Pid) if err == nil { for _, fd := range fds { fdMap[fd.Fd] = struct{}{} } } for _, s := range sockets { if s.Listen { listens[s.SAddr] = s.Inode } else { established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{} establishedDst[s.DAddr] = struct{}{} } } seenNamespaces[p.NetNsId()] = true } c.revalidateListens(now, listens) for srcDst, conn := range c.connectionsActive { pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd} if _, ok := established[srcDst]; !ok { delete(c.connectionsActive, srcDst) if conn == c.connectionsByPidFd[pidFd] { delete(c.connectionsByPidFd, pidFd) } continue } if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval { delete(c.connectionsActive, srcDst) if conn == c.connectionsByPidFd[pidFd] { delete(c.connectionsByPidFd, pidFd) } } } for srcDst, conn := range c.acceptsActive { pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd} if _, ok := established[srcDst]; !ok { delete(c.acceptsActive, srcDst) if conn == c.acceptsByPidFd[pidFd] { delete(c.acceptsByPidFd, pidFd) } continue } if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval { delete(c.acceptsActive, srcDst) if conn == c.acceptsByPidFd[pidFd] { delete(c.acceptsByPidFd, pidFd) } } } for _, conn := range c.connectionsByPidFd { if _, ok := fdMap[conn.Fd]; !ok { delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd}) } } for _, conn := range c.acceptsByPidFd { if _, ok := fdMap[conn.Fd]; !ok { delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd}) } } for dst, at := range c.connectLastAttempt { _, active := establishedDst[dst] if !active && !at.IsZero() && now.Sub(at) > gcInterval { delete(c.connectLastAttempt, dst) delete(c.connectsFailed, dst) for d := range c.connectsSuccessful { if d.src == dst { delete(c.connectsSuccessful, d) } } c.l7Stats.delete(dst) } } for dst, at := range c.acceptLastAttempt { _, active := establishedDst[dst] if !active && !at.IsZero() && now.Sub(at) > gcInterval { delete(c.acceptLastAttempt, dst) for d := range c.acceptsSuccessful { if d.src == dst { delete(c.acceptsSuccessful, d) } } c.l7Stats.delete(dst) } } } func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) { for addr, byPid := range c.listens { if _, open := actualListens[addr]; open { continue } klog.Warningln("deleting the outdated listen:", addr) for _, details := range byPid { if details.ClosedAt.IsZero() { details.ClosedAt = now } } } missingListens := map[netaddr.IPPort]string{} for addr, inode := range actualListens { byPids, found := c.listens[addr] if !found { missingListens[addr] = inode continue } open := false for _, details := range byPids { if details.ClosedAt.IsZero() { open = true break } } if !open { missingListens[addr] = inode } } if len(missingListens) > 0 { inodeToPid := map[string]uint32{} for pid := range c.processes { fds, err := proc.ReadFds(pid) if err != nil { klog.Warningln(err) continue } for _, fd := range fds { if fd.SocketInode != "" { inodeToPid[fd.SocketInode] = pid } } } for addr, inode := range missingListens { pid, found := inodeToPid[inode] if !found { continue } //klog.Warningln("missing listen found:", addr, pid) c.onListenOpen(pid, addr, true) } } for addr, pids := range c.listens { for pid, details := range pids { if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval { delete(c.listens[addr], pid) } } if len(c.listens[addr]) == 0 { delete(c.listens, addr) } } } func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error { klog.Infoln("[attach] attachUprobes start") if tracer.DisableL7Tracing() { return nil } codeType := c.GetCodeTypeFromCache(pid) if codeType.IsUnknownCode() { return nil } var err error switch codeType { case CodeTypeJava: err = c.attachJVMUprobes(tracer, pid) case CodeTypeJavaAot: err = c.attachJavaAotUprobes(tracer, pid) case CodeTypeGo: err = c.attachTlsUprobes(tracer, pid) case CodeTypeNetCoreAot: err = c.attachNetCoreUprobes(tracer, pid) } if err != nil { c.DetachUprobes(pid, ERROR_DETACH) return err } c.l7AttachSuccess() return nil } func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType { p := c.processes[pid] if p == nil { return CodeTypeUnknown } if p.codeType.IsWaitCheck() { p.codeType = GetExeType(pid, c.getRootfs()) } return p.codeType } func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error { p := c.processes[pid] if p == nil { return nil } if !p.openSslUprobesChecked { sslProbes, err := tracer.AttachOpenSslUprobes(pid) if err != nil { return err } p.uprobes = append(p.uprobes, sslProbes...) p.openSslUprobesChecked = true } if !p.goTlsUprobesChecked { codeType := c.GetCodeTypeFromCache(pid) goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType)) if err != nil { return err } p.uprobes = append(p.uprobes, goProbes...) p.goTlsUprobesChecked = true } return nil } func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } codeType := c.GetCodeTypeFromCache(pid) if !p.jvmAttachOnce { p.jvmAttachOnce = true rootfs := c.getRootfs() tracer.InitKProcInfo(pid, &c.AppInfo) // TODO java Aot if codeType.IsJvmCode() { // check version libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs) if err != nil { klog.WithError(err).Errorf("[attach] Failed get so path") return err } v, err := ebpftracer.GetJvmVersion(libjavaso) if err != nil { klog.WithError(err).Errorf("[attach] Failed get Java version") return err } c.AppInfo.Version = v major, minor, patch, err := ebpftracer.ParseVersion(v) klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch) if major != 1 || minor != 8 { return fmt.Errorf("[attach] Unsupported Java version") } } libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNioProbes...) libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNetProbes...) //p.jvmUprobesChecked = true } else { klog.Infof("[attach] %s-%d already attach", codeType.String(), pid) } return nil } func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } codeType := c.GetCodeTypeFromCache(pid) if !p.jvmUprobesChecked { p.jvmUprobesChecked = true rootfs := c.getRootfs() tracer.InitKProcInfo(pid, &c.AppInfo) // // TODO java Aot // if codeType.IsJavaAotCode() { // // check version // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs) // if err != nil { // klog.WithError(err).Errorf("[attach] Failed get so path") // return err // } // v, err := ebpftracer.GetJvmVersion(libjavaso) // if err != nil { // klog.WithError(err).Errorf("[attach] Failed get Java version") // return err // } // c.AppInfo.Version = v // major, minor, patch, err := ebpftracer.ParseVersion(v) // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch) // if major != 1 || minor != 8 { // return fmt.Errorf("[attach] Unsupported Java version") // } // } libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNioProbes...) libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNetProbes...) //p.jvmUprobesChecked = true } else { klog.Infof("[attach] %s-%d already attach", codeType.String(), pid) } return nil } func (c *Container) errorClose(pid uint32, closeType int) { p := c.processes[pid] if p != nil { p.DynamicClose(closeType) } } func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } if !p.jvmAttachOnce { p.jvmAttachOnce = true //codeType := c.GetCodeTypeFromCache(pid) tracer.InitKProcInfo(pid, &c.AppInfo) p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...) readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, readProbes...) WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, WriteProbes...) } return nil } func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) { info := proc.GetFdInfo(pid, fd) if info == nil { return } switch { case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0, !strings.HasPrefix(info.Dest, "/"), strings.HasPrefix(info.Dest, "/proc/"), strings.HasPrefix(info.Dest, "/dev/"), strings.HasPrefix(info.Dest, "/sys/"), strings.HasSuffix(info.Dest, "(deleted)"): return } mntId = info.MntId if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") && !strings.HasPrefix(info.Dest, "/var/log/pods/") && !strings.HasPrefix(info.Dest, "/var/log/containers/") && !strings.HasPrefix(info.Dest, "/var/log/journal/") { logPath = info.Dest } return } func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric { return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...) } func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric { return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...) }