Browse Source

Merge remote-tracking branch 'origin/main' into dev-newcoroot

rock 1 year ago
parent
commit
cfcf7c689b

+ 5 - 2
cgroup/cgroup.go

@@ -105,7 +105,10 @@ func NewFromProcessCgroupFile(filePath string) (*Cgroup, error) {
 			cg.subsystems[cgType] = path.Join(baseCgroupPath, parts[2])
 		}
 	}
-	if p := cg.subsystems["cpu"]; p != "" {
+	if p := cg.subsystems["name=systemd"]; p != "" {
+		cg.Id = p
+		cg.Version = V1
+	} else if p = cg.subsystems["cpu"]; p != "" {
 		cg.Id = p
 		cg.Version = V1
 	} else {
@@ -164,7 +167,7 @@ func containerByCgroup(path string) (ContainerType, string, error) {
 		if matches == nil {
 			return ContainerTypeUnknown, "", fmt.Errorf("invalid systemd cgroup %s", path)
 		}
-		return ContainerTypeSystemdService, matches[1], nil
+		return ContainerTypeSystemdService, strings.Replace(matches[1], "\\x2d", "-", -1), nil
 	}
 	return ContainerTypeUnknown, "", fmt.Errorf("unknown container: %s", path)
 }

+ 8 - 1
cgroup/cgroup_test.go

@@ -55,6 +55,13 @@ func TestNewFromProcessCgroupFile(t *testing.T) {
 	assert.Equal(t, "73051af271105c07e1f493b34856a77e665e3b0b4fc72f76c807dfbffeb881bd", cg.ContainerId)
 	assert.Equal(t, ContainerTypeDocker, cg.ContainerType)
 
+	cg, err = NewFromProcessCgroupFile(path.Join("fixtures/proc/600/cgroup"))
+	assert.Nil(t, err)
+	assert.Equal(t, V1, cg.Version)
+	assert.Equal(t, "/system.slice/springboot.service", cg.Id)
+	assert.Equal(t, "/system.slice/springboot.service", cg.ContainerId)
+	assert.Equal(t, ContainerTypeSystemdService, cg.ContainerType)
+
 	baseCgroupPath = "/kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-podc83d0428_58af_41eb_8dba_b9e6eddffe7b.slice/docker-0e612005fd07e7f47e2cd07df99a2b4e909446814d71d0b5e4efc7159dd51252.scope"
 	defer func() {
 		baseCgroupPath = ""
@@ -112,7 +119,7 @@ func TestContainerByCgroup(t *testing.T) {
 
 	typ, id, err = containerByCgroup("/system.slice/system-serial\\x2dgetty.slice")
 	as.Equal(typ, ContainerTypeSystemdService)
-	as.Equal("/system.slice/system-serial\\x2dgetty.slice", id)
+	as.Equal("/system.slice/system-serial-getty.slice", id)
 	as.Nil(err)
 
 	typ, id, err = containerByCgroup("/runtime.slice/kubelet.service")

+ 21 - 23
cgroup/cpu.go

@@ -2,7 +2,7 @@ package cgroup
 
 import (
 	"fmt"
-	"io/ioutil"
+	"os"
 	"path"
 	"strconv"
 	"strings"
@@ -57,28 +57,26 @@ func (cg Cgroup) cpuStatV2() (*CPUStat, error) {
 		UsageSeconds:         float64(vars["usage_usec"]) / 1e6,
 		ThrottledTimeSeconds: float64(vars["throttled_usec"]) / 1e6,
 	}
-	payload, err := ioutil.ReadFile(path.Join(cgRoot, cg.subsystems[""], "cpu.max"))
-	if err != nil {
-		return nil, err
-	}
-	data := strings.TrimSpace(string(payload))
-	parts := strings.Fields(data)
-	if len(parts) != 2 {
-		return nil, fmt.Errorf("invalid cpu.max payload: %s", data)
-	}
-	if parts[0] == "max" { //no limit
-		return res, nil
-	}
-	quotaUs, err := strconv.ParseUint(parts[0], 10, 64)
-	if err != nil {
-		return nil, fmt.Errorf("invalid quota value in cpu.max: %s", parts[0])
-	}
-	periodUs, err := strconv.ParseUint(parts[1], 10, 64)
-	if err != nil {
-		return nil, fmt.Errorf("invalid period value in cpu.max: %s", parts[1])
-	}
-	if periodUs > 0 {
-		res.LimitCores = float64(quotaUs) / float64(periodUs)
+	if payload, err := os.ReadFile(path.Join(cgRoot, cg.subsystems[""], "cpu.max")); err == nil {
+		data := strings.TrimSpace(string(payload))
+		parts := strings.Fields(data)
+		if len(parts) != 2 {
+			return nil, fmt.Errorf("invalid cpu.max payload: %s", data)
+		}
+		if parts[0] == "max" { //no limit
+			return res, nil
+		}
+		quotaUs, err := strconv.ParseUint(parts[0], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("invalid quota value in cpu.max: %s", parts[0])
+		}
+		periodUs, err := strconv.ParseUint(parts[1], 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("invalid period value in cpu.max: %s", parts[1])
+		}
+		if periodUs > 0 {
+			res.LimitCores = float64(quotaUs) / float64(periodUs)
+		}
 	}
 	return res, nil
 }

+ 11 - 0
cgroup/fixtures/proc/600/cgroup

@@ -0,0 +1,11 @@
+11:freezer:/
+10:blkio:/init.scope
+9:hugetlb:/
+8:memory:/init.scope
+7:pids:/init.scope
+6:devices:/init.scope
+5:rdma:/
+4:cpuset:/
+3:net_cls,net_prio:/
+2:cpu,cpuacct:/
+1:name=systemd:/system.slice/springboot.service

+ 1 - 5
cgroup/memory.go

@@ -47,17 +47,13 @@ func (cg *Cgroup) memoryStatV1() (*MemoryStat, error) {
 }
 
 func (cg *Cgroup) memoryStatV2() (*MemoryStat, error) {
-	current, err := readUintFromFile(path.Join(cgRoot, cg.subsystems[""], "memory.current"))
-	if err != nil {
-		return nil, err
-	}
 	vars, err := readVariablesFromFile(path.Join(cgRoot, cg.subsystems[""], "memory.stat"))
 	if err != nil {
 		return nil, err
 	}
 	limit, _ := readUintFromFile(path.Join(cgRoot, cg.subsystems[""], "memory.max"))
 	return &MemoryStat{
-		RSS:   current - vars["file"],
+		RSS:   vars["anon"] + vars["file_mapped"],
 		Cache: vars["file"],
 		Limit: limit,
 	}, nil

+ 2 - 2
cgroup/memory_test.go

@@ -25,14 +25,14 @@ func TestCgroup_MemoryStat(t *testing.T) {
 	cg, _ = NewFromProcessCgroupFile(path.Join("fixtures/proc/400/cgroup"))
 	stat, err = cg.MemoryStat()
 	assert.Nil(t, err)
-	assert.Equal(t, uint64(48648192-1044480), stat.RSS)
+	assert.Equal(t, uint64(44892160+0), stat.RSS)
 	assert.Equal(t, uint64(1044480), stat.Cache)
 	assert.Equal(t, uint64(0), stat.Limit)
 
 	cg, _ = NewFromProcessCgroupFile(path.Join("fixtures/proc/500/cgroup"))
 	stat, err = cg.MemoryStat()
 	assert.Nil(t, err)
-	assert.Equal(t, uint64(131047424-50835456), stat.RSS)
+	assert.Equal(t, uint64(75247616+4038656), stat.RSS)
 	assert.Equal(t, uint64(50835456), stat.Cache)
 	assert.Equal(t, uint64(4294967296), stat.Limit)
 

+ 4 - 4
cgroup/utils.go

@@ -1,7 +1,7 @@
 package cgroup
 
 import (
-	"io/ioutil"
+	"os"
 	"strconv"
 	"strings"
 
@@ -9,7 +9,7 @@ import (
 )
 
 func readVariablesFromFile(filePath string) (map[string]uint64, error) {
-	data, err := ioutil.ReadFile(filePath)
+	data, err := os.ReadFile(filePath)
 	if err != nil {
 		return nil, err
 	}
@@ -29,7 +29,7 @@ func readVariablesFromFile(filePath string) (map[string]uint64, error) {
 }
 
 func readIntFromFile(filePath string) (int64, error) {
-	data, err := ioutil.ReadFile(filePath)
+	data, err := os.ReadFile(filePath)
 	if err != nil {
 		return 0, err
 	}
@@ -37,7 +37,7 @@ func readIntFromFile(filePath string) (int64, error) {
 }
 
 func readUintFromFile(filePath string) (uint64, error) {
-	data, err := ioutil.ReadFile(filePath)
+	data, err := os.ReadFile(filePath)
 	if err != nil {
 		return 0, err
 	}

+ 9 - 7
common/net.go

@@ -17,14 +17,16 @@ var (
 )
 
 func init() {
-	if flags.ExternalNetworksWhitelist != nil {
-		for _, prefix := range *flags.ExternalNetworksWhitelist {
-			p, err := netaddr.ParseIPPrefix(prefix)
-			if err != nil {
-				klog.Fatalf("invalid network %s: %s", prefix, err)
-			}
-			ConnectionFilter.WhitelistPrefix(p)
+	klog.Infoln("whitelisted public IPs:", *flags.ExternalNetworksWhitelist)
+	for _, prefix := range *flags.ExternalNetworksWhitelist {
+		if prefix == "" {
+			continue
 		}
+		p, err := netaddr.ParseIPPrefix(prefix)
+		if err != nil {
+			klog.Fatalf("invalid network %s: %s", prefix, err)
+		}
+		ConnectionFilter.WhitelistPrefix(p)
 	}
 	if r := flags.EphemeralPortRange; r != nil && *r != "" {
 		klog.Infoln("ephemeral-port-range:", *r)

+ 2 - 1
common/otel.go

@@ -9,13 +9,14 @@ var (
 	deploymentPodRegex  = regexp.MustCompile(`(/k8s/[a-z0-9-]+/[a-z0-9-]+)-[0-9a-f]{1,10}-[bcdfghjklmnpqrstvwxz2456789]{5}/.+`)
 	daemonsetPodRegex   = regexp.MustCompile(`(/k8s/[a-z0-9-]+/[a-z0-9-]+)-[bcdfghjklmnpqrstvwxz2456789]{5}/.+`)
 	statefulsetPodRegex = regexp.MustCompile(`(/k8s/[a-z0-9-]+/[a-z0-9-]+)-\d+/.+`)
+	cronjobPodRegex     = regexp.MustCompile(`(/k8s-cronjob/[a-z0-9-]+/[a-z0-9-]+)/.+`)
 )
 
 func ContainerIdToOtelServiceName(containerId string) string {
 	if !strings.HasPrefix(containerId, "/k8s/") {
 		return containerId
 	}
-	for _, r := range []*regexp.Regexp{deploymentPodRegex, daemonsetPodRegex, statefulsetPodRegex} {
+	for _, r := range []*regexp.Regexp{deploymentPodRegex, daemonsetPodRegex, statefulsetPodRegex, cronjobPodRegex} {
 		if g := r.FindStringSubmatch(containerId); len(g) == 2 {
 			return g[1]
 		}

+ 5 - 0
containers/app.go

@@ -13,6 +13,9 @@ var (
 
 func guessApplicationType(cmdline []byte) string {
 	parts := bytes.Split(cmdline, []byte{0})
+	if len(parts) == 0 || len(parts[0]) == 0 {
+		return ""
+	}
 	cmd := bytes.TrimSuffix(bytes.Fields(parts[0])[0], []byte{':'})
 	switch {
 	case bytes.HasSuffix(cmd, []byte("memcached")):
@@ -29,6 +32,8 @@ func guessApplicationType(cmdline []byte) string {
 		return "mongos"
 	case bytes.HasSuffix(cmd, []byte("mysqld")):
 		return "mysql"
+	case bytes.HasSuffix(cmd, []byte("mariadbd")):
+		return "mysql"
 	case bytes.Contains(cmdline, []byte("org.apache.zookeeper.server.quorum.QuorumPeerMain")):
 		return "zookeeper"
 	case bytes.HasSuffix(cmd, []byte("redis-server")):

+ 144 - 63
containers/container.go

@@ -4,6 +4,7 @@ import (
 	debugelf "debug/elf"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	"os"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -23,6 +24,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	klog "github.com/sirupsen/logrus"
 	"github.com/vishvananda/netns"
+	"golang.org/x/exp/maps"
 	"inet.af/netaddr"
 )
 
@@ -38,14 +40,16 @@ type ContainerNetwork struct {
 }
 
 type ContainerMetadata struct {
-	name        string
-	labels      map[string]string
-	volumes     map[string]string
-	logPath     string
-	image       string
-	logDecoder  logparser.Decoder
-	hostListens map[string][]netaddr.IPPort
-	networks    map[string]ContainerNetwork
+	name               string
+	labels             map[string]string
+	volumes            map[string]string
+	logPath            string
+	image              string
+	logDecoder         logparser.Decoder
+	hostListens        map[string][]netaddr.IPPort
+	networks           map[string]ContainerNetwork
+	env                map[string]string
+	systemdTriggeredBy string
 }
 
 type Delays struct {
@@ -78,6 +82,9 @@ type ActiveConnection struct {
 	Timestamp  uint64
 	Closed     time.Time
 
+	BytesSent     uint64
+	BytesReceived uint64
+
 	http2Parser    *l7.Http2Parser
 	postgresParser *l7.PostgresParser
 	mysqlParser    *l7.MysqlParser
@@ -101,6 +108,12 @@ type K8sContainer struct {
 	workload      string
 	containerName string
 	pid           string
+}
+
+// ConnectionStats accumulates counters for outbound TCP connections to a
+// single destination. Container stores one instance per AddrPair in its
+// connectsSuccessful map.
+type ConnectionStats struct {
+	Count           uint64        // successful connects, incremented in onConnectionOpen
+	TotalTime       time.Duration // sum of the durations passed to onConnectionOpen
+	Retransmissions uint64        // incremented in onRetransmission
+	BytesSent       uint64        // sent-byte deltas added by updateConnectionTrafficStats
+	BytesReceived   uint64        // received-byte deltas added by updateConnectionTrafficStats
 }
 
 type Container struct {
@@ -121,19 +134,18 @@ type Container struct {
 	delaysLock  sync.Mutex
 
 	listens map[netaddr.IPPort]map[uint32]*ListenDetails
-	ipsByNs map[string][]netaddr.IP
 
-	connectsSuccessful map[AddrPair]int64           // dst:actual_dst -> count
-	connectsFailed     map[netaddr.IPPort]int64     // dst -> count
-	connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
+	connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
+	connectsFailed     map[netaddr.IPPort]int64      // dst -> count
+	connectLastAttempt map[netaddr.IPPort]time.Time  // dst -> time
 	connectionsActive  map[AddrPair]*ActiveConnection
 	connectionsByPidFd map[PidFd]*ActiveConnection
-	retransmits        map[AddrPair]int64 // dst:actual_dst -> count
 
 	l7Stats  L7Stats
 	dnsStats *L7Metrics
 
-	oomKills int
+	oomKills                 int
+	pythonThreadLockWaitTime time.Duration
 
 	mounts map[string]proc.MountInfo
 
@@ -143,6 +155,8 @@ type Container struct {
 	nsConntrack   *Conntrack
 	lbConntracks  []*Conntrack
 
+	registry *Registry
+
 	lock sync.RWMutex
 
 	done chan struct{}
@@ -160,7 +174,7 @@ type Container struct {
 	AppInfo AppInfo
 }
 
-func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
+func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
 	netNs, err := proc.GetNetNs(pid)
 	if err != nil {
 		return nil, err
@@ -176,14 +190,12 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		delaysByPid: map[uint32]Delays{},
 
 		listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
-		ipsByNs: map[string][]netaddr.IP{},
 
-		connectsSuccessful: map[AddrPair]int64{},
+		connectsSuccessful: map[AddrPair]*ConnectionStats{},
 		connectsFailed:     map[netaddr.IPPort]int64{},
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
 		connectionsActive:  map[AddrPair]*ActiveConnection{},
 		connectionsByPidFd: map[PidFd]*ActiveConnection{},
-		retransmits:        map[AddrPair]int64{},
 		l7Stats:            L7Stats{},
 		dnsStats:           &L7Metrics{},
 
@@ -195,6 +207,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 
 		done:     make(chan struct{}),
 		traceMap: make(map[uint64]*tracing.Trace),
+		registry: registry,
 	}
 
 	for _, n := range md.networks {
@@ -249,11 +262,13 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
 }
 
 func (c *Container) Collect(ch chan<- prometheus.Metric) {
+	c.registry.updateTrafficStatsIfNecessary()
+
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 
-	if c.metadata.image != "" {
-		ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image)
+	if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
+		ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
 	}
 
 	ch <- counter(metrics.Restarts, float64(c.restarts))
@@ -315,14 +330,17 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		}
 	}
 
-	for d, count := range c.connectsSuccessful {
-		ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
+	for d, stats := range c.connectsSuccessful {
+		ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
+		if stats.Retransmissions > 0 {
+			ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
+		}
+		ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
 	}
 	for dst, count := range c.connectsFailed {
-		ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
-	}
-	for d, count := range c.retransmits {
-		ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
 	}
 
 	connections := map[AddrPair]int{}
@@ -344,7 +362,14 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 
 	appTypes := map[string]struct{}{}
 	seenJvms := map[string]bool{}
-	for pid, process := range c.processes {
+	seenDotNetApps := map[string]bool{}
+	pids := maps.Keys(c.processes)
+	sort.Slice(pids, func(i, j int) bool {
+		return pids[i] < pids[j]
+	})
+
+	for _, pid := range pids {
+		process := c.processes[pid]
 		cmdline := proc.GetCmdline(pid)
 		if len(cmdline) == 0 {
 			continue
@@ -353,6 +378,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		if appType != "" {
 			appTypes[appType] = struct{}{}
 		}
+		if process.isGolangApp {
+			appTypes["golang"] = struct{}{}
+		}
 		switch {
 		case isJvm(cmdline):
 			jvm, jMetrics := jvmMetrics(pid)
@@ -364,12 +392,20 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 			}
 		case process.dotNetMonitor != nil:
 			appTypes["dotnet"] = struct{}{}
-			process.dotNetMonitor.Collect(ch)
+			appName := process.dotNetMonitor.AppName()
+			if !seenDotNetApps[appName] {
+				seenDotNetApps[appName] = true
+				process.dotNetMonitor.Collect(ch)
+			}
 		}
 	}
 	for appType := range appTypes {
 		ch <- gauge(metrics.ApplicationType, 1, appType)
 	}
+	if c.pythonThreadLockWaitTime > 0 {
+		ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
+	}
+
 	if c.dnsStats.Requests != nil {
 		c.dnsStats.Requests.Collect(ch)
 	}
@@ -393,7 +429,8 @@ func (c *Container) onProcessStart(pid uint32) *Process {
 		return nil
 	}
 	c.zombieAt = time.Time{}
-	p := NewProcess(pid, stats)
+	p := NewProcess(pid, stats, c.registry.tracer)
+
 	if p == nil {
 		return nil
 	}
@@ -463,6 +500,7 @@ func (c *Container) onFileOpen(pid uint32, fd uint64) {
 
 // set
 func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
+	klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
 	if common.PortFilter.ShouldBeSkipped(addr.Port()) {
 		return
 	}
@@ -485,20 +523,18 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
 			return
 		}
 		defer ns.Close()
-		nsId := ns.UniqueId()
-		ips, ok := c.ipsByNs[nsId]
-		if !ok {
-			if ips, err = proc.GetNsIps(ns); err != nil {
-				klog.Warningln(err)
-			} else {
-				c.ipsByNs[nsId] = ips
-			}
+		ips, err := proc.GetNsIps(ns)
+		if err != nil {
+			klog.Warningln(err)
+			return
 		}
+		klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
 		details.NsIPs = ips
 	}
 }
 
 func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
+	klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	if _, byAddr := c.listens[addr]; byAddr {
@@ -510,7 +546,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	}
 }
 
-func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
+func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
 	if common.PortFilter.ShouldBeSkipped(dst.Port()) {
 		return
 	}
@@ -542,7 +578,14 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	if failed {
 		c.connectsFailed[dst]++
 	} else {
-		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
+		key := AddrPair{src: dst, dst: *actualDst}
+		stats := c.connectsSuccessful[key]
+		if stats == nil {
+			stats = &ConnectionStats{}
+			c.connectsSuccessful[key] = stats
+		}
+		stats.Count++
+		stats.TotalTime += duration
 		connection := &ActiveConnection{
 			Dest:       dst,
 			ActualDest: *actualDst,
@@ -586,15 +629,49 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
 	return nil, nil
 }
 
-func (c *Container) onConnectionClose(srcDst AddrPair) bool {
+// onConnectionClose marks the connection identified by the event's pid/fd as
+// closed. If the event carries traffic stats, the final byte counters are
+// folded into the per-destination stats first. Unknown connections and
+// connections that are already marked closed are left untouched.
+//
+// The container lock is held for the whole operation so that the lookup, the
+// stats update and the write to conn.Closed cannot interleave with other
+// lock holders.
+func (c *Container) onConnectionClose(e ebpftracer.Event) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
+	if conn == nil || !conn.Closed.IsZero() {
+		return
+	}
+	if e.TrafficStats != nil {
+		// updateConnectionTrafficStats does no locking of its own; the caller holds c.lock.
+		c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
+	}
+	conn.Closed = time.Now()
+}
+
+func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
+	if u == nil {
+		return
+	}
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	conn := c.connectionsActive[srcDst]
-	if conn == nil {
-		return false
+	c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived)
+}
+
+func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received uint64) {
+	if ac == nil {
+		return
 	}
-	conn.Closed = time.Now()
-	return true
+	key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
+	stats := c.connectsSuccessful[key]
+	if stats == nil {
+		stats = &ConnectionStats{}
+		c.connectsSuccessful[key] = stats
+	}
+	if sent > ac.BytesSent {
+		stats.BytesSent += sent - ac.BytesSent
+	}
+	if received > ac.BytesReceived {
+		stats.BytesReceived += received - ac.BytesReceived
+	}
+	ac.BytesSent = sent
+	ac.BytesReceived = received
 }
 
 func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
@@ -610,10 +687,10 @@ func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, stri
 		dnsReq := L7Requests[l7.ProtocolDNS]
 		c.dnsStats.Requests = prometheus.NewCounterVec(
 			prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
-			[]string{"request_type", "status"},
+			[]string{"request_type", "domain", "status"},
 		)
 	}
-	if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, status); m != nil {
+	if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
 		m.Inc()
 	}
 	if r.Duration != 0 {
@@ -703,14 +780,20 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
 	return nil
 }
 
-func (c *Container) onRetransmit(srcDst AddrPair) bool {
+func (c *Container) onRetransmission(srcDst AddrPair) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	conn, ok := c.connectionsActive[srcDst]
 	if !ok {
 		return false
 	}
-	c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
+	key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
+	stats := c.connectsSuccessful[key]
+	if stats == nil {
+		stats = &ConnectionStats{}
+		c.connectsSuccessful[key] = stats
+	}
+	stats.Retransmissions++
 	return true
 }
 
@@ -872,6 +955,12 @@ func (c *Container) ping() map[netaddr.IP]float64 {
 	}
 	targets := make([]netaddr.IP, 0, len(ips))
 	for ip := range ips {
+		if ip.IsLoopback() {
+			continue
+		}
+		if !ip.Is4() { // pinger doesn't support IPv6 yet
+			continue
+		}
 		targets = append(targets, ip)
 	}
 	rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
@@ -951,7 +1040,7 @@ func (c *Container) gc(now time.Time) {
 	seenNamespaces := map[string]bool{}
 	fdMap := map[uint64]struct{}{}
 	for _, p := range c.processes {
-		if seenNamespaces[p.NetNsId] {
+		if seenNamespaces[p.NetNsId()] {
 			continue
 		}
 		sockets, err := proc.GetSockets(p.Pid)
@@ -973,8 +1062,7 @@ func (c *Container) gc(now time.Time) {
 				establishedDst[s.DAddr] = struct{}{}
 			}
 		}
-
-		seenNamespaces[p.NetNsId] = true
+		seenNamespaces[p.NetNsId()] = true
 	}
 
 	for ns := range c.ipsByNs {
@@ -1018,11 +1106,6 @@ func (c *Container) gc(now time.Time) {
 					delete(c.connectsSuccessful, d)
 				}
 			}
-			for d := range c.retransmits {
-				if d.src == dst {
-					delete(c.retransmits, d)
-				}
-			}
 			c.l7Stats.delete(dst)
 		}
 	}
@@ -1065,6 +1148,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 		for pid := range c.processes {
 			fds, err := proc.ReadFds(pid)
 			if err != nil {
+				klog.Warningln(err)
 				continue
 			}
 			for _, fd := range fds {
@@ -1148,12 +1232,9 @@ func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
 		p.openSslUprobesChecked = true
 	}
 	if !p.goTlsUprobesChecked {
-		codeType := c.GetCodeTypeFromCache(pid)
-		goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
-		if err != nil {
-			return err
-		}
-		p.uprobes = append(p.uprobes, goProbes...)
+		uprobes, isGolangApp := tracer.AttachGoTlsUprobes(pid)
+		p.isGolangApp = isGolangApp
+		p.uprobes = append(p.uprobes, uprobes...)
 		p.goTlsUprobesChecked = true
 	}
 	return nil

+ 19 - 3
containers/crio.go

@@ -8,6 +8,7 @@ import (
 	"net"
 	"net/http"
 	"os"
+	"strings"
 	"time"
 
 	"github.com/coroot/coroot-node-agent/common"
@@ -20,7 +21,6 @@ const crioTimeout = 30 * time.Second
 
 var (
 	crioClient *http.Client
-	crioSocket = proc.HostPath("/var/run/crio/crio.sock")
 )
 
 type CrioContainerInfo struct {
@@ -37,8 +37,23 @@ type CrioVolume struct {
 }
 
 func CrioInit() error {
-	if _, err := os.Stat(crioSocket); err != nil {
-		return err
+	sockets := []string{
+		"/var/run/crio/crio.sock",
+		"/run/crio/crio.sock",
+	}
+	var crioSocket string
+	var err error
+	for _, socket := range sockets {
+		socketHostPath := proc.HostPath(socket)
+		if _, err := os.Stat(socketHostPath); err == nil {
+			crioSocket = socketHostPath
+			break
+		}
+	}
+	if err != nil {
+		return fmt.Errorf("couldn't connect to CRI-O through the following UNIX sockets: [%s]: %s",
+			strings.Join(sockets, ","), err,
+		)
 	}
 	klog.Infoln("cri-o socket:", crioSocket)
 
@@ -50,6 +65,7 @@ func CrioInit() error {
 			DisableCompression: true,
 		},
 	}
+
 	return nil
 }
 

+ 12 - 0
containers/dockerd.go

@@ -56,6 +56,7 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
 		volumes:     map[string]string{},
 		hostListens: map[string][]netaddr.IPPort{},
 		networks:    map[string]ContainerNetwork{},
+		env:         map[string]string{},
 	}
 	for _, m := range c.Mounts {
 		res.volumes[m.Destination] = common.ParseKubernetesVolumeSource(m.Source)
@@ -92,6 +93,17 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
 			}
 		}
 	}
+	if c.Config != nil {
+		for _, value := range c.Config.Env {
+			idx := strings.Index(value, "=")
+			if idx < 0 {
+				continue
+			}
+			k := value[:idx]
+			v := value[idx+1:]
+			res.env[k] = v
+		}
+	}
 	return res, nil
 }
 

+ 7 - 2
containers/dotnet.go

@@ -53,6 +53,7 @@ func (m *dotNetMetric) units() string {
 
 type DotNetMonitor struct {
 	pid            uint32
+	appName        string
 	cancel         context.CancelFunc
 	lastUpdate     time.Time
 	runtimeVersion string
@@ -74,8 +75,8 @@ func NewDotNetMonitor(ctx context.Context, pid uint32, appName string) *DotNetMo
 	constLabels := prometheus.Labels{"application": appName}
 
 	m := &DotNetMonitor{
-		pid: pid,
-
+		pid:                           pid,
+		appName:                       appName,
 		info:                          newGaugeVec("container_dotnet_info", "Meta information about the Common Language Runtime (CLR)", constLabels, "runtime_version"),
 		memoryAllocatedBytes:          newCounter("container_dotnet_memory_allocated_bytes_total", "The number of bytes allocated", constLabels),
 		exceptionCount:                newGauge("container_dotnet_exceptions_total", "The number of exceptions that have occurred", constLabels),
@@ -91,6 +92,10 @@ func NewDotNetMonitor(ctx context.Context, pid uint32, appName string) *DotNetMo
 	return m
 }
 
+// AppName returns the application name stored in the monitor's appName field.
+func (m *DotNetMonitor) AppName() string {
+	return m.appName
+}
+
 func (m *DotNetMonitor) Collect(ch chan<- prometheus.Metric) {
 	if m.lastUpdate.Before(time.Now().Add(-2 * dotNetEventInterval)) {
 		return

+ 27 - 15
containers/metrics.go

@@ -28,12 +28,15 @@ var metrics = struct {
 	DiskWriteOps   *prometheus.Desc
 	DiskWriteBytes *prometheus.Desc
 
-	NetListenInfo         *prometheus.Desc
-	NetConnectsSuccessful *prometheus.Desc
-	NetConnectsFailed     *prometheus.Desc
-	NetConnectionsActive  *prometheus.Desc
-	NetRetransmits        *prometheus.Desc
-	NetLatency            *prometheus.Desc
+	NetListenInfo            *prometheus.Desc
+	NetConnectionsSuccessful *prometheus.Desc
+	NetConnectionsTotalTime  *prometheus.Desc
+	NetConnectionsFailed     *prometheus.Desc
+	NetConnectionsActive     *prometheus.Desc
+	NetRetransmits           *prometheus.Desc
+	NetLatency               *prometheus.Desc
+	NetBytesSent             *prometheus.Desc
+	NetBytesReceived         *prometheus.Desc
 
 	LogMessages *prometheus.Desc
 
@@ -45,9 +48,12 @@ var metrics = struct {
 	JvmGCTime            *prometheus.Desc
 	JvmSafepointTime     *prometheus.Desc
 	JvmSafepointSyncTime *prometheus.Desc
-	Ip2Fqdn              *prometheus.Desc
+
+	PythonThreadLockWaitTime *prometheus.Desc
+
+	Ip2Fqdn *prometheus.Desc
 }{
-	ContainerInfo: metric("container_info", "Meta information about the container", "image"),
+	ContainerInfo: metric("container_info", "Meta information about the container", "image", "systemd_triggered_by"),
 
 	Restarts: metric("container_restarts_total", "Number of times the container was restarted"),
 
@@ -70,12 +76,15 @@ var metrics = struct {
 	DiskWriteOps:   metric("container_resources_disk_writes_total", "Total number of writes completed successfully by the container", "mount_point", "device", "volume"),
 	DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),
 
-	NetListenInfo:         metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr"),
-	NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
-	NetConnectsFailed:     metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
-	NetConnectionsActive:  metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
-	NetRetransmits:        metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
-	NetLatency:            metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
+	NetListenInfo:            metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
+	NetConnectionsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
+	NetConnectionsTotalTime:  metric("container_net_tcp_connection_time_seconds_total", "Time spent on TCP connections", "destination", "actual_destination"),
+	NetConnectionsFailed:     metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
+	NetConnectionsActive:     metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
+	NetRetransmits:           metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
+	NetLatency:               metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
+	NetBytesSent:             metric("container_net_tcp_bytes_sent_total", "Total number of bytes sent to the peer", "destination", "actual_destination"),
+	NetBytesReceived:         metric("container_net_tcp_bytes_received_total", "Total number of bytes received from the peer", "destination", "actual_destination"),
 
 	LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
 
@@ -87,7 +96,10 @@ var metrics = struct {
 	JvmGCTime:            metric("container_jvm_gc_time_seconds", "Time spent in the given JVM garbage collector in seconds", "jvm", "gc"),
 	JvmSafepointTime:     metric("container_jvm_safepoint_time_seconds", "Time the application has been stopped for safepoint operations in seconds", "jvm"),
 	JvmSafepointSyncTime: metric("container_jvm_safepoint_sync_time_seconds", "Time spent getting to safepoints in seconds", "jvm"),
-	Ip2Fqdn:              metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),
+
+	Ip2Fqdn: metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),
+
+	PythonThreadLockWaitTime: metric("container_python_thread_lock_wait_time_seconds", "Time spent waiting acquiring GIL in seconds"),
 }
 
 var (

+ 47 - 14
containers/process.go

@@ -1,27 +1,30 @@
 package containers
 
 import (
+	"bytes"
 	"context"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	"os"
 	"time"
 
-	"github.com/jpillora/backoff"
-
 	"github.com/cilium/ebpf/link"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/proc"
+	"github.com/jpillora/backoff"
 	"github.com/mdlayher/taskstats"
 )
 
 type Process struct {
 	Pid       uint32
 	StartedAt time.Time
-	NetNsId   string
+
+	netNsId string
 
 	ctx        context.Context
 	cancelFunc context.CancelFunc
 
 	dotNetMonitor *DotNetMonitor
+	isGolangApp   bool
 
 	uprobes               []link.Link
 	goTlsUprobesChecked   bool
@@ -31,25 +34,33 @@ type Process struct {
 
 	codeType CodeType
 	cmdline  string
+	pythonGilChecked      bool
 }
 
-func NewProcess(pid uint32, stats *taskstats.Stats) *Process {
-	ns, err := proc.GetNetNs(pid)
-	if err != nil {
-		return nil
-	}
-	defer ns.Close()
-	p := &Process{Pid: pid, StartedAt: stats.BeginTime, NetNsId: ns.UniqueId()}
+func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {
+	p := &Process{Pid: pid, StartedAt: stats.BeginTime}
 	p.ctx, p.cancelFunc = context.WithCancel(context.Background())
-	go p.instrument()
+	go p.instrument(tracer)
 	return p
 }
 
+// NetNsId returns the unique id of the process network namespace. The id is
+// resolved on first use and cached in p.netNsId; when the namespace cannot be
+// opened an empty string is returned and nothing is cached, so a later call
+// tries again.
+func (p *Process) NetNsId() string {
+	if p.netNsId != "" {
+		return p.netNsId
+	}
+	handle, err := proc.GetNetNs(p.Pid)
+	if err != nil {
+		return ""
+	}
+	p.netNsId = handle.UniqueId()
+	_ = handle.Close()
+	return p.netNsId
+}
+
 func (p *Process) isHostNs() bool {
-	return p.NetNsId == hostNetNsId
+	return p.NetNsId() == hostNetNsId
 }
 
-func (p *Process) instrument() {
+func (p *Process) instrument(tracer *ebpftracer.Tracer) {
 	b := backoff.Backoff{Factor: 2, Min: time.Second, Max: time.Minute}
 	for {
 		select {
@@ -61,18 +72,40 @@ func (p *Process) instrument() {
 				return
 			}
 			if dest != "/" {
+				p.instrumentPython(tracer)
 				if dotNetAppName, err := dotNetApp(p.Pid); err == nil {
 					if dotNetAppName != "" {
 						p.dotNetMonitor = NewDotNetMonitor(p.ctx, p.Pid, dotNetAppName)
 					}
-					return
 				}
+				return
 			}
 			time.Sleep(b.Duration())
 		}
 	}
 }
 
+// instrumentPython attaches the Python thread-lock uprobes to the process when
+// the first word of its command line matches pythonCmd. The check is performed
+// at most once per Process (guarded by pythonGilChecked); attached links are
+// appended to p.uprobes.
+func (p *Process) instrumentPython(tracer *ebpftracer.Tracer) {
+	if p.pythonGilChecked {
+		return
+	}
+	p.pythonGilChecked = true
+	cmdline := proc.GetCmdline(p.Pid)
+	if len(cmdline) == 0 {
+		return
+	}
+	// Only the part before the first NUL byte is inspected.
+	fields := bytes.Fields(bytes.Split(cmdline, []byte{0})[0])
+	if len(fields) == 0 {
+		// Empty or whitespace-only first argument: indexing fields[0] would
+		// panic, so there is nothing to match.
+		return
+	}
+	cmd := bytes.TrimSuffix(fields[0], []byte{':'})
+	if !pythonCmd.Match(cmd) {
+		return
+	}
+	p.uprobes = append(p.uprobes, tracer.AttachPythonThreadLockProbes(p.Pid)...)
+}
+
 func (p *Process) Close() {
 	p.cancelFunc()
 	for _, u := range p.uprobes {

+ 94 - 17
containers/registry.go

@@ -11,6 +11,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -27,11 +28,15 @@ import (
 	"inet.af/netaddr"
 )
 
+const MinTrafficStatsUpdateInterval = 5 * time.Second
+
 var (
-	selfNetNs         = netns.None()
-	hostNetNsId       = netns.None().UniqueId()
-	agentPid          = uint32(os.Getpid())
-	containerIdRegexp = regexp.MustCompile(`[a-z0-9]{64}`)
+	selfNetNs                = netns.None()
+	hostNetNsId              = netns.None().UniqueId()
+	agentPid                 = uint32(os.Getpid())
+	containerIdRegexp        = regexp.MustCompile(`[a-z0-9]{64}`)
+	cronjobPodName           = regexp.MustCompile(`([a-z0-9-]+)-([0-9]{8})-[bcdfghjklmnpqrstvwxz2456789]{5}`)
+	cronjobPodScheduleWindow = 7 * 24 * time.Hour
 )
 
 type ProcessInfo struct {
@@ -59,6 +64,9 @@ type Registry struct {
 	whiteListRules       WhiteListMap
 	whiteLastUpdatedTime int
 	connServer           ServerWorker
+	trafficStatsLastUpdated time.Time
+	trafficStatsLock        sync.Mutex
+	trafficStatsUpdateCh    chan *TrafficStatsUpdate
 }
 
 var (
@@ -123,6 +131,7 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
 
 		tracer:         ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing, *flags.DisableE2ETracing, *flags.DisableStackTracing),
 		whiteListRules: make(WhiteListMap),
+		trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),
 	}
 	// 初始化软负载集群节点
 	proxyClient, clientErr := NewProxyClient(*flags.ConfigEndpoint, false)
@@ -254,12 +263,12 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 
 			activeIPs := map[netaddr.IP]struct{}{}
 			for id, c := range r.containersById {
-				if !c.Dead(now) {
-					continue
-				}
 				for dst := range c.connectLastAttempt {
 					activeIPs[dst.IP()] = struct{}{}
 				}
+				if !c.Dead(now) {
+					continue
+				}
 				klog.Infoln("deleting dead container:", id)
 				for cg, cc := range r.containersByCgroupId {
 					if cc == c {
@@ -288,6 +297,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 			}
 			r.ip2fqdnLock.Unlock()
+		case u := <-r.trafficStatsUpdateCh:
+			if u == nil {
+				continue
+			}
+			if c := r.containersByPid[u.Pid]; c != nil {
+				c.updateTrafficStats(u)
+			}
 		case e, more := <-ch:
 			if e.Pid == uint32(os.Getpid()) {
 				continue
@@ -374,23 +390,28 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				if c := r.containersByPid[e.Pid]; c != nil {
 					c.onListenClose(e.Pid, e.SrcAddr)
 				}
+
+			case ebpftracer.EventTypeConnectionOpen:
+				if c := r.getOrCreateContainer(e.Pid); c != nil {
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
+					c.attachTlsUprobes(r.tracer, e.Pid)
+				} else {
+					klog.Infoln("TCP connection from unknown container", e)
+				}
 			case ebpftracer.EventTypeConnectionError:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true, e.Duration)
 				} else {
 					klog.Infoln("TCP connection error from unknown container", e)
 				}
 			case ebpftracer.EventTypeConnectionClose:
-				srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
-				for _, c := range r.containersById {
-					if c.onConnectionClose(srcDst) {
-						break
-					}
+				if c := r.containersByPid[e.Pid]; c != nil {
+					c.onConnectionClose(e)
 				}
 			case ebpftracer.EventTypeTCPRetransmit:
 				srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
 				for _, c := range r.containersById {
-					if c.onRetransmit(srcDst) {
+					if c.onRetransmission(srcDst) {
 						break
 					}
 				}
@@ -427,7 +448,10 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%d, Pid:%d, Location:%d, Goid:%d, TimeNs:%d, Ip:%X, CallerIp:%x, Bp:%x, CallerBp:%x", e.StackEvent.TraceId, e.StackEvent.Pid, e.StackEvent.Location, e.StackEvent.Goid, e.StackEvent.TimeNsStart, e.StackEvent.Ip, e.StackEvent.CallerIp, e.StackEvent.Bp, e.StackEvent.CallerBp)
 					// fmt.Printf("e.EventTypeFunEnt ErrorError: TraceId:%x, FPid:%x, Nid:%x, Level:%d\n", e.StackEvent.Fpid, e.StackEvent.Nid, e.StackEvent.Level)
 				}
-			}
+			case ebpftracer.EventTypePythonThreadLock:
+				// Accumulate the reported wait duration on the owning container.
+				if c := r.containersByPid[e.Pid]; c != nil {
+					c.pythonThreadLockWaitTime += e.Duration
+				}
+			} // closes the event-type switch; the brace removed above must be restored after the new case
 		}
 	}
 }
@@ -495,8 +519,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		r.containersByCgroupId[cgId] = c
 		return c
 	}
-
-	c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
+	c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
 	if err != nil {
 		klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
 		return nil
@@ -521,6 +544,31 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	return c
 }
 
+// updateTrafficStatsIfNecessary walks the tracer's active connections and sends
+// one TrafficStatsUpdate per entry to trafficStatsUpdateCh, followed by a nil
+// marker. A call made less than MinTrafficStatsUpdateInterval after the last
+// completed pass returns immediately. trafficStatsLock is held for the whole
+// call, including the channel sends.
+func (r *Registry) updateTrafficStatsIfNecessary() {
+	r.trafficStatsLock.Lock()
+	defer r.trafficStatsLock.Unlock()
+
+	if time.Since(r.trafficStatsLastUpdated) < MinTrafficStatsUpdateInterval {
+		return
+	}
+	var (
+		id   ebpftracer.ConnectionId
+		conn ebpftracer.Connection
+	)
+	it := r.tracer.ActiveConnectionsIterator()
+	for it.Next(&id, &conn) {
+		update := &TrafficStatsUpdate{
+			Pid:           id.PID,
+			FD:            id.FD,
+			BytesSent:     conn.BytesSent,
+			BytesReceived: conn.BytesReceived,
+		}
+		r.trafficStatsUpdateCh <- update
+	}
+	if err := it.Err(); err != nil {
+		klog.Warningln(err)
+	}
+	// The nil marker is sent even when iteration ended with an error.
+	r.trafficStatsUpdateCh <- nil
+	r.trafficStatsLastUpdated = time.Now()
+}
+
 func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID, map[string]string) {
 	extensionTag := map[string]string{Namespace: "", Workload: "", PodName: "", ProcessName: ""}
 	if cg.ContainerType == cgroup.ContainerTypeSystemdService {
@@ -556,6 +604,14 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 		extensionTag[Workload] = ""
 		extensionTag[PodName] = pod
 		extensionTag[ProcessName] = name
+		if g := cronjobPodName.FindStringSubmatch(pod); len(g) == 3 {
+			now := time.Now()
+			tsMiniutes, _ := strconv.ParseUint(g[2], 10, 64)
+			scheduledAt := time.Unix(int64(tsMiniutes)*60, 0)
+			if scheduledAt.After(now.Add(-cronjobPodScheduleWindow)) && scheduledAt.Before(now.Add(cronjobPodScheduleWindow)) {
+				return ContainerID(fmt.Sprintf("/k8s-cronjob/%s/%s/%s", namespace, g[1], name)), extensionTag
+			}
+		}
 		return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name)), extensionTag
 	}
 	if taskNameParts := strings.SplitN(md.labels["com.docker.swarm.task.name"], ".", 3); len(taskNameParts) == 3 {
@@ -569,6 +625,16 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 		}
 		return ContainerID(fmt.Sprintf("/swarm/%s/%s/%s", namespace, service, taskNameParts[1])), extensionTag
 	}
+	// Nomad-managed container: identified by the NOMAD_* environment variables,
+	// all of which must be present.
+	if md.env != nil {
+		allocId := md.env["NOMAD_ALLOC_ID"]
+		group := md.env["NOMAD_GROUP_NAME"]
+		job := md.env["NOMAD_JOB_NAME"]
+		namespace := md.env["NOMAD_NAMESPACE"]
+		task := md.env["NOMAD_TASK_NAME"]
+		if allocId != "" && group != "" && job != "" && namespace != "" && task != "" {
+			// calcId returns two values; extensionTag is passed through like in every other branch.
+			return ContainerID(fmt.Sprintf("/nomad/%s/%s/%s/%s/%s", namespace, job, group, allocId, task)), extensionTag
+		}
+	}
 	if md.name == "" { // should be "pure" dockerd container here
 		klog.Warningln("empty dockerd container name for:", cg.ContainerId)
 		return "", extensionTag
@@ -578,6 +644,10 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 
 func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
 	switch cg.ContainerType {
+	case cgroup.ContainerTypeSystemdService:
+		md := &ContainerMetadata{}
+		md.systemdTriggeredBy = SystemdTriggeredBy(cg.ContainerId)
+		return md, nil
 	case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeSandbox, cgroup.ContainerTypeCrio:
 	default:
 		return &ContainerMetadata{}, nil
@@ -606,3 +676,10 @@ func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
 	}
 	return nil, fmt.Errorf("failed to interact with dockerd (%s) or with containerd (%s)", dockerdErr, containerdErr)
 }
+
+// TrafficStatsUpdate carries the byte counters of one connection, identified
+// by the owning process id and file descriptor. Values are sent over
+// Registry.trafficStatsUpdateCh.
+type TrafficStatsUpdate struct {
+	Pid           uint32
+	FD            uint64
+	BytesSent     uint64
+	BytesReceived uint64
+}

+ 56 - 0
containers/systemd.go

@@ -0,0 +1,56 @@
+package containers
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/coroot/coroot-node-agent/proc"
+
+	"github.com/coreos/go-systemd/v22/dbus"
+	gdbus "github.com/godbus/dbus/v5"
+
+	"k8s.io/klog/v2"
+)
+
+var (
+	dbusConn    *dbus.Conn
+	dbusTimeout = time.Second
+)
+
+// init opens the package-wide connection to systemd's private bus socket.
+// A failure is only logged; SystemdTriggeredBy treats a nil dbusConn as
+// "systemd not available".
+func init() {
+	var err error
+	dbusConn, err = dbus.NewConnection(func() (*gdbus.Conn, error) {
+		c, err := gdbus.Dial("unix:path=" + proc.HostPath("/run/systemd/private"))
+		if err != nil {
+			return nil, err
+		}
+		methods := []gdbus.Auth{gdbus.AuthExternal(strconv.Itoa(os.Getuid()))}
+		if err = c.Auth(methods); err != nil {
+			// Close the socket that was just dialed. dbusConn must not be used
+			// here: it has not been assigned yet while this callback runs.
+			_ = c.Close()
+			return nil, err
+		}
+		return c, nil
+	})
+	if err != nil {
+		klog.Warningln("failed to connect to systemd bus:", err)
+	}
+}
+
+// SystemdTriggeredBy looks up the "TriggeredBy" property of the unit named by
+// the last "/"-separated segment of id and returns its first value. It returns
+// an empty string when there is no bus connection, when the lookup yields no
+// property, or when the property is not a non-empty string list. The lookup is
+// bounded by dbusTimeout.
+func SystemdTriggeredBy(id string) string {
+	if dbusConn == nil {
+		return ""
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), dbusTimeout)
+	defer cancel()
+	// Everything after the final slash (or the whole id when there is none).
+	unit := id[strings.LastIndex(id, "/")+1:]
+	prop, _ := dbusConn.GetUnitPropertyContext(ctx, unit, "TriggeredBy")
+	if prop == nil {
+		return ""
+	}
+	triggers, _ := prop.Value.Value().([]string)
+	if len(triggers) == 0 {
+		return ""
+	}
+	return triggers[0]
+}

+ 13 - 9
ebpftracer/ebpf/ebpf.c

@@ -14,15 +14,16 @@
 //#include <bpf/bpf_endian.h>
 #include "common/bpf/bpf_endian.h"
 
-#define EVENT_TYPE_PROCESS_START	1
-#define EVENT_TYPE_PROCESS_EXIT		2
-#define EVENT_TYPE_CONNECTION_OPEN	3
-#define EVENT_TYPE_CONNECTION_CLOSE	4
-#define EVENT_TYPE_CONNECTION_ERROR	5
-#define EVENT_TYPE_LISTEN_OPEN		6
-#define EVENT_TYPE_LISTEN_CLOSE 	7
-#define EVENT_TYPE_FILE_OPEN		8
-#define EVENT_TYPE_TCP_RETRANSMIT	9
+#define EVENT_TYPE_PROCESS_START	    1
+#define EVENT_TYPE_PROCESS_EXIT		    2
+#define EVENT_TYPE_CONNECTION_OPEN	    3
+#define EVENT_TYPE_CONNECTION_CLOSE	    4
+#define EVENT_TYPE_CONNECTION_ERROR	    5
+#define EVENT_TYPE_LISTEN_OPEN		    6
+#define EVENT_TYPE_LISTEN_CLOSE 	    7
+#define EVENT_TYPE_FILE_OPEN		    8
+#define EVENT_TYPE_TCP_RETRANSMIT	    9
+#define EVENT_TYPE_PYTHON_THREAD_LOCK	11
 
 #define EVENT_REASON_OOM_KILL		1
 
@@ -64,5 +65,8 @@
 #include "utrace/netcore/net/server.probe.bpf.c"
 #include "utrace/netcore/net/client.probe.bpf.c"
 #endif
+#include "l7/gotls.c"
+#include "l7/openssl.c"
+#include "python.c"
 
 char _license[] SEC("license") = "GPL";

+ 0 - 3
ebpftracer/ebpf/l7/dns.c

@@ -45,9 +45,6 @@ int is_dns_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *status)
     if (h.bits0 & DNS_OPCODE) {
        return 0;
     }
-    if (!(h.bits1 & DNS_Z)) {
-        return 0;
-    }
     h.qdcount = bpf_ntohs(h.qdcount);
     if (h.qdcount != 1) {
         return 0;

+ 1 - 1
ebpftracer/ebpf/l7/gotls.c

@@ -60,7 +60,7 @@ int go_crypto_tls_read_enter(struct pt_regs *ctx) {
     __u64 goroutine_id = GOROUTINE(ctx);
     __u64 pid = pid_tgid >> 32;
     __u64 id = pid << 32 | goroutine_id | IS_TLS_READ_ID;
-    return trace_enter_read(id, fd, buf_ptr, 0, 0);
+    return trace_enter_read(id, pid, fd, buf_ptr, 0, 0);
 }
 
 SEC("uprobe/go_crypto_tls_read_exit")

+ 80 - 52
ebpftracer/ebpf/l7/l7.c

@@ -123,7 +123,7 @@ struct read_args {
 };
 
 struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(__u64));
     __uint(value_size, sizeof(struct read_args));
     __uint(max_entries, 10240);
@@ -230,22 +230,10 @@ struct l7_user_msghdr {
 };
 
 static inline __attribute__((__always_inline__))
-void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
-    struct sk_info sk = {};
-    sk.pid = pid;
-    sk.fd = fd;
-    __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
-    if (timestamp) {
-        if (*timestamp == 0) {
-//	        cw_bpf_debug("timestamp=0");
-            return;
-        }
-        e->connection_timestamp = *timestamp;
-    } else {
-        e->connection_timestamp = 0;
-    }
-    e->fd = fd;
-    e->pid = pid;
+void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct connection *conn) {
+    e->connection_timestamp = conn->timestamp;
+    e->fd = cid.fd;
+    e->pid = cid.pid;
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 	if (error ==0){
 		cw_add_event_count(e->trace_id);
@@ -253,8 +241,8 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
 }
 
 static inline __attribute__((__always_inline__))
-__u64 read_iovec(char *l7_iovec, __u64 iovlen, __u64 ret, char *buf) {
-    struct l7_iovec iov = {};
+__u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf, __u64 *total_size) {
+    struct iovec iov = {};
     __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
     __u64 offset = 0;
     __u64 size = 0;
@@ -269,15 +257,15 @@ __u64 read_iovec(char *l7_iovec, __u64 iovlen, __u64 ret, char *buf) {
         if (iov.size <= 0) {
             continue;
         }
-        size = MIN(iov.size, max-offset);
-        TRUNCATE_PAYLOAD_SIZE(size);
-        TRUNCATE_PAYLOAD_SIZE(offset);
-        if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
-            return 0;
-        }
-        offset += size;
-        if (offset >= max) {
-            break;
+        *total_size += iov.size;
+        if (offset < max) {
+            size = MIN(iov.size, max-offset);
+            TRUNCATE_PAYLOAD_SIZE(size);
+            TRUNCATE_PAYLOAD_SIZE(offset);
+            if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
+                return 0;
+            }
+            offset += size;
         }
     }
     return offset;
@@ -342,9 +330,18 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     __u32 zero = 0;
     __u32 pid, tid;
     __u32 http_status ;
-
     pid = id >> 32;
     tid =  (__u32)id;
+    struct connection_id cid = {};
+    cid.pid = pid;
+    cid.fd = fd;
+    __u64 total_size = size;
+
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        return 0;
+    }
+
     if (load_filter_pid() != 0 && pid != load_filter_pid()) {
         return 0;
     }
@@ -354,12 +351,17 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         if (!payload) {
             return 0;
         }
-        size = read_iovec(buf, iovlen, 0, payload);
+        total_size = 0;
+        size = read_iovec(buf, iovlen, 0, payload, &total_size);
     }
     if (!size) {
         return 0;
     }
 
+    if (!is_tls) {
+        __sync_fetch_and_add(&conn->bytes_sent, total_size);
+    }
+
     struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
     if (!req) {
         return 0;
@@ -370,8 +372,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     req->ns = 0;
     req->payload_size = size;
     struct l7_request_key k = {};
-    k.pid = id >> 32;
-    k.fd = fd;
+    k.pid = cid.pid;
+    k.fd = cid.fd;
     k.is_tls = is_tls;
     k.stream_id = -1;
 
@@ -494,7 +496,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             e->method = METHOD_STATEMENT_CLOSE;
             e->payload_size = size;
             COPY_PAYLOAD(e->payload, size, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         }
         req->protocol = PROTOCOL_POSTGRES;
@@ -518,7 +520,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             e->payload_size = size;
             COPY_PAYLOAD(e->payload, size, payload);
 	        cw_bpf_debug("[Enter][Mysql][Send]:thread_id:%d\n",tid);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         }
         req->protocol = PROTOCOL_MYSQL;
@@ -564,7 +566,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         e->protocol = PROTOCOL_RABBITMQ;
         e->method = METHOD_PRODUCE;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (nats_method(payload, size) == METHOD_PRODUCE) {
         struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
@@ -573,7 +575,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         e->protocol = PROTOCOL_NATS;
         e->method = METHOD_PRODUCE;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (is_cassandra_request(payload, size, &k.stream_id)) {
         req->protocol = PROTOCOL_CASSANDRA;
@@ -593,7 +595,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         e->duration = bpf_ktime_get_ns();
         e->payload_size = size;
         COPY_PAYLOAD(e->payload, size, payload);
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (is_dubbo2_request(payload, size)) {
         req->protocol = PROTOCOL_DUBBO2;
@@ -613,7 +615,16 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 }
 
 static inline __attribute__((__always_inline__))
-int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
+int trace_enter_read(__u64 id, __u32 pid, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
+    struct connection_id cid = {};
+    cid.pid = pid;
+    cid.fd = fd;
+
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        return 0;
+    }
+
     struct read_args args = {};
     args.fd = fd;
     args.buf = buf;
@@ -635,10 +646,17 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (!args) {
         return 0;
     }
-
+    struct connection_id cid = {};
+    cid.pid = pid;
+    cid.fd = args->fd;
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        bpf_map_delete_elem(&active_reads, &id);
+        return 0;
+    }
     struct l7_request_key k = {};
-    k.pid = pid;
-    k.fd = args->fd;
+    k.pid = cid.pid;
+    k.fd = cid.fd;
     k.is_tls = is_tls;
     k.stream_id = -1;
 
@@ -655,7 +673,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             return 0;
         }
     }
-
+    __u64 total_size = ret;
     int zero = 0;
     char* payload = args->buf;
     if (args->iovlen) {
@@ -663,12 +681,17 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         if (!payload) {
             return 0;
         }
-        ret = read_iovec(args->buf, args->iovlen, ret, payload);
+        total_size = 0;
+        ret = read_iovec(args->buf, args->iovlen, ret, payload, &total_size);
         if (!ret) {
             return 0;
         }
     }
 
+    if (!is_tls) {
+        __sync_fetch_and_add(&conn->bytes_received, total_size);
+    }
+
     struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
     if (!e) {
         return 0;
@@ -756,13 +779,13 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (is_rabbitmq_consume(payload, ret)) {
         e->protocol = PROTOCOL_RABBITMQ;
         e->method = METHOD_CONSUME;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     }
     if (nats_method(payload, ret) == METHOD_CONSUME) {
         e->protocol = PROTOCOL_NATS;
         e->method = METHOD_CONSUME;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     }
 
@@ -782,7 +805,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 	        e->duration = e->end_at - req->ns;
             e->payload_size = ret;
             COPY_PAYLOAD(e->payload, ret, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
+            bpf_map_delete_elem(&active_l7_requests, &k);
             return 0;
         } else if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
             req = bpf_map_lookup_elem(&active_l7_requests, &k);
@@ -796,7 +820,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->duration = bpf_ktime_get_ns();
             e->payload_size = ret;
             COPY_PAYLOAD(e->payload, ret, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         } else {
 //	        cw_bpf_debug("bb 6:[0x%x] k.pid:%d, k.fd:%d",b[4],k.pid,k.fd);
@@ -1031,7 +1055,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 	e->end_at = bpf_ktime_get_ns();
 	e->start_at = req->ns;
     e->duration = e->end_at - e->start_at;
-    send_event(ctx, e, k.pid, k.fd);
+    send_event(ctx, e, cid, conn);
     return 0;
 }
 
@@ -1085,13 +1109,15 @@ int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
 SEC("tracepoint/syscalls/sys_enter_read")
 int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_enter_readv")
 int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, ctx->size);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, ctx->size);
 }
 
 SEC("tracepoint/syscalls/sys_enter_recvmsg")
@@ -1101,13 +1127,15 @@ int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
         return 0;
     }
-    return trace_enter_read(id, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
 }
 
 SEC("tracepoint/syscalls/sys_enter_recvfrom")
 int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_exit_read")

+ 17 - 15
ebpftracer/ebpf/l7/openssl.c

@@ -69,23 +69,25 @@ struct ssl_st {
     return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size, 0); \
 })
 
-#define READ_ENTER(ctx, bio_t)                      \
-({                                                  \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);            \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);      \
-    __u64 pid_tgid = bpf_get_current_pid_tgid();    \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;           \
-    return trace_enter_read(id, fd, buf_ptr, 0, 0); \
+#define READ_ENTER(ctx, bio_t)                           \
+({                                                       \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);                 \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);           \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();         \
+    __u32 pid = pid_tgid >> 32;                          \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;                \
+    return trace_enter_read(id, pid, fd, buf_ptr, 0, 0); \
 })
 
-#define READ_EX_ENTER(ctx, bio_t)                           \
-({                                                          \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);                    \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);              \
-    __u64 pid_tgid = bpf_get_current_pid_tgid();            \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;                   \
-    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);            \
-    return trace_enter_read(id, fd, buf_ptr, ret_ptr, 0);   \
+#define READ_EX_ENTER(ctx, bio_t)                              \
+({                                                             \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);                       \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);                 \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();               \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;                      \
+    __u32 pid = pid_tgid >> 32;                                \
+    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);               \
+    return trace_enter_read(id, pid, fd, buf_ptr, ret_ptr, 0); \
 })
 
 SEC("uprobe/openssl_SSL_write_enter")

+ 42 - 0
ebpftracer/ebpf/python.c

@@ -0,0 +1,42 @@
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} python_thread_events SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(__u64));
+    __uint(value_size, sizeof(__u64));
+    __uint(max_entries, 10240);
+} python_thread_locks SEC(".maps");
+
+// Entry probe: stores the current monotonic timestamp in python_thread_locks
+// under the caller's pid_tgid, overwriting any previous value for that key.
+SEC("uprobe/pthread_cond_timedwait_enter")
+int pthread_cond_timedwait_enter(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u64 timestamp = bpf_ktime_get_ns();
+    bpf_map_update_elem(&python_thread_locks, &pid_tgid, &timestamp, BPF_ANY);
+    return 0;
+}
+
+struct python_thread_event {
+    __u32 type;
+    __u32 pid;
+    __u64 duration;
+};
+
+// Exit probe: pairs with the entry probe through python_thread_locks, emits a
+// python_thread_event carrying the elapsed time, and removes the map entry so
+// that ids of finished threads do not accumulate in the fixed-size hash map.
+SEC("uprobe/pthread_cond_timedwait_exit")
+int pthread_cond_timedwait_exit(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u64 *timestamp = bpf_map_lookup_elem(&python_thread_locks, &pid_tgid);
+    if (!timestamp) {
+        return 0;
+    }
+    // Read the start time before deleting the entry it points into.
+    __u64 duration = bpf_ktime_get_ns() - *timestamp;
+    bpf_map_delete_elem(&python_thread_locks, &pid_tgid);
+    struct python_thread_event e = {
+        .type = EVENT_TYPE_PYTHON_THREAD_LOCK,
+        .pid = pid_tgid >> 32,
+        .duration = duration,
+    };
+    bpf_perf_event_output(ctx, &python_thread_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    return 0;
+}

+ 80 - 49
ebpftracer/ebpf/tcp/state.c

@@ -1,12 +1,16 @@
 #ifndef IPPROTO_TCP
 #define IPPROTO_TCP 6
 #endif
+#define MAX_CONNECTIONS 1000000
 
 struct tcp_event {
     __u64 fd;
     __u64 timestamp;
+    __u64 duration;
     __u32 type;
     __u32 pid;
+    __u64 bytes_sent;
+    __u64 bytes_received;
     __u16 sport;
     __u16 dport;
     __u8 saddr[16];
@@ -51,23 +55,31 @@ struct {
     __uint(max_entries, 10240);
 } fd_by_pid_tgid SEC(".maps");
 
-struct sk_info {
+struct connection_id {
     __u64 fd;
     __u32 pid;
 };
+
 struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(void *));
-    __uint(value_size, sizeof(struct sk_info));
-    __uint(max_entries, 10240);
-} sk_info SEC(".maps");
+    __uint(value_size, sizeof(struct connection_id));
+    __uint(max_entries, MAX_CONNECTIONS);
+} connection_id_by_socket SEC(".maps");
+
+struct connection {
+    __u64 timestamp;
+    __u64 bytes_sent;
+    __u64 bytes_received;
+};
 
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
-    __uint(key_size, sizeof(struct sk_info));
-    __uint(value_size, sizeof(__u64));
-    __uint(max_entries, 32768);
-} connection_timestamps SEC(".maps");
+    __uint(key_size, sizeof(struct connection_id));
+    __uint(value_size, sizeof(struct connection));
+    __uint(max_entries, MAX_CONNECTIONS);
+} active_connections SEC(".maps");
+
 
 SEC("tracepoint/sock/inet_sock_set_state")
 int inet_sock_set_state(void *ctx)
