package containers

import (
	debugelf "debug/elf"
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/coroot/coroot-node-agent/utils"
	. "github.com/coroot/coroot-node-agent/utils/modelse"

	"github.com/coroot/coroot-node-agent/cgroup"
	"github.com/coroot/coroot-node-agent/common"
	"github.com/coroot/coroot-node-agent/ebpftracer"
	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
	"github.com/coroot/coroot-node-agent/flags"
	"github.com/coroot/coroot-node-agent/logs"
	"github.com/coroot/coroot-node-agent/node"
	"github.com/coroot/coroot-node-agent/pinger"
	"github.com/coroot/coroot-node-agent/proc"
	"github.com/coroot/coroot-node-agent/tracing"
	"github.com/coroot/logparser"
	"github.com/prometheus/client_golang/prometheus"
	klog "github.com/sirupsen/logrus"
	"github.com/vishvananda/netns"
	"golang.org/x/exp/maps"
	"inet.af/netaddr"
)

var (
	// gcInterval is the grace period after which closed connections, closed
	// listens, idle destinations and process-less containers are dropped.
	gcInterval           = 1 * time.Minute
	AllAppInfoInterval   = 1 * time.Minute
	RegisterAppInterval  = 1 * time.Minute
	RegisterHostInterval = 10 * time.Minute
	// pingTimeout is the timeout passed to pinger.Ping.
	pingTimeout = 300 * time.Millisecond
)

// ContainerID identifies a container tracked by the agent.
type ContainerID string

// ContainerNetwork describes a network a container is attached to.
type ContainerNetwork struct {
	NetworkID string
}

// ContainerMetadata holds the static attributes of a container.
type ContainerMetadata struct {
	name               string
	labels             map[string]string
	volumes            map[string]string
	logPath            string
	image              string
	logDecoder         logparser.Decoder
	hostListens        map[string][]netaddr.IPPort
	networks           map[string]ContainerNetwork
	env                map[string]string
	systemdTriggeredBy string
	rootfs             string
}

// Delays accumulates the CPU and block I/O delays reported by taskstats.
type Delays struct {
	cpu  time.Duration
	disk time.Duration
}

// LogParser couples a log parser with the function that stops its input source.
type LogParser struct {
	parser *logparser.Parser
	stop   func()
}

// Stop stops the input source (when one is set) and then the parser itself.
func (p *LogParser) Stop() {
	if p.stop != nil {
		p.stop()
	}
	p.parser.Stop()
}

// AddrPair is a pair of endpoints used as a map key. What src and dst stand
// for depends on the map: see the individual call sites.
type AddrPair struct {
	src netaddr.IPPort
	dst netaddr.IPPort
}

// ActiveConnection is the state kept for an outbound connection from the
// moment it is opened until it is garbage-collected.
type ActiveConnection struct {
	Dest       netaddr.IPPort
	Src        netaddr.IPPort
	ActualDest netaddr.IPPort
	Pid        uint32
	Fd         uint64
	Timestamp  uint64

	Closed time.Time

	Retransmissions  uint64
	BytesSent        uint64
	PerBytesSent     uint64
	BytesReceived    uint64
	PerBytesReceived uint64
	ConEstTime       time.Duration
	FirstReadTime    uint64
	FirstWriteTime   uint64
	NewReadTime      uint64

	http2Parser    *l7.Http2Parser
	postgresParser *l7.PostgresParser
	mysqlParser    *l7.MysqlParser
	dmParser       *l7.DmParser
}

// ActiveAccept is the state kept for an accepted (inbound) connection.
type ActiveAccept struct {
	Dest      netaddr.IPPort
	Src       netaddr.IPPort
	Pid       uint32
	Fd        uint64
	Timestamp uint64

	Closed time.Time

	BytesSent     uint64
	BytesReceived uint64
}

// ListenDetails records when a listening socket was closed and, for sockets
// bound to an unspecified address, the IPs of the network namespace.
type ListenDetails struct {
	ClosedAt time.Time
	NsIPs    []netaddr.IP
}

// PidFd identifies a file descriptor of a particular process.
type PidFd struct {
	Pid uint32
	Fd  uint64
}

// K8sContainer carries Kubernetes-related attributes of a container.
type K8sContainer struct {
	ns            string
	podName       string
	podId         string
	workload      string
	containerName string
	pid           string
}

// ConnectionStats aggregates outbound connection statistics per
// (destination, actual destination) pair.
type ConnectionStats struct {
	Count            uint64
	TotalTime        time.Duration
	Retransmissions  uint64
	BytesSent        uint64
	PerBytesSent     uint64
	BytesReceived    uint64
	PerBytesReceived uint64
	Src              netaddr.IPPort
	ConEstTime       time.Duration
	FirstReadTime    uint64
	FirstWriteTime   uint64
	NewReadTime      uint64
}

// AcceptStats aggregates traffic counters of accepted connections.
type AcceptStats struct {
	BytesSent     uint64
	BytesReceived uint64
}

