|
|
@@ -2,12 +2,14 @@ package containers
|
|
|
|
|
|
import (
|
|
|
debugelf "debug/elf"
|
|
|
- . "github.com/coroot/coroot-node-agent/utils/modelse"
|
|
|
"os"
|
|
|
+ "sort"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
+ . "github.com/coroot/coroot-node-agent/utils/modelse"
|
|
|
+
|
|
|
"github.com/coroot/coroot-node-agent/cgroup"
|
|
|
"github.com/coroot/coroot-node-agent/common"
|
|
|
"github.com/coroot/coroot-node-agent/ebpftracer"
|
|
|
@@ -23,6 +25,7 @@ import (
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
klog "github.com/sirupsen/logrus"
|
|
|
"github.com/vishvananda/netns"
|
|
|
+ "golang.org/x/exp/maps"
|
|
|
"inet.af/netaddr"
|
|
|
)
|
|
|
|
|
|
@@ -46,6 +49,8 @@ type ContainerMetadata struct {
|
|
|
logDecoder logparser.Decoder
|
|
|
hostListens map[string][]netaddr.IPPort
|
|
|
networks map[string]ContainerNetwork
|
|
|
+ env map[string]string
|
|
|
+ systemdTriggeredBy string
|
|
|
rootfs string
|
|
|
}
|
|
|
|
|
|
@@ -73,18 +78,34 @@ type AddrPair struct {
|
|
|
|
|
|
type ActiveConnection struct {
|
|
|
Dest netaddr.IPPort
|
|
|
+ Src netaddr.IPPort
|
|
|
ActualDest netaddr.IPPort
|
|
|
Pid uint32
|
|
|
Fd uint64
|
|
|
Timestamp uint64
|
|
|
Closed time.Time
|
|
|
|
|
|
+ BytesSent uint64
|
|
|
+ BytesReceived uint64
|
|
|
+
|
|
|
http2Parser *l7.Http2Parser
|
|
|
postgresParser *l7.PostgresParser
|
|
|
mysqlParser *l7.MysqlParser
|
|
|
dmParser *l7.DmParser
|
|
|
}
|
|
|
|
|
|
+type ActiveAccept struct {
|
|
|
+ Dest netaddr.IPPort
|
|
|
+ Src netaddr.IPPort
|
|
|
+ Pid uint32
|
|
|
+ Fd uint64
|
|
|
+ Timestamp uint64
|
|
|
+ Closed time.Time
|
|
|
+
|
|
|
+ BytesSent uint64
|
|
|
+ BytesReceived uint64
|
|
|
+}
|
|
|
+
|
|
|
type ListenDetails struct {
|
|
|
ClosedAt time.Time
|
|
|
NsIPs []netaddr.IP
|
|
|
@@ -103,6 +124,25 @@ type K8sContainer struct {
|
|
|
containerName string
|
|
|
pid string
|
|
|
}
|
|
|
+type ConnectionStats struct {
|
|
|
+ Count uint64
|
|
|
+ TotalTime time.Duration
|
|
|
+ Retransmissions uint64
|
|
|
+ BytesSent uint64
|
|
|
+ PerBytesSent uint64
|
|
|
+ BytesReceived uint64
|
|
|
+ PerBytesReceived uint64
|
|
|
+ Src netaddr.IPPort
|
|
|
+ ConEstTime time.Duration
|
|
|
+ FirstReadTime uint64
|
|
|
+ FirstWriteTime uint64
|
|
|
+ NewReadTime uint64
|
|
|
+}
|
|
|
+
|
|
|
+type AcceptStats struct {
|
|
|
+ BytesSent uint64
|
|
|
+ BytesReceived uint64
|
|
|
+}
|
|
|
|
|
|
type Container struct {
|
|
|
id ContainerID
|
|
|
@@ -122,19 +162,23 @@ type Container struct {
|
|
|
delaysLock sync.Mutex
|
|
|
|
|
|
listens map[netaddr.IPPort]map[uint32]*ListenDetails
|
|
|
- ipsByNs map[string][]netaddr.IP
|
|
|
|
|
|
- connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
|
|
|
- connectsFailed map[netaddr.IPPort]int64 // dst -> count
|
|
|
- connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
|
|
|
+ connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
|
|
|
+ connectsFailed map[netaddr.IPPort]int64 // dst -> count
|
|
|
+ connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
|
|
|
connectionsActive map[AddrPair]*ActiveConnection
|
|
|
connectionsByPidFd map[PidFd]*ActiveConnection
|
|
|
- retransmits map[AddrPair]int64 // dst:actual_dst -> count
|
|
|
+
|
|
|
+ acceptsSuccessful map[AddrPair]*AcceptStats
|
|
|
+ acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
|
|
|
+ acceptsActive map[AddrPair]*ActiveAccept
|
|
|
+ acceptsByPidFd map[PidFd]*ActiveAccept
|
|
|
|
|
|
l7Stats L7Stats
|
|
|
dnsStats *L7Metrics
|
|
|
|
|
|
- oomKills int
|
|
|
+ oomKills int
|
|
|
+ pythonThreadLockWaitTime time.Duration
|
|
|
|
|
|
mounts map[string]proc.MountInfo
|
|
|
|
|
|
@@ -144,6 +188,8 @@ type Container struct {
|
|
|
nsConntrack *Conntrack
|
|
|
lbConntracks []*Conntrack
|
|
|
|
|
|
+ registry *Registry
|
|
|
+
|
|
|
lock sync.RWMutex
|
|
|
|
|
|
done chan struct{}
|
|
|
@@ -161,7 +207,7 @@ type Container struct {
|
|
|
AppInfo AppInfo
|
|
|
}
|
|
|
|
|
|
-func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
|
|
|
+func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
|
|
|
netNs, err := proc.GetNetNs(pid)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -177,16 +223,19 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
|
|
|
delaysByPid: map[uint32]Delays{},
|
|
|
|
|
|
listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
|
|
|
- ipsByNs: map[string][]netaddr.IP{},
|
|
|
|
|
|
- connectsSuccessful: map[AddrPair]int64{},
|
|
|
+ connectsSuccessful: map[AddrPair]*ConnectionStats{},
|
|
|
connectsFailed: map[netaddr.IPPort]int64{},
|
|
|
connectLastAttempt: map[netaddr.IPPort]time.Time{},
|
|
|
connectionsActive: map[AddrPair]*ActiveConnection{},
|
|
|
connectionsByPidFd: map[PidFd]*ActiveConnection{},
|
|
|
- retransmits: map[AddrPair]int64{},
|
|
|
- l7Stats: L7Stats{},
|
|
|
- dnsStats: &L7Metrics{},
|
|
|
+ acceptsSuccessful: map[AddrPair]*AcceptStats{},
|
|
|
+ acceptLastAttempt: map[netaddr.IPPort]time.Time{},
|
|
|
+ acceptsActive: map[AddrPair]*ActiveAccept{},
|
|
|
+ acceptsByPidFd: map[PidFd]*ActiveAccept{},
|
|
|
+
|
|
|
+ l7Stats: L7Stats{},
|
|
|
+ dnsStats: &L7Metrics{},
|
|
|
|
|
|
mounts: map[string]proc.MountInfo{},
|
|
|
|
|
|
@@ -196,6 +245,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
|
|
|
|
|
|
done: make(chan struct{}),
|
|
|
traceMap: make(map[uint64]*tracing.Trace),
|
|
|
+ registry: registry,
|
|
|
}
|
|
|
|
|
|
for _, n := range md.networks {
|
|
|
@@ -211,18 +261,18 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
|
|
|
|
|
|
c.runLogParser("")
|
|
|
|
|
|
- go func() {
|
|
|
- ticker := time.NewTicker(gcInterval)
|
|
|
- defer ticker.Stop()
|
|
|
- for {
|
|
|
- select {
|
|
|
- case <-c.done:
|
|
|
- return
|
|
|
- case t := <-ticker.C:
|
|
|
- c.gc(t)
|
|
|
- }
|
|
|
- }
|
|
|
- }()
|
|
|
+ // go func() {
|
|
|
+ // ticker := time.NewTicker(gcInterval)
|
|
|
+ // defer ticker.Stop()
|
|
|
+ // for {
|
|
|
+ // select {
|
|
|
+ // case <-c.done:
|
|
|
+ // return
|
|
|
+ // case t := <-ticker.C:
|
|
|
+ // c.gc(t)
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ // }()
|
|
|
|
|
|
return c, nil
|
|
|
}
|
|
|
@@ -250,11 +300,13 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
|
|
|
}
|
|
|
|
|
|
func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
+ c.registry.updateTrafficStatsIfNecessary()
|
|
|
+
|
|
|
c.lock.RLock()
|
|
|
defer c.lock.RUnlock()
|
|
|
|
|
|
- if c.metadata.image != "" {
|
|
|
- ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image)
|
|
|
+ if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
|
|
|
+ ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
|
|
|
}
|
|
|
|
|
|
ch <- counter(metrics.Restarts, float64(c.restarts))
|
|
|
@@ -308,7 +360,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
}
|
|
|
|
|
|
for addr, open := range c.getListens() {
|
|
|
- ch <- gauge(metrics.NetListenInfo, float64(open), addr.String())
|
|
|
+ ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
|
|
|
}
|
|
|
for proxy, addrs := range c.getProxiedListens() {
|
|
|
for addr := range addrs {
|
|
|
@@ -316,14 +368,35 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for d, count := range c.connectsSuccessful {
|
|
|
- ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
|
|
|
+ for d, stats := range c.connectsSuccessful {
|
|
|
+ ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
|
|
|
+ ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
|
|
|
+ if stats.Retransmissions > 0 {
|
|
|
+ ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
|
|
|
+ }
|
|
|
+ ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+ ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+ ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+ ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+
|
|
|
+ ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+ ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+ ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), d.dst.String(), d.dst.String(),stats.Src.String())
|
|
|
+
|
|
|
+ klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
|
|
|
+ stats.PerBytesReceived = 0
|
|
|
+ stats.PerBytesSent = 0
|
|
|
}
|
|
|
+
|
|
|
+ // for d, stats := range c.acceptsSuccessful {
|
|
|
+ // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
|
|
|
+ // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
|
|
|
+ // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
|
|
|
+
|
|
|
+ // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
|
|
|
+ // }
|
|
|
for dst, count := range c.connectsFailed {
|
|
|
- ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
|
|
|
- }
|
|
|
- for d, count := range c.retransmits {
|
|
|
- ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
|
|
|
+ ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
|
|
|
}
|
|
|
|
|
|
connections := map[AddrPair]int{}
|
|
|
@@ -345,7 +418,14 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
|
|
|
appTypes := map[string]struct{}{}
|
|
|
seenJvms := map[string]bool{}
|
|
|
- for pid, process := range c.processes {
|
|
|
+ seenDotNetApps := map[string]bool{}
|
|
|
+ pids := maps.Keys(c.processes)
|
|
|
+ sort.Slice(pids, func(i, j int) bool {
|
|
|
+ return pids[i] < pids[j]
|
|
|
+ })
|
|
|
+
|
|
|
+ for _, pid := range pids {
|
|
|
+ process := c.processes[pid]
|
|
|
cmdline := proc.GetCmdline(pid)
|
|
|
if len(cmdline) == 0 {
|
|
|
continue
|
|
|
@@ -354,6 +434,9 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
if appType != "" {
|
|
|
appTypes[appType] = struct{}{}
|
|
|
}
|
|
|
+ if process.isGolangApp {
|
|
|
+ appTypes["golang"] = struct{}{}
|
|
|
+ }
|
|
|
switch {
|
|
|
case isJvm(cmdline):
|
|
|
jvm, jMetrics := jvmMetrics(pid)
|
|
|
@@ -365,12 +448,20 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
}
|
|
|
case process.dotNetMonitor != nil:
|
|
|
appTypes["dotnet"] = struct{}{}
|
|
|
- process.dotNetMonitor.Collect(ch)
|
|
|
+ appName := process.dotNetMonitor.AppName()
|
|
|
+ if !seenDotNetApps[appName] {
|
|
|
+ seenDotNetApps[appName] = true
|
|
|
+ process.dotNetMonitor.Collect(ch)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
for appType := range appTypes {
|
|
|
ch <- gauge(metrics.ApplicationType, 1, appType)
|
|
|
}
|
|
|
+ if c.pythonThreadLockWaitTime > 0 {
|
|
|
+ ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
|
|
|
+ }
|
|
|
+
|
|
|
if c.dnsStats.Requests != nil {
|
|
|
c.dnsStats.Requests.Collect(ch)
|
|
|
}
|
|
|
@@ -384,6 +475,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
|
|
|
ch <- gauge(metrics.NetLatency, rtt, ip.String())
|
|
|
}
|
|
|
}
|
|
|
+ c.gc(time.Now())
|
|
|
}
|
|
|
|
|
|
func (c *Container) onProcessStart(pid uint32) *Process {
|
|
|
@@ -395,7 +487,8 @@ func (c *Container) onProcessStart(pid uint32) *Process {
|
|
|
return nil
|
|
|
}
|
|
|
c.zombieAt = time.Time{}
|
|
|
- p := NewProcess(pid, stats)
|
|
|
+ p := NewProcess(pid, stats, c.registry.tracer)
|
|
|
+
|
|
|
if p == nil {
|
|
|
return nil
|
|
|
}
|
|
|
@@ -465,6 +558,7 @@ func (c *Container) onFileOpen(pid uint32, fd uint64) {
|
|
|
|
|
|
// set
|
|
|
func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
|
|
|
+ klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
|
|
|
if common.PortFilter.ShouldBeSkipped(addr.Port()) {
|
|
|
return
|
|
|
}
|
|
|
@@ -487,20 +581,18 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
|
|
|
return
|
|
|
}
|
|
|
defer ns.Close()
|
|
|
- nsId := ns.UniqueId()
|
|
|
- ips, ok := c.ipsByNs[nsId]
|
|
|
- if !ok {
|
|
|
- if ips, err = proc.GetNsIps(ns); err != nil {
|
|
|
- klog.Warningln(err)
|
|
|
- } else {
|
|
|
- c.ipsByNs[nsId] = ips
|
|
|
- }
|
|
|
+ ips, err := proc.GetNsIps(ns)
|
|
|
+ if err != nil {
|
|
|
+ klog.Warningln(err)
|
|
|
+ return
|
|
|
}
|
|
|
+ klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
|
|
|
details.NsIPs = ips
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
|
|
|
+ klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
if _, byAddr := c.listens[addr]; byAddr {
|
|
|
@@ -512,7 +604,57 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
|
|
|
+func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
|
|
|
+ klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
|
|
|
+ // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ p := c.processes[pid]
|
|
|
+ if p == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if dst.IP().IsLoopback() && !p.isHostNs() {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // actualDst, err := c.getActualDestination(p, src, dst)
|
|
|
+ // if err != nil {
|
|
|
+ // if !common.IsNotExist(err) {
|
|
|
+ // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
|
|
|
+ // }
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ // switch {
|
|
|
+ // case actualDst == nil:
|
|
|
+ // actualDst = &dst
|
|
|
+ // case actualDst.IP().IsLoopback() && !p.isHostNs():
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
|
|
|
+ // return
|
|
|
+ // }
|
|
|
+ c.lock.Lock()
|
|
|
+ defer c.lock.Unlock()
|
|
|
+ if !failed {
|
|
|
+ key := AddrPair{src: dst, dst: src}
|
|
|
+ stats := c.acceptsSuccessful[key]
|
|
|
+ if stats == nil {
|
|
|
+ stats = &AcceptStats{}
|
|
|
+ c.acceptsSuccessful[key] = stats
|
|
|
+ }
|
|
|
+ acceptCon := &ActiveAccept{
|
|
|
+ Dest: src,
|
|
|
+ Src: dst,
|
|
|
+ Pid: pid,
|
|
|
+ Fd: fd,
|
|
|
+ Timestamp: timestamp,
|
|
|
+ }
|
|
|
+ c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
|
|
|
+ c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
|
|
|
+ }
|
|
|
+ c.acceptLastAttempt[dst] = time.Now()
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
|
|
|
if common.PortFilter.ShouldBeSkipped(dst.Port()) {
|
|
|
return
|
|
|
}
|
|
|
@@ -544,9 +686,19 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
|
|
|
if failed {
|
|
|
c.connectsFailed[dst]++
|
|
|
} else {
|
|
|
- c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
|
|
|
+ key := AddrPair{src: dst, dst: *actualDst}
|
|
|
+ stats := c.connectsSuccessful[key]
|
|
|
+ if stats == nil {
|
|
|
+ stats = &ConnectionStats{}
|
|
|
+ c.connectsSuccessful[key] = stats
|
|
|
+ }
|
|
|
+ stats.Count++
|
|
|
+ stats.TotalTime += duration
|
|
|
+ stats.Src = src
|
|
|
+ stats.ConEstTime = duration
|
|
|
connection := &ActiveConnection{
|
|
|
Dest: dst,
|
|
|
+ Src: src,
|
|
|
ActualDest: *actualDst,
|
|
|
Pid: pid,
|
|
|
Fd: fd,
|
|
|
@@ -588,15 +740,93 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
-func (c *Container) onConnectionClose(srcDst AddrPair) bool {
|
|
|
+func (c *Container) onConnectionClose(e ebpftracer.Event) {
|
|
|
+ c.lock.Lock()
|
|
|
+ conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
|
|
|
+ c.lock.Unlock()
|
|
|
+ if conn != nil {
|
|
|
+ if conn.Closed.IsZero() {
|
|
|
+ if e.TrafficStats != nil {
|
|
|
+ c.lock.Lock()
|
|
|
+ c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
|
|
|
+ c.lock.Unlock()
|
|
|
+ }
|
|
|
+ conn.Closed = time.Now()
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Container) onAcceptClose(e ebpftracer.Event) {
|
|
|
+ c.lock.Lock()
|
|
|
+ conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
|
|
|
+ c.lock.Unlock()
|
|
|
+ if conn != nil {
|
|
|
+ if conn.Closed.IsZero() {
|
|
|
+ if e.TrafficStats != nil {
|
|
|
+ c.lock.Lock()
|
|
|
+ c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
|
|
|
+ c.lock.Unlock()
|
|
|
+ }
|
|
|
+ conn.Closed = time.Now()
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
|
|
|
+ if u == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
- conn := c.connectionsActive[srcDst]
|
|
|
- if conn == nil {
|
|
|
- return false
|
|
|
+ c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
|
|
|
+ if ac == nil {
|
|
|
+ return
|
|
|
}
|
|
|
- conn.Closed = time.Now()
|
|
|
- return true
|
|
|
+ key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
|
|
|
+ stats := c.connectsSuccessful[key]
|
|
|
+ if stats == nil {
|
|
|
+ stats = &ConnectionStats{}
|
|
|
+ c.connectsSuccessful[key] = stats
|
|
|
+ }
|
|
|
+ if sent > ac.BytesSent {
|
|
|
+ stats.BytesSent += sent - ac.BytesSent
|
|
|
+ stats.PerBytesSent = sent - ac.BytesSent
|
|
|
+ }
|
|
|
+ if received > ac.BytesReceived {
|
|
|
+ stats.BytesReceived += received - ac.BytesReceived
|
|
|
+ stats.PerBytesReceived = received - ac.BytesReceived
|
|
|
+ }
|
|
|
+ if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
|
|
|
+ stats.FirstReadTime = firstreadtime
|
|
|
+ stats.FirstWriteTime = firstwritetime
|
|
|
+ stats.NewReadTime = newreadtime
|
|
|
+ }
|
|
|
+ ac.BytesSent = sent
|
|
|
+ ac.BytesReceived = received
|
|
|
+}
|
|
|
+
|
|
|
+func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
|
|
|
+ if ac == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
|
|
|
+ key := AddrPair{src: ac.Src, dst: ac.Dest}
|
|
|
+ stats := c.acceptsSuccessful[key]
|
|
|
+ if stats == nil {
|
|
|
+ stats = &AcceptStats{}
|
|
|
+ c.acceptsSuccessful[key] = stats
|
|
|
+ }
|
|
|
+ if sent > ac.BytesSent {
|
|
|
+ stats.BytesSent += sent - ac.BytesSent
|
|
|
+ }
|
|
|
+ if received > ac.BytesReceived {
|
|
|
+ stats.BytesReceived += received - ac.BytesReceived
|
|
|
+ }
|
|
|
+ ac.BytesSent = sent
|
|
|
+ ac.BytesReceived = received
|
|
|
}
|
|
|
|
|
|
func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
|
|
|
@@ -612,10 +842,10 @@ func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, stri
|
|
|
dnsReq := L7Requests[l7.ProtocolDNS]
|
|
|
c.dnsStats.Requests = prometheus.NewCounterVec(
|
|
|
prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
|
|
|
- []string{"request_type", "status"},
|
|
|
+ []string{"request_type", "domain", "status"},
|
|
|
)
|
|
|
}
|
|
|
- if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, status); m != nil {
|
|
|
+ if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
|
|
|
m.Inc()
|
|
|
}
|
|
|
if r.Duration != 0 {
|
|
|
@@ -705,14 +935,20 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (c *Container) onRetransmit(srcDst AddrPair) bool {
|
|
|
+func (c *Container) onRetransmission(srcDst AddrPair) bool {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
conn, ok := c.connectionsActive[srcDst]
|
|
|
if !ok {
|
|
|
return false
|
|
|
}
|
|
|
- c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
|
|
|
+ key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
|
|
|
+ stats := c.connectsSuccessful[key]
|
|
|
+ if stats == nil {
|
|
|
+ stats = &ConnectionStats{}
|
|
|
+ c.connectsSuccessful[key] = stats
|
|
|
+ }
|
|
|
+ stats.Retransmissions++
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
@@ -874,6 +1110,12 @@ func (c *Container) ping() map[netaddr.IP]float64 {
|
|
|
}
|
|
|
targets := make([]netaddr.IP, 0, len(ips))
|
|
|
for ip := range ips {
|
|
|
+ if ip.IsLoopback() {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if !ip.Is4() { // pinger doesn't support IPv6 yet
|
|
|
+ continue
|
|
|
+ }
|
|
|
targets = append(targets, ip)
|
|
|
}
|
|
|
rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
|
|
|
@@ -944,8 +1186,8 @@ func (c *Container) runLogParser(logPath string) {
|
|
|
}
|
|
|
|
|
|
func (c *Container) gc(now time.Time) {
|
|
|
- c.lock.Lock()
|
|
|
- defer c.lock.Unlock()
|
|
|
+ // c.lock.Lock()
|
|
|
+ // defer c.lock.Unlock()
|
|
|
|
|
|
established := map[AddrPair]struct{}{}
|
|
|
establishedDst := map[netaddr.IPPort]struct{}{}
|
|
|
@@ -953,7 +1195,7 @@ func (c *Container) gc(now time.Time) {
|
|
|
seenNamespaces := map[string]bool{}
|
|
|
fdMap := map[uint64]struct{}{}
|
|
|
for _, p := range c.processes {
|
|
|
- if seenNamespaces[p.NetNsId] {
|
|
|
+ if seenNamespaces[p.NetNsId()] {
|
|
|
continue
|
|
|
}
|
|
|
sockets, err := proc.GetSockets(p.Pid)
|
|
|
@@ -975,15 +1217,9 @@ func (c *Container) gc(now time.Time) {
|
|
|
establishedDst[s.DAddr] = struct{}{}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- seenNamespaces[p.NetNsId] = true
|
|
|
+ seenNamespaces[p.NetNsId()] = true
|
|
|
}
|
|
|
|
|
|
- for ns := range c.ipsByNs {
|
|
|
- if !seenNamespaces[ns] {
|
|
|
- delete(c.ipsByNs, ns)
|
|
|
- }
|
|
|
- }
|
|
|
c.revalidateListens(now, listens)
|
|
|
for srcDst, conn := range c.connectionsActive {
|
|
|
pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
|
|
|
@@ -1003,6 +1239,24 @@ func (c *Container) gc(now time.Time) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ for srcDst, conn := range c.acceptsActive {
|
|
|
+ pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
|
|
|
+ if _, ok := established[srcDst]; !ok {
|
|
|
+ delete(c.acceptsActive, srcDst)
|
|
|
+ if conn == c.acceptsByPidFd[pidFd] {
|
|
|
+ delete(c.acceptsByPidFd, pidFd)
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
|
|
|
+ delete(c.acceptsActive, srcDst)
|
|
|
+ if conn == c.acceptsByPidFd[pidFd] {
|
|
|
+ delete(c.acceptsByPidFd, pidFd)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
for _, conn := range c.connectionsByPidFd {
|
|
|
|
|
|
if _, ok := fdMap[conn.Fd]; !ok {
|
|
|
@@ -1010,6 +1264,13 @@ func (c *Container) gc(now time.Time) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ for _, conn := range c.acceptsByPidFd {
|
|
|
+
|
|
|
+ if _, ok := fdMap[conn.Fd]; !ok {
|
|
|
+ delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
for dst, at := range c.connectLastAttempt {
|
|
|
_, active := establishedDst[dst]
|
|
|
if !active && !at.IsZero() && now.Sub(at) > gcInterval {
|
|
|
@@ -1020,9 +1281,17 @@ func (c *Container) gc(now time.Time) {
|
|
|
delete(c.connectsSuccessful, d)
|
|
|
}
|
|
|
}
|
|
|
- for d := range c.retransmits {
|
|
|
+ c.l7Stats.delete(dst)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for dst, at := range c.acceptLastAttempt {
|
|
|
+ _, active := establishedDst[dst]
|
|
|
+ if !active && !at.IsZero() && now.Sub(at) > gcInterval {
|
|
|
+ delete(c.acceptLastAttempt, dst)
|
|
|
+ for d := range c.acceptsSuccessful {
|
|
|
if d.src == dst {
|
|
|
- delete(c.retransmits, d)
|
|
|
+ delete(c.acceptsSuccessful, d)
|
|
|
}
|
|
|
}
|
|
|
c.l7Stats.delete(dst)
|
|
|
@@ -1067,6 +1336,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
|
|
|
for pid := range c.processes {
|
|
|
fds, err := proc.ReadFds(pid)
|
|
|
if err != nil {
|
|
|
+ klog.Warningln(err)
|
|
|
continue
|
|
|
}
|
|
|
for _, fd := range fds {
|
|
|
@@ -1158,6 +1428,7 @@ func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
|
|
|
p.uprobes = append(p.uprobes, goProbes...)
|
|
|
p.goTlsUprobesChecked = true
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|