@@ -88,40 +100,49 @@ int inet_sock_set_state(void *ctx)
         if (!fdp) {
             return 0;
         }
-        struct sk_info i = {};
-        i.pid = pid;
-        i.fd = *fdp;
+        struct connection_id cid = {};
+        cid.pid = pid;
+        cid.fd = *fdp;
+
+        struct connection conn = {};
+        conn.timestamp = bpf_ktime_get_ns();
+
         bpf_map_delete_elem(&fd_by_pid_tgid, &id);
-        bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
+        bpf_map_update_elem(&connection_id_by_socket, &args.skaddr, &cid, BPF_ANY);
+        bpf_map_update_elem(&active_connections, &cid, &conn, BPF_ANY);
         return 0;
     }
 
     __u64 fd = 0;
     __u32 type = 0;
     __u64 timestamp = 0;
+    __u64 duration = 0;
     void *map = &tcp_connect_events;
+
+    struct tcp_event e = {};
+
     if (args.oldstate == BPF_TCP_SYN_SENT) {
-        struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
-        if (!i) {
+        struct connection_id *cid = bpf_map_lookup_elem(&connection_id_by_socket, &args.skaddr);
+        if (!cid) {
+            return 0;
+        }
+        struct connection *conn = bpf_map_lookup_elem(&active_connections, cid);
+        if (!conn) {
             return 0;
         }
         if (args.newstate == BPF_TCP_ESTABLISHED) {
-            timestamp = bpf_ktime_get_ns();
-            struct sk_info k = {};
-            k.pid = i->pid;
-            k.fd = i->fd;
-            bpf_map_update_elem(&connection_timestamps, &k, &timestamp, BPF_ANY);
+            timestamp = conn->timestamp;
             type = EVENT_TYPE_CONNECTION_OPEN;
         } else if (args.newstate == BPF_TCP_CLOSE) {
+            bpf_map_delete_elem(&active_connections, cid);
             type = EVENT_TYPE_CONNECTION_ERROR;
         }
-        pid = i->pid;
-        fd = i->fd;
-        bpf_map_delete_elem(&sk_info, &args.skaddr);
+        duration = bpf_ktime_get_ns() - conn->timestamp;
+        pid = cid->pid;
+        fd = cid->fd;
     }
     if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
-        pid = 0;
-        type = EVENT_TYPE_CONNECTION_CLOSE;
+        bpf_map_delete_elem(&connection_id_by_socket, &args.skaddr);
     }
     if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_LISTEN) {
         type = EVENT_TYPE_LISTEN_OPEN;
@@ -135,9 +156,8 @@ int inet_sock_set_state(void *ctx)
     if (type == 0) {
         return 0;
     }
-
-    struct tcp_event e = {};
     e.type = type;
+    e.duration = duration;
     e.timestamp = timestamp;
     e.pid = pid;
     e.sport = args.sport;
@@ -169,35 +189,46 @@ int sys_enter_connect(void *ctx) {
 }
 
 SEC("tracepoint/syscalls/sys_exit_connect")
-int sys_exit_connect(void *ctx) {
+int sys_exit_connect(struct trace_event_raw_sys_exit__stub* ctx) {
+    // Handles the return from connect(): looks up the fd recorded in
+    // fd_by_pid_tgid under the current pid_tgid and, when the syscall
+    // returned 0 and active_connections has no entry for (pid, fd) yet,
+    // inserts one stamped with the current time. The fd_by_pid_tgid entry
+    // is removed before returning (unless the lookup found nothing).
     __u64 id = bpf_get_current_pid_tgid();
+    __u64 *fdp = bpf_map_lookup_elem(&fd_by_pid_tgid, &id);
+    if (!fdp) {
+        return 0;
+    }
+    struct connection_id cid = {};
+    cid.pid = id >> 32; // upper 32 bits of the pid_tgid value
+    cid.fd = *fdp;
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn && ctx->ret == 0) { // non-TCP connection
+        // This local value shadows the pointer looked up above; only this
+        // zero-initialised copy (with its timestamp set) is written to the map.
+        struct connection conn = {};
+        conn.timestamp = bpf_ktime_get_ns();
+        bpf_map_update_elem(&active_connections, &cid, &conn, BPF_ANY);
+    }
     bpf_map_delete_elem(&fd_by_pid_tgid, &id);
     return 0;
 }
 
-static inline __attribute__((__always_inline__))
-int trace_exit_accept(struct trace_event_raw_sys_exit__stub* ctx) {
-    if (ctx->ret < 0) {
+SEC("tracepoint/syscalls/sys_enter_close")
+int sys_enter_close(void *ctx) {
+    // Handles entry to close(): if active_connections holds an entry for
+    // (pid, fd), emits an EVENT_TYPE_CONNECTION_CLOSE event carrying the
+    // connection's byte counters and timestamp, then deletes the entry.
+    // Only type, pid, fd, bytes_sent, bytes_received and timestamp are set
+    // on the event; every other field stays zero-initialised.
+    struct trace_event_raw_args_with_fd__stub args = {};
+    if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
         return 0;
     }
     __u64 id = bpf_get_current_pid_tgid();
-    struct sk_info k = {};
-    k.pid = id >> 32;
-    k.fd = ctx->ret;
-    __u64 invalid_timestamp = 0;
-    bpf_map_update_elem(&connection_timestamps, &k, &invalid_timestamp, BPF_ANY);
+    struct connection_id cid = {};
+    cid.pid = id >> 32; // upper 32 bits of the pid_tgid value
+    cid.fd = args.fd;
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (conn) {
+        struct tcp_event e = {};
+        e.type = EVENT_TYPE_CONNECTION_CLOSE;
+        e.pid = cid.pid;
+        e.fd = cid.fd;
+        e.bytes_sent = conn->bytes_sent;
+        e.bytes_received = conn->bytes_received;
+        e.timestamp = conn->timestamp;
+        bpf_perf_event_output(ctx, &tcp_connect_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+        bpf_map_delete_elem(&active_connections, &cid);
+    }
     return 0;
 }
-
-SEC("tracepoint/syscalls/sys_exit_accept")
-int sys_exit_accept(struct trace_event_raw_sys_exit__stub* ctx) {
-    return trace_exit_accept(ctx);
-}
-
-SEC("tracepoint/syscalls/sys_exit_accept4")
-int sys_exit_accept4(struct trace_event_raw_sys_exit__stub* ctx) {
-    return trace_exit_accept(ctx);
-}
-
-
-

+ 5 - 3
ebpftracer/init.go

@@ -23,17 +23,18 @@ func readFds(pids []uint32) (files []file, socks []sock) {
 	for _, pid := range pids {
 		ns, err := proc.GetNetNs(pid)
 		if err != nil {
+			klog.Warningf("failed to get net ns for %d: %s", pid, err)
 			continue
 		}
 		nsId := ns.UniqueId()
 		sockets, ok := nss[nsId]
 		_ = ns.Close()
 		if !ok {
-			sockets = map[string]sock{}
-			nss[nsId] = sockets
 			if ss, err := proc.GetSockets(pid); err != nil {
-				klog.Warningln(err)
+				klog.Warningf("failed to get sockets for %d: %s", pid, err)
 			} else {
+				sockets = map[string]sock{}
+				nss[nsId] = sockets
 				for _, s := range ss {
 					sockets[s.Inode] = sock{Sock: s}
 				}
@@ -42,6 +43,7 @@ func readFds(pids []uint32) (files []file, socks []sock) {
 
 		fds, err := proc.ReadFds(pid)
 		if err != nil {
+			klog.Warningf("failed to read fds for %d: %s", pid, err)
 			continue
 		}
 		for _, fd := range fds {

+ 86 - 0
ebpftracer/python.go

@@ -0,0 +1,86 @@
+package ebpftracer
+
+import (
+	"bufio"
+	"os"
+	"regexp"
+	"strings"
+
+	"github.com/cilium/ebpf/link"
+	"github.com/coroot/coroot-node-agent/proc"
+	"golang.org/x/exp/maps"
+	"k8s.io/klog/v2"
+)
+
+var (
+	libcRegexp = regexp.MustCompile(`libc[\.-]`)
+	muslRegexp = regexp.MustCompile(`musl[\.-]`)
+)
+
+// AttachPythonThreadLockProbes attaches an entry uprobe and a return uprobe
+// for pthread_cond_timedwait to the first library returned by
+// getPthreadLibs(pid) on which both attachments succeed.
+//
+// It returns the attached links (entry probe first, then return probe), or nil
+// when nothing was attached. The probes are attached as a pair: if the return
+// probe cannot be attached, the entry probe that was just attached is closed
+// so a half-instrumented library is never left active or handed to the caller.
+// If a candidate library cannot be opened, the function logs and returns nil.
+func (t *Tracer) AttachPythonThreadLockProbes(pid uint32) []link.Link {
+	log := func(libPath, msg string, err error) {
+		if err != nil {
+			// Errors ending in one of these messages are not logged.
+			for _, s := range []string{"no such file or directory", "no such process", "permission denied"} {
+				if strings.HasSuffix(err.Error(), s) {
+					return
+				}
+			}
+			klog.ErrorfDepth(1, "pid=%d lib=%s: %s: %s", pid, libPath, msg, err)
+			return
+		}
+		klog.InfofDepth(1, "pid=%d lib=%s: %s", pid, libPath, msg)
+	}
+
+	var (
+		lastErr error
+		links   []link.Link
+		libPath string
+	)
+
+	for _, libPath = range getPthreadLibs(pid) {
+		exe, err := link.OpenExecutable(libPath)
+		if err != nil {
+			log(libPath, "failed to open executable", err)
+			return nil
+		}
+		var uprobe, uretprobe link.Link
+		uprobe, lastErr = exe.Uprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_enter"], nil)
+		if lastErr != nil {
+			continue
+		}
+		uretprobe, lastErr = exe.Uretprobe("pthread_cond_timedwait", t.uprobes["pthread_cond_timedwait_exit"], nil)
+		if lastErr != nil {
+			// Detach the orphaned entry probe before trying the next library;
+			// the attach error in lastErr is the one worth reporting.
+			_ = uprobe.Close()
+			continue
+		}
+		links = append(links, uprobe, uretprobe)
+		log(libPath, "python uprobes attached", nil)
+		break
+	}
+	// lastErr is non-nil only when the most recently tried library failed,
+	// i.e. no library was fully instrumented.
+	if lastErr != nil {
+		log(libPath, "failed to attach uprobe", lastErr)
+	}
+	return links
+}
+
+// getPthreadLibs reads the maps file of the given process (proc.Path(pid,
+// "maps")) and collects the mapped paths, taken from the sixth
+// whitespace-separated field of each line, that match libcRegexp or
+// muslRegexp or contain "libpthread". Each match is rewritten with
+// proc.Path(pid, "root", path) and de-duplicated. The result order is
+// unspecified. It returns nil if the maps file cannot be opened.
+func getPthreadLibs(pid uint32) []string {
+	mapsFile, err := os.Open(proc.Path(pid, "maps"))
+	if err != nil {
+		return nil
+	}
+	defer mapsFile.Close()
+
+	seen := map[string]bool{}
+	// bufio.Scanner splits on lines by default.
+	for lines := bufio.NewScanner(mapsFile); lines.Scan(); {
+		fields := strings.Fields(lines.Text())
+		if len(fields) < 6 {
+			// No path column on this mapping.
+			continue
+		}
+		mapped := fields[5]
+		switch {
+		case libcRegexp.MatchString(mapped),
+			muslRegexp.MatchString(mapped),
+			strings.Contains(mapped, "libpthread"):
+			seen[proc.Path(pid, "root", mapped)] = true
+		}
+	}
+	return maps.Keys(seen)
+}

+ 8 - 3
ebpftracer/tls.go

@@ -86,11 +86,15 @@ func (t *Tracer) AttachOpenSslUprobes(pid uint32) ([]link.Link, error) {
 	}
 	progs := []prog{
 		{symbol: "SSL_write", uprobe: writeEnter},
-		{symbol: "SSL_write_ex", uprobe: writeEnter},
 		{symbol: "SSL_read", uprobe: readEnter},
-		{symbol: "SSL_read_ex", uprobe: readExEnter},
 		{symbol: "SSL_read", uretprobe: readExit},
-		{symbol: "SSL_read_ex", uretprobe: readExit},
+	}
+	if semver.Compare(version, "v1.1.1") >= 0 {
+		progs = append(progs, []prog{
+			{symbol: "SSL_write_ex", uprobe: writeEnter},
+			{symbol: "SSL_read_ex", uprobe: readExEnter},
+			{symbol: "SSL_read_ex", uretprobe: readExit},
+		}...)
 	}
 	for _, p := range progs {
 		if p.uprobe != "" {
@@ -143,6 +147,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		log("failed to read build info", err)
 		return nil, err
 	}
+	isGolangApp = true
 
 	name, err = os.Readlink(path)
 	if err != nil {

+ 84 - 22
ebpftracer/tracer.go

@@ -100,16 +100,23 @@ const (
 	EventReasonOOMKill EventReason = 1
 )
 
+// TrafficStats holds the byte counters of a connection. runEventsReader
+// attaches it to an Event only when the event type is
+// EventTypeConnectionClose.
+type TrafficStats struct {
+	BytesSent     uint64
+	BytesReceived uint64
+}
+
 type Event struct {
-	Type       EventType
-	Reason     EventReason
-	Pid        uint32
-	SrcAddr    netaddr.IPPort
-	DstAddr    netaddr.IPPort
-	Fd         uint64
-	Timestamp  uint64
-	L7Request  *l7.RequestData
-	StackEvent *StackEvent
+	// Duration is converted from the raw eBPF duration value by
+	// runEventsReader. TrafficStats is non-nil only for
+	// EventTypeConnectionClose events.
+	StackEvent   *StackEvent
+	Type         EventType
+	Reason       EventReason
+	Pid          uint32
+	SrcAddr      netaddr.IPPort
+	DstAddr      netaddr.IPPort
+	Fd           uint64
+	Timestamp    uint64
+	Duration     time.Duration
+	L7Request    *l7.RequestData
+	TrafficStats *TrafficStats
 }
 
 type perfMapType uint8
@@ -121,6 +128,7 @@ const (
 	perfMapTypeL7Events     perfMapType = 4
 	perfMapTypeSocketEvents perfMapType = 5
 	perfMapTypeEventQueue   perfMapType = 6
+	perfMapTypePythonThreadEvents perfMapType = 7
 )
 
 type Tracer struct {
@@ -203,6 +211,8 @@ func (t *Tracer) init(ch chan<- Event) error {
 		}
 	}
 
+	ebpfConnectionsMap := t.collection.Maps["active_connections"]
+	timestamp := uint64(time.Now().UnixNano())
 	for _, s := range sockets {
 		typ := EventTypeConnectionOpen
 		if s.Listen {
@@ -211,16 +221,40 @@ func (t *Tracer) init(ch chan<- Event) error {
 			continue
 		}
 		ch <- Event{
-			Type:    typ,
-			Pid:     s.pid,
-			Fd:      s.fd,
-			SrcAddr: s.SAddr,
-			DstAddr: s.DAddr,
+			Type:      typ,
+			Pid:       s.pid,
+			Timestamp: timestamp,
+			Fd:        s.fd,
+			SrcAddr:   s.SAddr,
+			DstAddr:   s.DAddr,
+		}
+		if typ == EventTypeConnectionOpen {
+			id := ConnectionId{FD: s.fd, PID: s.pid}
+			conn := Connection{Timestamp: timestamp}
+			if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
+				klog.Warningln(err)
+			}
 		}
 	}
 	return nil
 }
 
+// ActiveConnectionsIterator returns an iterator over the entries of the
+// "active_connections" eBPF map in the tracer's loaded collection.
+func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
+	connections := t.collection.Maps["active_connections"]
+	return connections.Iterate()
+}
+
+// ConnectionId identifies a connection by file descriptor and process ID. It
+// is the key type used when init writes to the "active_connections" eBPF map.
+// The blank uint32 pads the struct to 16 bytes.
+// NOTE(review): presumably the layout must match the C struct connection_id
+// used by the eBPF programs; confirm before changing field order or sizes.
+type ConnectionId struct {
+	FD  uint64
+	PID uint32
+	_   uint32
+}
+
+// Connection is the value type used when init writes to the
+// "active_connections" eBPF map; init sets only Timestamp and leaves the byte
+// counters at zero.
+// NOTE(review): presumably the layout must match the C struct connection used
+// by the eBPF programs; confirm before changing field order or sizes.
+type Connection struct {
+	Timestamp     uint64
+	BytesSent     uint64
+	BytesReceived uint64
+}
+
 type perfMap struct {
 	name                  string
 	perCPUBufferSizePages int
@@ -275,6 +309,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
 		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
 		{name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
+		{name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
 	}
 	tracer.MapInsert(c)
 	if !t.DisableL7Tracing() {
@@ -381,14 +416,17 @@ type procEvent struct {
 }
 
 type tcpEvent struct {
-	Fd        uint64
-	Timestamp uint64
-	Type      EventType
-	Pid       uint32
-	SPort     uint16
-	DPort     uint16
-	SAddr     [16]byte
-	DAddr     [16]byte
+	// NOTE(review): presumably decoded from raw perf samples, so field order
+	// and sizes have to mirror the eBPF struct tcp_event; confirm against
+	// the C definition before reordering.
+	Fd            uint64
+	Timestamp     uint64
+	Duration      uint64
+	Type          EventType
+	Pid           uint32
+	BytesSent     uint64
+	BytesReceived uint64
+	SPort         uint16
+	DPort         uint16
+	SAddr         [16]byte
+	DAddr         [16]byte
 }
 
 type fileEvent struct {
@@ -490,6 +528,12 @@ type StackFunEvent struct {
 	Uprobe     *tracer.Uprobe
 }
 
+// pythonThreadEvent is the record that runEventsReader decodes (little-endian,
+// via binary.Read) from samples of the perfMapTypePythonThreadEvents perf map.
+// Duration is converted directly to time.Duration, i.e. it is interpreted as
+// nanoseconds.
+type pythonThreadEvent struct {
+	Type     EventType
+	Pid      uint32
+	Duration uint64
+}
+
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
 	for {
 		rec, err := r.Read()
@@ -688,6 +732,24 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				DstAddr:   ipPort(v.DAddr, v.DPort),
 				Fd:        v.Fd,
 				Timestamp: v.Timestamp,
+				Duration:  time.Duration(v.Duration),
+			}
+			if v.Type == EventTypeConnectionClose {
+				event.TrafficStats = &TrafficStats{
+					BytesSent:     v.BytesSent,
+					BytesReceived: v.BytesReceived,
+				}
+			}
+		case perfMapTypePythonThreadEvents:
+			v := &pythonThreadEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			event = Event{
+				Type:     v.Type,
+				Pid:      v.Pid,
+				Duration: time.Duration(v.Duration),
 			}
 		case perfMapTypeEventQueue:
 			v := &StackEvent{}

+ 7 - 2
flags/flags.go

@@ -21,8 +21,12 @@ var (
 	DisableStackTracing = kingpin.Flag("disable-stack-tracing", "Disable stack tracing").Default("false").Envar("DISABLE_STACK_TRACING").Bool()
 	DisableE2ETracing   = kingpin.Flag("disable-e2e-tracing", "Disable e2e tracing").Default("false").Envar("DISABLE_E2E_TRACING").Bool()
 
-	ExternalNetworksWhitelist = kingpin.Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").Envar("TRACK_PUBLIC_NETWORK").Strings()
-	EphemeralPortRange        = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("32768-60999").Envar("EPHEMERAL_PORT_RANGE").String()
+	ExternalNetworksWhitelist = kingpin.
+					Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
+					Envar("TRACK_PUBLIC_NETWORK").
+					Default("0.0.0.0/0").
+					Strings()
+	EphemeralPortRange = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("32768-60999").Envar("EPHEMERAL_PORT_RANGE").String()
 
 	Provider          = kingpin.Flag("provider", "`provider` label for `node_cloud_info` metric").Envar("PROVIDER").String()
 	Region            = kingpin.Flag("region", "`region` label for `node_cloud_info` metric").Envar("REGION").String()
@@ -39,6 +43,7 @@ var (
 	ConfigEndpoint    = kingpin.Flag("config-endpoint", "The URL of the endpoint to send traces to").Envar("CONFIG_ENDPOINT").Default("10.0.16.250:18080").String()
 	LogsEndpoint      = kingpin.Flag("logs-endpoint", "The URL of the endpoint to send logs to").Envar("LOGS_ENDPOINT").URL()
 	ProfilesEndpoint  = kingpin.Flag("profiles-endpoint", "The URL of the endpoint to send profiles to").Envar("PROFILES_ENDPOINT").URL()
+	InsecureSkipVerify = kingpin.Flag("insecure-skip-verify", "whether to skip verifying the certificate or not").Envar("INSECURE_SKIP_VERIFY").Default("false").Bool()
 
 	ScrapeInterval = kingpin.Flag("scrape-interval", "How often to gather metrics from the agent").Default("15s").Envar("SCRAPE_INTERVAL").Duration()
 	WalDir         = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()

+ 3 - 3
go.mod

@@ -10,13 +10,14 @@ require (
 	github.com/containerd/cgroups v1.0.4
 	github.com/containerd/containerd v1.6.26
 	github.com/coreos/go-systemd/v22 v22.5.0
-	github.com/coroot/logparser v1.1.2
+	github.com/coroot/logparser v1.1.5
 	github.com/docker/docker v25.0.0+incompatible
 	github.com/florianl/go-conntrack v0.3.0
 	github.com/go-kit/log v0.2.1
 	github.com/go-logr/logr v1.4.1
 	github.com/go-sql-driver/mysql v1.8.1
 	github.com/gomodule/redigo v1.9.2
+	github.com/godbus/dbus/v5 v5.0.6
 	github.com/grafana/pyroscope/ebpf v0.4.1
 	github.com/hashicorp/go-version v1.6.0
 	github.com/jedib0t/go-pretty/v6 v6.6.0
@@ -42,6 +43,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.22.0
 	go.opentelemetry.io/otel/trace v1.22.0
 	golang.org/x/arch v0.4.0
+	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a
 	golang.org/x/mod v0.16.0
 	golang.org/x/net v0.22.0
 	golang.org/x/sys v0.18.0
@@ -97,7 +99,6 @@ require (
 	github.com/go-openapi/strfmt v0.22.0 // indirect
 	github.com/go-openapi/swag v0.22.4 // indirect
 	github.com/go-openapi/validate v0.22.1 // indirect
-	github.com/godbus/dbus/v5 v5.0.6 // indirect
 	github.com/gogo/googleapis v1.4.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
@@ -174,7 +175,6 @@ require (
 	go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
 	go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
 	golang.org/x/crypto v0.21.0 // indirect
-	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
 	golang.org/x/oauth2 v0.16.0 // indirect
 	golang.org/x/sync v0.6.0 // indirect
 	golang.org/x/text v0.14.0 // indirect

+ 2 - 2
go.sum

@@ -298,8 +298,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/coroot/dotnetdiag v1.2.2 h1:PVP/By8o+xhPjfVolJYcjHLbFQInM7pkaD6/otPLc8Q=
 github.com/coroot/dotnetdiag v1.2.2/go.mod h1:veXCMlFzm1yNl7wwJb/ZLxO4WbzhDBoy1VG1XtkH2ls=
-github.com/coroot/logparser v1.1.2 h1:9aH4zIBle14xMHq07YHqVFE2t68k3LE10X2yKHXtJG8=
-github.com/coroot/logparser v1.1.2/go.mod h1:YfYxn9FYBm5GYHHUB4zI22irFAWVDe2bcbOWDHKSmEo=
+github.com/coroot/logparser v1.1.5 h1:gCXeJ0qeRsQWnkK9dOwEiZT3DMjCWp1MTY3ZsPoC3Bk=
+github.com/coroot/logparser v1.1.5/go.mod h1:YfYxn9FYBm5GYHHUB4zI22irFAWVDe2bcbOWDHKSmEo=
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
 github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=

+ 2 - 0
logs/otel.go

@@ -2,6 +2,7 @@ package logs
 
 import (
 	"context"
+	"crypto/tls"
 	"time"
 
 	otel "github.com/agoda-com/opentelemetry-logs-go"
@@ -36,6 +37,7 @@ func Init(machineId, hostname, version string) {
 		otlplogshttp.WithEndpoint(endpointUrl.Host),
 		otlplogshttp.WithURLPath(path),
 		otlplogshttp.WithHeaders(common.AuthHeaders()),
+		otlplogshttp.WithTLSClientConfig(&tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify}),
 	}
 	if endpointUrl.Scheme != "https" {
 		opts = append(opts, otlplogshttp.WithInsecure())

+ 16 - 4
main.go

@@ -8,7 +8,6 @@ import (
 	"net/http"
 	_ "net/http/pprof"
 	"os"
-	"path"
 	"runtime"
 	"strings"
 
@@ -20,6 +19,8 @@ import (
 	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/logs"
 	"github.com/coroot/coroot-node-agent/node"
+	"github.com/coroot/coroot-node-agent/proc"
+	"github.com/coroot/coroot-node-agent/profiling"
 	"github.com/coroot/coroot-node-agent/prom"
 	"github.com/coroot/coroot-node-agent/tracing"
 	"github.com/prometheus/client_golang/prometheus"
@@ -73,8 +74,8 @@ func uname() (string, string, error) {
 }
 
 func machineID() string {
-	for _, p := range []string{"sys/devices/virtual/dmi/id/product_uuid", "etc/machine-id", "var/lib/dbus/machine-id"} {
-		payload, err := os.ReadFile(path.Join("/proc/1/root", p))
+	for _, p := range []string{"/etc/machine-id", "/var/lib/dbus/machine-id", "/sys/devices/virtual/dmi/id/product_uuid"} {
+		payload, err := os.ReadFile(proc.HostPath(p))
 		if err != nil {
 			log.Warningln("failed to read machine-id:", err)
 			continue
@@ -86,6 +87,15 @@ func machineID() string {
 	return ""
 }
 
+// systemUUID returns the contents of the host's DMI product_uuid file with
+// surrounding whitespace trimmed. If the file cannot be read, it logs a
+// warning and returns an empty string.
+func systemUUID() string {
+	uuidFile := proc.HostPath("/sys/devices/virtual/dmi/id/product_uuid")
+	raw, err := os.ReadFile(uuidFile)
+	if err == nil {
+		return strings.TrimSpace(string(raw))
+	}
+	klog.Warningln("failed to read system-uuid:", err)
+	return ""
+}
+
 func whitelistNodeExternalNetworks() {
 	netdevs, err := node.NetDevices()
 	if err != nil {
@@ -150,11 +160,13 @@ func main() {
 	whitelistNodeExternalNetworks()
 
 	machineId := machineID()
+	systemUuid := systemUUID()
+
 	tracing.Init(machineId, hostname, version)
 	logs.Init(machineId, hostname, version)
 
 	registry := prometheus.NewRegistry()
-	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)
+	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId, "system_uuid": systemUuid}, registry)
 
 	registerer.MustRegister(info("node_agent_info", version))
 

+ 6 - 5
pinger/pinger.go

@@ -176,15 +176,16 @@ func receive(conn *net.IPConn) (*net.IPAddr, *icmp.Echo, time.Time, error) {
 		}
 		return nil, nil, ts, err
 	}
-
-	if ts, err = getTimestampFromOutOfBandData(oob, oobn); err != nil {
-		return nil, nil, ts, fmt.Errorf("failed to get RX timestamp: %s", err)
-	}
-
 	echo, err := extractEchoFromPacket(pktBuf, n)
 	if err != nil {
 		return nil, nil, ts, fmt.Errorf("failed to extract ICMP Echo from IPv4 packet %s: %s", ra, err)
 	}
+	if echo == nil {
+		return nil, nil, ts, nil
+	}
+	if ts, err = getTimestampFromOutOfBandData(oob, oobn); err != nil {
+		return nil, nil, ts, fmt.Errorf("failed to get RX timestamp: %s", err)
+	}
 	return ra, echo, ts, nil
 }
 

+ 9 - 0
proc/fd.go

@@ -5,6 +5,8 @@ import (
 	"path"
 	"strconv"
 	"strings"
+
+	"k8s.io/klog/v2"
 )
 
 type Fd struct {
@@ -18,16 +20,23 @@ func ReadFds(pid uint32) ([]Fd, error) {
 	fdDir := Path(pid, "fd")
 	entries, err := os.ReadDir(fdDir)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
 		return nil, err
 	}
 	res := make([]Fd, 0, len(entries))
 	for _, entry := range entries {
 		fd, err := strconv.ParseUint(entry.Name(), 10, 64)
 		if err != nil {
+			klog.Warningf("failed to parse fd '%s': %s", entry.Name(), err)
 			continue
 		}
 		dest, err := os.Readlink(path.Join(fdDir, entry.Name()))
 		if err != nil {
+			// A descriptor that disappears between ReadDir and Readlink is an
+			// expected race and is skipped silently; only other failures are
+			// worth a warning.
+			if !os.IsNotExist(err) {
+				klog.Warningf("failed to read link '%s': %s", entry.Name(), err)
+			}
 			continue
 		}
 		var socketInode string

+ 3 - 0
proc/net.go

@@ -39,6 +39,9 @@ func GetSockets(pid uint32) ([]Sock, error) {
 func readSockets(src string) ([]Sock, error) {
 	f, err := os.Open(src)
 	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
 		return nil, err
 	}
 	defer f.Close()

+ 4 - 0
profiling/profiling.go

@@ -2,6 +2,7 @@ package profiling
 
 import (
 	"bytes"
+	"crypto/tls"
 	"fmt"
 	"hash/fnv"
 	"io"
@@ -35,6 +36,9 @@ var (
 	constLabels labels.Labels
 	httpClient  = http.Client{
 		Timeout: UploadTimeout,
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify},
+		},
 	}
 	endpointUrl  *url.URL
 	session      ebpfspy.Session

+ 3 - 0
prom/agent.go

@@ -41,6 +41,9 @@ func StartAgent(machineId string) error {
 			Headers:       common.AuthHeaders(),
 			RemoteTimeout: model.Duration(RemoteWriteTimeout),
 			QueueConfig:   config.DefaultQueueConfig,
+			HTTPClientConfig: promConfig.HTTPClientConfig{
+				TLSConfig: promConfig.TLSConfig{InsecureSkipVerify: *flags.InsecureSkipVerify},
+			},
 		},
 	)
 	cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, &config.ScrapeConfig{

+ 2 - 0
tracing/tracing.go

@@ -2,6 +2,7 @@ package tracing
 
 import (
 	"context"
+	"crypto/tls"
 	"fmt"
 
 	"sync"
@@ -46,6 +47,7 @@ func Init(machineId, hostname, version string) {
 		otlptracehttp.WithEndpoint(endpointUrl.Host),
 		otlptracehttp.WithURLPath(path),
 		otlptracehttp.WithHeaders(common.AuthHeaders()),
+		otlptracehttp.WithTLSClientConfig(&tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify}),
 	}
 	if endpointUrl.Scheme != "https" {
 		opts = append(opts, otlptracehttp.WithInsecure())