// Container is the per-container state of the agent. It implements
// prometheus.Collector through Describe and Collect.
type Container struct {
	id       ContainerID
	cgroup   *cgroup.Cgroup
	metadata *ContainerMetadata
	K8sContainer

	processes map[uint32]*Process

	startedAt time.Time
	zombieAt  time.Time
	restarts  int

	delays      Delays
	delaysByPid map[uint32]Delays
	delaysLock  sync.Mutex

	listens map[netaddr.IPPort]map[uint32]*ListenDetails

	// connectsSuccessful and connectsFailed are keyed by
	// AddrPair{src: destination, dst: actual destination}.
	connectsSuccessful map[AddrPair]*ConnectionStats
	connectsFailed     map[AddrPair]int64
	// connectLastAttempt maps a destination to the time of the last attempt.
	connectLastAttempt map[netaddr.IPPort]time.Time
	// connectionsActive is keyed by AddrPair{src: local source, dst: destination}.
	connectionsActive  map[AddrPair]*ActiveConnection
	connectionsByPidFd map[PidFd]*ActiveConnection

	acceptsSuccessful map[AddrPair]*AcceptStats
	// acceptLastAttempt maps a local address to the time of the last accept.
	acceptLastAttempt map[netaddr.IPPort]time.Time
	acceptsActive     map[AddrPair]*ActiveAccept
	acceptsByPidFd    map[PidFd]*ActiveAccept

	l7Stats  L7Stats
	dnsStats *L7Metrics

	oomKills int

	pythonThreadLockWaitTime time.Duration

	mounts map[string]proc.MountInfo

	logParsers map[string]*LogParser

	hostConntrack *Conntrack
	nsConntrack   *Conntrack
	lbConntracks  []*Conntrack

	registry *Registry

	lock sync.RWMutex

	done chan struct{}

	traceMap map[uint64]*tracing.Trace
	// instanceID utils.ID (disabled)
	Symbols      []debugelf.Symbol
	Uprobes      []tracer.Uprobe
	UprobesMap   map[string]tracer.Uprobe
	l7EventReady bool
	l7Attach     bool
	// Whitelist details.
	WhiteSettingInfo WhiteSettingInfo
	// Application details.
	AppInfo AppInfo
	RegTime int
}

// NewContainer creates the state for a container whose first known process is
// pid. It fails when the network namespace of pid cannot be opened. A conntrack
// handle is opened for every load-balancer namespace found for the networks in
// md, and the default log parser is started.
func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
	netNs, err := proc.GetNetNs(pid)
	if err != nil {
		return nil, err
	}
	defer netNs.Close()

	c := &Container{
		id:       id,
		cgroup:   cg,
		metadata: md,

		processes:   map[uint32]*Process{},
		delaysByPid: map[uint32]Delays{},

		listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},

		connectsSuccessful: map[AddrPair]*ConnectionStats{},
		connectsFailed:     map[AddrPair]int64{},
		connectLastAttempt: map[netaddr.IPPort]time.Time{},
		connectionsActive:  map[AddrPair]*ActiveConnection{},
		connectionsByPidFd: map[PidFd]*ActiveConnection{},

		acceptsSuccessful: map[AddrPair]*AcceptStats{},
		acceptLastAttempt: map[netaddr.IPPort]time.Time{},
		acceptsActive:     map[AddrPair]*ActiveAccept{},
		acceptsByPidFd:    map[PidFd]*ActiveAccept{},

		l7Stats:  L7Stats{},
		dnsStats: &L7Metrics{},

		mounts: map[string]proc.MountInfo{},

		logParsers: map[string]*LogParser{},

		hostConntrack: hostConntrack,

		done:     make(chan struct{}),
		traceMap: make(map[uint64]*tracing.Trace),

		registry: registry,
	}

	for _, n := range md.networks {
		if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
			if ct, err := NewConntrack(nsHandle); err != nil {
				klog.Warningln(err)
			} else {
				c.lbConntracks = append(c.lbConntracks, ct)
			}
			_ = nsHandle.Close()
		}
	}

	c.runLogParser("")

	// No background GC goroutine is started here: gc is invoked at the end of
	// every Collect call instead.

	return c, nil
}

// Close stops all log parsers, closes the conntrack handles owned by the
// container and closes the done channel.
func (c *Container) Close() {
	for _, p := range c.logParsers {
		p.Stop()
	}
	for _, ct := range c.lbConntracks {
		_ = ct.Close()
	}
	if c.nsConntrack != nil {
		_ = c.nsConntrack.Close()
	}
	close(c.done)
}

// Dead reports whether the container has had no processes for longer than
// gcInterval as of now.
func (c *Container) Dead(now time.Time) bool {
	return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
}

// Describe implements prometheus.Collector.
func (c *Container) Describe(ch chan<- *prometheus.Desc) {
	// some fixed metric description is required here to register/unregister the collector correctly
	ch <- prometheus.NewDesc("container", "", nil, nil)
}

// Collect implements prometheus.Collector. Besides emitting metrics it resets
// the per-scrape counters (connection counts, failed connects, retransmissions,
// per-interval byte counts) and finally runs gc.
func (c *Container) Collect(ch chan<- prometheus.Metric) {
	c.registry.updateTrafficStatsIfNecessary()

	// The exclusive lock is required: this method resets counters and gc
	// deletes map entries, so a read lock would let two overlapping scrapes
	// write to the same maps concurrently.
	c.lock.Lock()
	defer c.lock.Unlock()

	if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
		ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
	}

	ch <- counter(metrics.Restarts, float64(c.restarts))

	if cpu, err := c.cgroup.CpuStat(); err == nil {
		if cpu.LimitCores > 0 {
			ch <- gauge(metrics.CPULimit, cpu.LimitCores)
		}
		ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
		ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
	}

	if taskstatsClient != nil {
		c.updateDelays()
		ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
		ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
	}

	if s, err := c.cgroup.MemoryStat(); err == nil {
		ch <- gauge(metrics.MemoryRss, float64(s.RSS))
		ch <- gauge(metrics.MemoryCache, float64(s.Cache))
		if s.Limit > 0 {
			ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
		}
	}

	if c.oomKills > 0 {
		ch <- counter(metrics.OOMKills, float64(c.oomKills))
	}

	if disks, err := node.GetDisks(); err == nil {
		ioStat, _ := c.cgroup.IOStat()
		for majorMinor, mounts := range c.getMounts() {
			dev := disks.GetParentBlockDevice(majorMinor)
			if dev == nil {
				continue
			}
			for mountPoint, fsStat := range mounts {
				dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
				ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
				ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
				ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
				if io, ok := ioStat[majorMinor]; ok {
					ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
					ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
					ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
					ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
				}
			}
		}
	}

	for addr, open := range c.getListens() {
		ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
	}
	for proxy, addrs := range c.getProxiedListens() {
		for addr := range addrs {
			ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
		}
	}

	strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
	strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
	strAppName := c.AppInfo.AppName
	apps := c.registry.GetAllAppInfoFromServer()

	for d, stats := range c.connectsSuccessful {
		labels := []string{strInstanceID, strAppId, apps[d.dst.String()], strAppName, stats.Src.String(), d.src.String(), d.dst.String()}
		ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), labels...)
		ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), labels...)
		// The count is reported per scrape interval.
		stats.Count = 0
		// Byte, latency and retransmission metrics are not emitted from these
		// aggregated stats; they are emitted per active connection below.
	}

	// Accept-side statistics (acceptsSuccessful) are currently not exported.

	for addrPair, count := range c.connectsFailed {
		targetAppId := apps[addrPair.dst.String()]
		ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
		c.connectsFailed[addrPair] = 0
	}

	connections := map[AddrPair]int{}
	for addrPair, conn := range c.connectionsActive {
		labels := []string{strInstanceID, strAppId, apps[conn.Dest.String()], strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String()}
		if conn.Retransmissions > 0 {
			ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), labels...)
			conn.Retransmissions = 0
		}
		ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), labels...)
		ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), labels...)
		ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), labels...)
		ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), labels...)
		ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), labels...)
		ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), labels...)
		ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), labels...)
		conn.PerBytesReceived = 0
		conn.PerBytesSent = 0
		if !conn.Closed.IsZero() {
			continue
		}
		connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
	}
	for d, count := range connections {
		ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
	}

	for source, p := range c.logParsers {
		for _, cnt := range p.parser.GetCounters() {
			ch <- counter(metrics.LogMessages, float64(cnt.Messages), source, cnt.Level.String(), cnt.Hash, cnt.Sample)
		}
	}

	appTypes := map[string]struct{}{}
	seenJvms := map[string]bool{}
	seenDotNetApps := map[string]bool{}
	// Processes are visited in ascending pid order so that the process chosen
	// to report a given JVM / .NET application is deterministic.
	pids := maps.Keys(c.processes)
	sort.Slice(pids, func(i, j int) bool {
		return pids[i] < pids[j]
	})
	for _, pid := range pids {
		process := c.processes[pid]
		cmdline := proc.GetCmdline(pid)
		if len(cmdline) == 0 {
			continue
		}
		appType := guessApplicationType(cmdline)
		if appType != "" {
			appTypes[appType] = struct{}{}
		}
		if process.isGolangApp {
			appTypes["golang"] = struct{}{}
		}
		switch {
		case isJvm(cmdline):
			jvm, jMetrics := jvmMetrics(pid)
			if len(jMetrics) > 0 && !seenJvms[jvm] {
				seenJvms[jvm] = true
				for _, m := range jMetrics {
					ch <- m
				}
			}
		case process.dotNetMonitor != nil:
			appTypes["dotnet"] = struct{}{}
			appName := process.dotNetMonitor.AppName()
			if !seenDotNetApps[appName] {
				seenDotNetApps[appName] = true
				process.dotNetMonitor.Collect(ch)
			}
		}
	}
	for appType := range appTypes {
		ch <- gauge(metrics.ApplicationType, 1, appType)
	}

	if c.pythonThreadLockWaitTime > 0 {
		ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
	}

	if c.dnsStats.Requests != nil {
		c.dnsStats.Requests.Collect(ch)
	}
	if c.dnsStats.Latency != nil {
		c.dnsStats.Latency.Collect(ch)
	}

	c.l7Stats.collect(ch)

	if !*flags.DisablePinger {
		for ip, rtt := range c.ping() {
			ch <- gauge(metrics.NetLatency, rtt, ip.String())
		}
	}

	c.gc(time.Now())
}

// onProcessStart registers a new process of the container and returns it, or
// nil when its taskstats cannot be read or NewProcess returns nil. A restart is
// counted when the earliest start time among the known processes is later than
// the recorded container start time.
func (c *Container) onProcessStart(pid uint32) *Process {
	c.lock.Lock()
	defer c.lock.Unlock()

	stats, err := TaskstatsPID(pid)
	if err != nil {
		klog.WithError(err).Debugf("Failed onProcessStart [%d]", pid)
		return nil
	}
	c.zombieAt = time.Time{}
	p := NewProcess(pid, stats, c.registry.tracer)
	if p == nil {
		return nil
	}
	c.processes[pid] = p

	if c.startedAt.IsZero() {
		c.startedAt = stats.BeginTime
		return p
	}
	earliest := stats.BeginTime
	for _, other := range c.processes {
		if other.StartedAt.Before(earliest) {
			earliest = other.StartedAt
		}
	}
	if earliest.After(c.startedAt) {
		c.restarts++
		c.startedAt = earliest
	}
	return p
}

// onProcessExit forgets the process pid. When it was the last one, the
// container is marked as a zombie as of now. oomKill increments the OOM counter.
func (c *Container) onProcessExit(pid uint32, oomKill bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if p := c.processes[pid]; p != nil {
		p.Close()
	}
	delete(c.processes, pid)
	if len(c.processes) == 0 {
		c.zombieAt = time.Now()
	}
	delete(c.delaysByPid, pid)
	if oomKill {
		c.oomKills++
	}
}

// onFileOpen records the mount of an opened file (if not known yet) and starts
// a log parser when resolveFd reports a log path for the descriptor.
func (c *Container) onFileOpen(pid uint32, fd uint64) {
	mntId, logPath := resolveFd(pid, fd)

	func() {
		if mntId == "" {
			return
		}
		c.lock.Lock()
		_, ok := c.mounts[mntId]
		c.lock.Unlock()
		if ok {
			return
		}
		// The mount info is read without holding the lock.
		byMountId := proc.GetMountInfo(pid)
		if byMountId == nil {
			return
		}
		if mi, ok := byMountId[mntId]; ok {
			c.lock.Lock()
			c.mounts[mntId] = mi
			c.lock.Unlock()
		}
	}()

	if logPath != "" {
		c.lock.Lock()
		c.runLogParser(logPath)
		c.lock.Unlock()
	}
}

// onListenOpen records that pid listens on addr. When safe is true the caller
// already holds c.lock and no locking is done here. For an unspecified address
// the IPs of the process's network namespace are stored in the details.
func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
	klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
	if common.PortFilter.ShouldBeSkipped(addr.Port()) {
		return
	}
	if !safe {
		c.lock.Lock()
		defer c.lock.Unlock()
	}
	if _, ok := c.listens[addr]; !ok {
		c.listens[addr] = map[uint32]*ListenDetails{}
	}
	details := &ListenDetails{}
	c.listens[addr][pid] = details

	if addr.IP().IsUnspecified() {
		ns, err := proc.GetNetNs(pid)
		if err != nil {
			if !common.IsNotExist(err) {
				klog.Warningln(err)
			}
			return
		}
		defer ns.Close()
		ips, err := proc.GetNsIps(ns)
		if err != nil {
			klog.Warningln(err)
			return
		}
		details.NsIPs = ips
	}
}

// onListenClose marks the listen of pid on addr as closed now, if it is known.
func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
	klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
	c.lock.Lock()
	defer c.lock.Unlock()
	// Indexing a missing (nil) inner map yields a nil *ListenDetails.
	if details := c.listens[addr][pid]; details != nil {
		details.ClosedAt = time.Now()
	}
}

// onAcceptOpen records an accepted connection. dst is the local address and src
// the remote peer; they are stored swapped (Src: dst, Dest: src). Unlike
// onConnectionOpen, neither the port/connection filters nor the conntrack
// destination resolution are applied here. duration is currently unused.
func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
	p := c.processes[pid]
	if p == nil {
		return
	}
	if dst.IP().IsLoopback() && !p.isHostNs() {
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	if !failed {
		key := AddrPair{src: dst, dst: src}
		if c.acceptsSuccessful[key] == nil {
			c.acceptsSuccessful[key] = &AcceptStats{}
		}
		acceptCon := &ActiveAccept{
			Dest:      src,
			Src:       dst,
			Pid:       pid,
			Fd:        fd,
			Timestamp: timestamp,
		}
		c.acceptsActive[key] = acceptCon
		c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
	}
	c.acceptLastAttempt[dst] = time.Now()
}

// onConnectionOpen records the outcome of an outbound connection attempt.
// Attempts to skipped ports, filtered destinations or container-local loopback
// are ignored. The actual destination falls back to dst when no conntrack
// entry is found.
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
	if common.PortFilter.ShouldBeSkipped(dst.Port()) {
		return
	}
	p := c.processes[pid]
	if p == nil {
		return
	}
	if dst.IP().IsLoopback() && !p.isHostNs() {
		return
	}
	actualDst, err := c.getActualDestination(p, src, dst)
	if err != nil {
		if !common.IsNotExist(err) {
			klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
		}
		return
	}
	switch {
	case actualDst == nil:
		actualDst = &dst
	case actualDst.IP().IsLoopback() && !p.isHostNs():
		return
	}
	if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	key := AddrPair{src: dst, dst: *actualDst}
	if failed {
		c.connectsFailed[key]++
	} else {
		stats := c.connectsSuccessful[key]
		if stats == nil {
			stats = &ConnectionStats{}
			c.connectsSuccessful[key] = stats
		}
		stats.Count++
		stats.TotalTime += duration
		stats.Src = src
		stats.ConEstTime = duration
		connection := &ActiveConnection{
			Dest:       dst,
			Src:        src,
			ActualDest: *actualDst,
			Pid:        pid,
			Fd:         fd,
			Timestamp:  timestamp,
			ConEstTime: duration,
		}
		c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
		c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
	}
	c.connectLastAttempt[dst] = time.Now()
}

// getActualDestination resolves the real destination of src->dst. It consults,
// in order, the Cilium conntrack table, the load-balancer conntracks, the host
// conntrack and, for processes outside the host namespace, a lazily created
// conntrack of the process's namespace. It returns (nil, nil) when no
// translation is known.
func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
	if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
		return actualDst, nil
	}
	for _, lb := range c.lbConntracks {
		if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
			return actualDst, nil
		}
	}
	if actualDst := c.hostConntrack.GetActualDestination(src, dst); actualDst != nil {
		return actualDst, nil
	}
	if p.isHostNs() {
		return nil, nil
	}
	if c.nsConntrack == nil {
		netNs, err := proc.GetNetNs(p.Pid)
		if err != nil {
			return nil, err
		}
		defer netNs.Close()
		c.nsConntrack, err = NewConntrack(netNs)
		if err != nil {
			return nil, err
		}
	}
	return c.nsConntrack.GetActualDestination(src, dst), nil
}

// onConnectionClose marks the connection identified by the event's pid/fd as
// closed, folding in the final traffic statistics when the event carries them.
// The lock is held for the whole call because Closed is also read by Collect
// and gc.
func (c *Container) onConnectionClose(e ebpftracer.Event) {
	c.lock.Lock()
	defer c.lock.Unlock()

	conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
	if conn == nil || !conn.Closed.IsZero() {
		return
	}
	if e.TrafficStats != nil {
		c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
	}
	conn.Closed = time.Now()
}

// onAcceptClose is the accept-side counterpart of onConnectionClose.
func (c *Container) onAcceptClose(e ebpftracer.Event) {
	c.lock.Lock()
	defer c.lock.Unlock()

	conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
	if conn == nil || !conn.Closed.IsZero() {
		return
	}
	if e.TrafficStats != nil {
		c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
	}
	conn.Closed = time.Now()
}

// updateTrafficStats applies a periodic traffic update to the connection with
// the update's pid/fd. A nil update or an unknown connection is ignored.
func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
	if u == nil {
		return
	}
	c.lock.Lock()
	defer c.lock.Unlock()
	c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
}

// updateConnectionTrafficStats folds the absolute counters sent/received into
// ac and into the aggregated stats of its (Dest, ActualDest) pair. Deltas are
// only accounted when a counter has grown. The three timestamps are stored only
// when all of them are non-zero. The caller must hold c.lock.
func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
	if ac == nil {
		return
	}
	key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
	stats := c.connectsSuccessful[key]
	if stats == nil {
		stats = &ConnectionStats{}
		c.connectsSuccessful[key] = stats
	}
	if sent > ac.BytesSent {
		delta := sent - ac.BytesSent
		stats.BytesSent += delta
		stats.PerBytesSent = delta
		ac.PerBytesSent = delta
	}
	if received > ac.BytesReceived {
		delta := received - ac.BytesReceived
		stats.BytesReceived += delta
		stats.PerBytesReceived = delta
		ac.PerBytesReceived = delta
	}
	if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
		stats.FirstReadTime = firstreadtime
		stats.FirstWriteTime = firstwritetime
		stats.NewReadTime = newreadtime
		ac.FirstReadTime = firstreadtime
		ac.FirstWriteTime = firstwritetime
		ac.NewReadTime = newreadtime
	}
	ac.BytesSent = sent
	ac.BytesReceived = received
}

// updateAcceptTrafficStats folds the absolute counters sent/received into ac
// and into the aggregated stats of its (Src, Dest) pair. The caller must hold
// c.lock.
func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
	if ac == nil {
		return
	}
	key := AddrPair{src: ac.Src, dst: ac.Dest}
	stats := c.acceptsSuccessful[key]
	if stats == nil {
		stats = &AcceptStats{}
		c.acceptsSuccessful[key] = stats
	}
	if sent > ac.BytesSent {
		stats.BytesSent += sent - ac.BytesSent
	}
	if received > ac.BytesReceived {
		stats.BytesReceived += received - ac.BytesReceived
	}
	ac.BytesSent = sent
	ac.BytesReceived = received
}

// onDNSRequest accounts a DNS request in dnsStats (creating the metrics on
// first use) and returns the resolved IP -> FQDN mapping together with the
// request type, FQDN, TTL and the IPs. All results are zero values when the
// status or the payload cannot be interpreted.
// NOTE(review): this method does not take c.lock itself; confirm that callers
// serialize access to dnsStats.
func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string, uint32, []netaddr.IP) {
	status := r.Status.DNS()
	if status == "" {
		return nil, "", "", 0, nil
	}
	t, fqdn, ttl, ips := l7.ParseDns(r.Payload)
	if t == "" {
		return nil, "", "", 0, nil
	}
	if c.dnsStats.Requests == nil {
		dnsReq := L7Requests[l7.ProtocolDNS]
		c.dnsStats.Requests = prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
			[]string{"request_type", "domain", "status"},
		)
	}
	if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
		m.Inc()
	}
	if r.Duration != 0 {
		if c.dnsStats.Latency == nil {
			dnsLatency := L7Latency[l7.ProtocolDNS]
			c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
		}
		c.dnsStats.Latency.Observe(r.Duration.Seconds())
	}
	ip2fqdn := map[netaddr.IP]string{}
	if fqdn != "" {
		for _, ip := range ips {
			ip2fqdn[ip] = fqdn
		}
	}
	return ip2fqdn, t, fqdn, ttl, ips
}

// onL7Request accounts an application-level request observed on the connection
// pid/fd and emits the corresponding trace span. Requests on unknown
// connections, or whose non-zero timestamp does not match the connection, are
// dropped. It always returns nil; DNS requests are not dispatched to
// onDNSRequest from here.
func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
	c.lock.Lock()
	defer c.lock.Unlock()

	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
	if conn == nil {
		return nil
	}
	if timestamp != 0 && conn.Timestamp != timestamp {
		return nil
	}
	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
	switch r.Protocol {
	case l7.ProtocolHTTP:
		stats.observe(r.Status.Http(), "", r.Duration)
		method, path := l7.ParseHttp(r.Payload)
		trace.HttpRequest(method, path, r.Status, r.Duration)
	case l7.ProtocolHTTP2:
		if conn.http2Parser == nil {
			conn.http2Parser = l7.NewHttp2Parser()
		}
		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
		for _, req := range requests {
			stats.observe(req.Status.Http(), "", req.Duration)
			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
		}
	case l7.ProtocolPostgres:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if conn.postgresParser == nil {
			conn.postgresParser = l7.NewPostgresParser()
		}
		query := conn.postgresParser.Parse(r.Payload)
		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolMysql:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if conn.mysqlParser == nil {
			conn.mysqlParser = l7.NewMysqlParser()
		}
		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
		trace.MysqlQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolMemcached:
		stats.observe(r.Status.String(), "", r.Duration)
		cmd, items := l7.ParseMemcached(r.Payload)
		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
	case l7.ProtocolRedis:
		stats.observe(r.Status.String(), "", r.Duration)
		cmd, args := l7.ParseRedis(r.Payload)
		trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
	case l7.ProtocolMongo:
		stats.observe(r.Status.String(), "", r.Duration)
		query := l7.ParseMongo(r.Payload)
		trace.MongoQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolKafka, l7.ProtocolCassandra:
		stats.observe(r.Status.String(), "", r.Duration)
	case l7.ProtocolRabbitmq, l7.ProtocolNats:
		stats.observe(r.Status.String(), r.Method.String(), 0)
	case l7.ProtocolDubbo2:
		stats.observe(r.Status.String(), "", r.Duration)
	}
	return nil
}

// onRetransmission counts a TCP retransmission for the active connection
// srcDst. It reports whether the connection belongs to this container.
func (c *Container) onRetransmission(srcDst AddrPair) bool {
	c.lock.Lock()
	defer c.lock.Unlock()

	conn, ok := c.connectionsActive[srcDst]
	if !ok {
		return false
	}
	key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
	stats := c.connectsSuccessful[key]
	if stats == nil {
		stats = &ConnectionStats{}
		c.connectsSuccessful[key] = stats
	}
	stats.Retransmissions++
	conn.Retransmissions++
	return true
}

// updateDelays adds the growth of each process's CPU and block I/O delay since
// the previous call to the container totals. Processes whose taskstats cannot
// be read are skipped.
func (c *Container) updateDelays() {
	c.delaysLock.Lock()
	defer c.delaysLock.Unlock()

	for pid := range c.processes {
		stats, err := TaskstatsTGID(pid)
		if err != nil {
			continue
		}
		prev := c.delaysByPid[pid]
		c.delays.cpu += stats.CPUDelay - prev.cpu
		c.delays.disk += stats.BlockIODelay - prev.disk
		c.delaysByPid[pid] = Delays{cpu: stats.CPUDelay, disk: stats.BlockIODelay}
	}
}

// getMounts returns filesystem statistics of the known mounts, keyed by the
// device's major:minor and then by mount point. A mount is stat'ed through the
// root of the first process for which the call succeeds; mounts that cannot be
// stat'ed are omitted. It returns nil when no mounts are known.
func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
	if len(c.mounts) == 0 {
		return nil
	}
	res := map[string]map[string]*proc.FSStat{}
	for _, mi := range c.mounts {
		var stat *proc.FSStat
		for pid := range c.processes {
			s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
			if err == nil {
				stat = &s
				break
			}
		}
		if stat == nil {
			continue
		}
		if _, ok := res[mi.MajorMinor]; !ok {
			res[mi.MajorMinor] = map[string]*proc.FSStat{}
		}
		res[mi.MajorMinor][mi.MountPoint] = stat
	}
	return res
}

// getListens returns the listen addresses of the container mapped to 1 when at
// least one live process still has the listen open and 0 otherwise. Listens on
// an unspecified address are expanded to the namespace IPs; loopback addresses
// are reported only for host-namespace processes.
func (c *Container) getListens() map[netaddr.IPPort]int {
	res := map[netaddr.IPPort]int{}
	for addr, byPid := range c.listens {
		open := 0
		isHostNs := false
		ips := map[netaddr.IP]bool{}
		for pid, details := range byPid {
			p := c.processes[pid]
			if p == nil {
				continue
			}
			if p.isHostNs() {
				isHostNs = true
			}
			if details.ClosedAt.IsZero() {
				open = 1
			}
			for _, ip := range details.NsIPs {
				ips[ip] = true
			}
		}
		if !addr.IP().IsUnspecified() {
			ips = map[netaddr.IP]bool{addr.IP(): true}
		}
		for ip := range ips {
			if ip.IsLoopback() && !isHostNs {
				continue
			}
			res[netaddr.IPPortFrom(ip, addr.Port())] = open
		}
	}
	return res
}

// getProxiedListens returns, per proxy name, the host addresses from the
// container metadata. Unspecified addresses are expanded to the host
// namespace IPs of the same address family. It returns nil when the metadata
// has no host listens.
func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
	if len(c.metadata.hostListens) == 0 {
		return nil
	}
	hasUnspecified := false
	for _, addrs := range c.metadata.hostListens {
		for _, addr := range addrs {
			if addr.IP().IsUnspecified() {
				hasUnspecified = true
				break
			}
		}
	}
	var hostIps []netaddr.IP
	if hasUnspecified {
		if ns, err := proc.GetHostNetNs(); err != nil {
			klog.Warningln(err)
		} else {
			ips, err := proc.GetNsIps(ns)
			_ = ns.Close()
			if err != nil {
				klog.Warningln(err)
			} else {
				hostIps = ips
			}
		}
	}
	res := map[string]map[netaddr.IPPort]struct{}{}
	for proxy, addrs := range c.metadata.hostListens {
		res[proxy] = map[netaddr.IPPort]struct{}{}
		for _, addr := range addrs {
			if !addr.IP().IsUnspecified() {
				res[proxy][addr] = struct{}{}
				continue
			}
			for _, ip := range hostIps {
				if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
					res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
				}
			}
		}
	}
	return res
}

// ping measures the round-trip time from the container's network namespace to
// the IPv4, non-loopback peers found in connectsSuccessful (actual destination)
// and connectsFailed (destination). It returns nil when no namespace can be
// opened, when there are no peers, or when pinging fails.
func (c *Container) ping() map[netaddr.IP]float64 {
	netNs := netns.None()
	for pid := range c.processes {
		if pid == agentPid {
			netNs = selfNetNs
			break
		}
		ns, err := proc.GetNetNs(pid)
		if err != nil {
			if !common.IsNotExist(err) {
				klog.Warningln(err)
			}
			continue
		}
		netNs = ns
		// The loop ends right after this, so the defer runs once at return.
		defer netNs.Close()
		break
	}
	if !netNs.IsOpen() {
		return nil
	}

	ips := map[netaddr.IP]struct{}{}
	for d := range c.connectsSuccessful {
		ips[d.dst.IP()] = struct{}{}
	}
	for dst := range c.connectsFailed {
		ips[dst.src.IP()] = struct{}{}
	}
	if len(ips) == 0 {
		return nil
	}
	targets := make([]netaddr.IP, 0, len(ips))
	for ip := range ips {
		if ip.IsLoopback() {
			continue
		}
		if !ip.Is4() { // pinger doesn't support IPv6 yet
			continue
		}
		targets = append(targets, ip)
	}
	rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
	if err != nil {
		klog.Warningln(err)
		return nil
	}
	return rtt
}

// runLogParser starts a log parser unless log parsing is disabled. With a
// non-empty logPath it tails that file (once per path). With an empty logPath
// it starts the default parser for the cgroup's container type: journald for
// systemd services, or the metadata log file for Docker/containerd/CRI-O,
// replacing a previously running "stdout/stderr" parser.
func (c *Container) runLogParser(logPath string) {
	if *flags.DisableLogParsing {
		return
	}
	containerId := string(c.id)

	if logPath != "" {
		if c.logParsers[logPath] != nil {
			return
		}
		ch := make(chan logparser.LogEntry)
		parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
		reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
		if err != nil {
			klog.Warningln(err)
			parser.Stop()
			return
		}
		klog.Debugln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
		c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
		return
	}

	switch c.cgroup.ContainerType {
	case cgroup.ContainerTypeSystemdService:
		ch := make(chan logparser.LogEntry)
		if err := JournaldSubscribe(c.cgroup, ch); err != nil {
			klog.Warningln(err)
			return
		}
		parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
		stop := func() {
			JournaldUnsubscribe(c.cgroup)
		}
		klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
		c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}

	case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
		if c.metadata.logPath == "" {
			return
		}
		if parser := c.logParsers["stdout/stderr"]; parser != nil {
			parser.Stop()
			delete(c.logParsers, "stdout/stderr")
		}
		ch := make(chan logparser.LogEntry)
		parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
		reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
		if err != nil {
			klog.Warningln(err)
			parser.Stop()
			return
		}
		klog.Infoln("started container logparser", "cg", c.cgroup.Id)
		c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
	}
}

// gc reconciles the tracked network state with the sockets and file
// descriptors currently reported for the container's processes and drops
// entries that are gone or have been closed/idle for longer than gcInterval.
// It does not lock: the caller (Collect) must hold c.lock.
func (c *Container) gc(now time.Time) {
	established := map[AddrPair]struct{}{}
	establishedDst := map[netaddr.IPPort]struct{}{}
	listens := map[netaddr.IPPort]string{}
	seenNamespaces := map[string]bool{}
	fdMap := map[uint64]struct{}{}
	for _, p := range c.processes {
		// Sockets are read once per network namespace.
		if seenNamespaces[p.NetNsId()] {
			continue
		}
		sockets, err := proc.GetSockets(p.Pid)
		if err != nil {
			continue
		}
		fds, err := proc.ReadFds(p.Pid)
		if err == nil {
			for _, fd := range fds {
				fdMap[fd.Fd] = struct{}{}
			}
		}
		for _, s := range sockets {
			if s.Listen {
				listens[s.SAddr] = s.Inode
			} else {
				established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
				establishedDst[s.DAddr] = struct{}{}
			}
		}
		seenNamespaces[p.NetNsId()] = true
	}

	c.revalidateListens(now, listens)

	for srcDst, conn := range c.connectionsActive {
		_, alive := established[srcDst]
		expired := !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval
		if alive && !expired {
			continue
		}
		delete(c.connectionsActive, srcDst)
		// The pid/fd slot may already have been reused by a newer connection.
		pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
		if conn == c.connectionsByPidFd[pidFd] {
			delete(c.connectionsByPidFd, pidFd)
		}
	}
	for srcDst, conn := range c.acceptsActive {
		_, alive := established[srcDst]
		expired := !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval
		if alive && !expired {
			continue
		}
		delete(c.acceptsActive, srcDst)
		pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
		if conn == c.acceptsByPidFd[pidFd] {
			delete(c.acceptsByPidFd, pidFd)
		}
	}

	// Drop pid/fd entries whose descriptor number is no longer open in any
	// of the inspected processes.
	for _, conn := range c.connectionsByPidFd {
		if _, ok := fdMap[conn.Fd]; !ok {
			delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
		}
	}
	for _, conn := range c.acceptsByPidFd {
		if _, ok := fdMap[conn.Fd]; !ok {
			delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
		}
	}

	for dst, at := range c.connectLastAttempt {
		_, active := establishedDst[dst]
		if !active && !at.IsZero() && now.Sub(at) > gcInterval {
			delete(c.connectLastAttempt, dst)
			for dfailed := range c.connectsFailed {
				if dfailed.src == dst {
					delete(c.connectsFailed, dfailed)
				}
			}
			for d := range c.connectsSuccessful {
				if d.src == dst {
					delete(c.connectsSuccessful, d)
				}
			}
			c.l7Stats.delete(dst)
		}
	}
	for dst, at := range c.acceptLastAttempt {
		_, active := establishedDst[dst]
		if !active && !at.IsZero() && now.Sub(at) > gcInterval {
			delete(c.acceptLastAttempt, dst)
			for d := range c.acceptsSuccessful {
				if d.src == dst {
					delete(c.acceptsSuccessful, d)
				}
			}
			c.l7Stats.delete(dst)
		}
	}
}

// revalidateListens reconciles c.listens with actualListens (listen address ->
// socket inode): listens that no longer exist are marked closed, listens that
// exist but are unknown or marked closed are re-registered for the process
// owning the socket inode, and listens closed for longer than gcInterval are
// removed. The caller must hold c.lock.
func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
	for addr, byPid := range c.listens {
		if _, open := actualListens[addr]; open {
			continue
		}
		klog.Warningln("deleting the outdated listen:", addr)
		for _, details := range byPid {
			if details.ClosedAt.IsZero() {
				details.ClosedAt = now
			}
		}
	}

	missingListens := map[netaddr.IPPort]string{}
	for addr, inode := range actualListens {
		byPids, found := c.listens[addr]
		if !found {
			missingListens[addr] = inode
			continue
		}
		open := false
		for _, details := range byPids {
			if details.ClosedAt.IsZero() {
				open = true
				break
			}
		}
		if !open {
			missingListens[addr] = inode
		}
	}

	if len(missingListens) > 0 {
		inodeToPid := map[string]uint32{}
		for pid := range c.processes {
			fds, err := proc.ReadFds(pid)
			if err != nil {
				klog.Warningln(err)
				continue
			}
			for _, fd := range fds {
				if fd.SocketInode != "" {
					inodeToPid[fd.SocketInode] = pid
				}
			}
		}
		for addr, inode := range missingListens {
			pid, found := inodeToPid[inode]
			if !found {
				continue
			}
			// safe=true: the lock is already held by the caller.
			c.onListenOpen(pid, addr, true)
		}
	}

	for addr, pids := range c.listens {
		for pid, details := range pids {
			if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
				delete(pids, pid)
			}
		}
		if len(pids) == 0 {
			delete(c.listens, addr)
		}
	}
}

// AttachUprobes attaches the uprobes matching the cached code type of pid.
// It is a no-op when L7 tracing is disabled or the code type is unknown. When
// attaching fails the uprobes are detached again and the attach error is
// returned. _type is only used for logging.
func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32, _type string) error {
	klog.Infoln("[attach] attachUprobes start by :", _type)
	if tracer.DisableL7Tracing() {
		return nil
	}
	codeType := c.GetCodeTypeFromCache(pid)
	if codeType.IsUnknownCode() {
		return nil
	}
	var err error
	switch codeType {
	case CodeTypeJava:
		err = c.attachJVMUprobes(tracer, pid)
	case CodeTypeJavaAot:
		err = c.attachJavaAotUprobes(tracer, pid)
	case CodeTypeGo:
		err = c.attachTlsUprobes(tracer, pid)
	case CodeTypeNetCoreAot:
		err = c.attachNetCoreUprobes(tracer, pid)
	}
	if err != nil {
		klog.WithField("pid", pid).Errorf("[attach] error %v :", err)
		deErr := c.DetachUprobes(tracer, pid, APP_UPROBE_ERROR)
		if deErr != nil {
			klog.WithField("pid", pid).Errorf("[attach] Detach Uprobes error %v :", deErr)
		}
		return err
	}
	return nil
}

func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
	p := c.processes[pid]
	if p ==
nil { return CodeTypeUnknown } if p.codeType.IsWaitCheck() { p.codeType = GetExeType(pid, c.getRootfs()) } return p.codeType } func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error { p := c.processes[pid] if p == nil { return nil } if p.openSslUprobesChecked || p.goTlsUprobesChecked { return nil } if !p.openSslUprobesChecked { p.openSslUprobesChecked = true sslProbes, err := tracer.AttachOpenSslUprobes(pid) if err != nil { return err } p.uprobes = append(p.uprobes, sslProbes...) } if !p.goTlsUprobesChecked { p.goTlsUprobesChecked = true codeType := c.GetCodeTypeFromCache(pid) goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType)) if err != nil { return err } p.uprobes = append(p.uprobes, goProbes...) c.l7AttachSuccess() } return nil } func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } codeType := c.GetCodeTypeFromCache(pid) if !p.jvmAttachOnce { p.jvmAttachOnce = true rootfs := c.getRootfs() // TODO java Aot if codeType.IsJvmCode() { // check version libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs) if err != nil { klog.WithError(err).Errorf("[attach] Failed get so path") return err } v, err := ebpftracer.GetJvmVersion(libjavaso) if err != nil { klog.WithError(err).Errorf("[attach] Failed get Java version") return err } c.AppInfo.Version = v major, minor, patch, err := ebpftracer.ParseVersion(v) klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch) if major != 1 || minor != 8 { klog.Errorf("[attach] Unsupported Java version.") return fmt.Errorf("[attach] Unsupported Java version") } } /*libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNioProbes...)*/ libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs) 
if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNetProbes...) //p.jvmUprobesChecked = true c.l7AttachSuccess() err = tracer.InitKProcInfo(pid, &c.AppInfo) if err != nil { klog.Errorf("[attach] InitKProcInfo failed pid:[%d] ;%s.", pid, err.Error()) return err } else { klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid) } } else { klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess()) } return nil } func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } codeType := c.GetCodeTypeFromCache(pid) if !p.jvmAttachOnce { p.jvmAttachOnce = true rootfs := c.getRootfs() // // TODO java Aot // if codeType.IsJavaAotCode() { // // check version // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs) // if err != nil { // klog.WithError(err).Errorf("[attach] Failed get so path") // return err // } // v, err := ebpftracer.GetJvmVersion(libjavaso) // if err != nil { // klog.WithError(err).Errorf("[attach] Failed get Java version") // return err // } // c.AppInfo.Version = v // major, minor, patch, err := ebpftracer.ParseVersion(v) // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch) // if major != 1 || minor != 8 { // return fmt.Errorf("[attach] Unsupported Java version") // } // } libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNioProbes...) libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, libNetProbes...) 
//p.jvmUprobesChecked = true c.l7AttachSuccess() tracer.InitKProcInfo(pid, &c.AppInfo) } else { klog.Infof("[attach] %s-%d already attach", codeType.String(), pid) } return nil } func (c *Container) errorClose(pid uint32, closeType int) { p := c.processes[pid] if p != nil { p.DynamicClose(closeType) } } func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error { if common.IsOpenFilter() && !common.IsFilterPid(pid) { return nil } p := c.processes[pid] if p == nil { return nil } if !p.jvmAttachOnce { p.jvmAttachOnce = true //codeType := c.GetCodeTypeFromCache(pid) tracer.InitKProcInfo(pid, &c.AppInfo) p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...) readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, readProbes...) WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid) if err != nil { klog.Error(err) return err } p.uprobes = append(p.uprobes, WriteProbes...) c.l7AttachSuccess() } return nil } func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) { info := proc.GetFdInfo(pid, fd) if info == nil { return } switch { case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0, !strings.HasPrefix(info.Dest, "/"), strings.HasPrefix(info.Dest, "/proc/"), strings.HasPrefix(info.Dest, "/dev/"), strings.HasPrefix(info.Dest, "/sys/"), strings.HasSuffix(info.Dest, "(deleted)"): return } mntId = info.MntId if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") && !strings.HasPrefix(info.Dest, "/var/log/pods/") && !strings.HasPrefix(info.Dest, "/var/log/containers/") && !strings.HasPrefix(info.Dest, "/var/log/journal/") { logPath = info.Dest } return } func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric { return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...) 
} func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric { return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...) }