container.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048
  1. package containers
  2. import (
  3. "os"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/coroot/coroot-node-agent/cgroup"
  8. "github.com/coroot/coroot-node-agent/common"
  9. "github.com/coroot/coroot-node-agent/ebpftracer"
  10. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  11. "github.com/coroot/coroot-node-agent/flags"
  12. "github.com/coroot/coroot-node-agent/logs"
  13. "github.com/coroot/coroot-node-agent/node"
  14. "github.com/coroot/coroot-node-agent/pinger"
  15. "github.com/coroot/coroot-node-agent/proc"
  16. "github.com/coroot/coroot-node-agent/tracing"
  17. "github.com/coroot/logparser"
  18. "github.com/prometheus/client_golang/prometheus"
  19. "github.com/vishvananda/netns"
  20. "inet.af/netaddr"
  21. "k8s.io/klog/v2"
  22. )
  // Package-level timing knobs used by every Container.
  var (
  	// gcInterval is the period of the per-container GC ticker. The same
  	// duration doubles as the retention window after which closed
  	// connections, closed listens and process-less containers are dropped.
  	gcInterval = 10 * time.Minute

  	// pingTimeout is the timeout handed to pinger.Ping when measuring
  	// latency to the container's connection destinations.
  	pingTimeout = 300 * time.Millisecond
  )
  // ContainerID is the string identifier of a tracked container. It is also
  // used verbatim as the container id for log emitters and traces.
  type ContainerID string
  // ContainerNetwork describes one network a container is attached to.
  type ContainerNetwork struct {
  	// NetworkID is the identifier used to look up the network's
  	// load-balancer namespace when the container is created.
  	NetworkID string
  }
  31. type ContainerMetadata struct {
  32. name string
  33. labels map[string]string
  34. volumes map[string]string
  35. logPath string
  36. image string
  37. logDecoder logparser.Decoder
  38. hostListens map[string][]netaddr.IPPort
  39. networks map[string]ContainerNetwork
  40. }
  // Delays holds accumulated scheduling delays taken from taskstats.
  type Delays struct {
  	cpu  time.Duration // time spent waiting for CPU
  	disk time.Duration // time spent waiting for block I/O
  }
  45. type LogParser struct {
  46. parser *logparser.Parser
  47. stop func()
  48. }
  49. func (p *LogParser) Stop() {
  50. if p.stop != nil {
  51. p.stop()
  52. }
  53. p.parser.Stop()
  54. }
  55. type AddrPair struct {
  56. src netaddr.IPPort
  57. dst netaddr.IPPort
  58. }
  59. type ActiveConnection struct {
  60. Dest netaddr.IPPort
  61. ActualDest netaddr.IPPort
  62. Pid uint32
  63. Fd uint64
  64. Timestamp uint64
  65. Closed time.Time
  66. http2Parser *l7.Http2Parser
  67. postgresParser *l7.PostgresParser
  68. mysqlParser *l7.MysqlParser
  69. }
  70. type ListenDetails struct {
  71. ClosedAt time.Time
  72. NsIPs []netaddr.IP
  73. }
  // PidFd identifies a file descriptor within a process and serves as the key
  // of the connection-by-descriptor index.
  type PidFd struct {
  	Pid uint32
  	Fd  uint64
  }
  78. type Container struct {
  79. id ContainerID
  80. cgroup *cgroup.Cgroup
  81. metadata *ContainerMetadata
  82. processes map[uint32]*Process
  83. startedAt time.Time
  84. zombieAt time.Time
  85. restarts int
  86. delays Delays
  87. delaysByPid map[uint32]Delays
  88. delaysLock sync.Mutex
  89. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  90. ipsByNs map[string][]netaddr.IP
  91. connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
  92. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  93. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  94. connectionsActive map[AddrPair]*ActiveConnection
  95. connectionsByPidFd map[PidFd]*ActiveConnection
  96. retransmits map[AddrPair]int64 // dst:actual_dst -> count
  97. l7Stats L7Stats
  98. oomKills int
  99. mounts map[string]proc.MountInfo
  100. logParsers map[string]*LogParser
  101. hostConntrack *Conntrack
  102. nsConntrack *Conntrack
  103. lbConntracks []*Conntrack
  104. lock sync.RWMutex
  105. done chan struct{}
  106. }
  107. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
  108. netNs, err := proc.GetNetNs(pid)
  109. if err != nil {
  110. return nil, err
  111. }
  112. defer netNs.Close()
  113. c := &Container{
  114. id: id,
  115. cgroup: cg,
  116. metadata: md,
  117. processes: map[uint32]*Process{},
  118. delaysByPid: map[uint32]Delays{},
  119. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  120. ipsByNs: map[string][]netaddr.IP{},
  121. connectsSuccessful: map[AddrPair]int64{},
  122. connectsFailed: map[netaddr.IPPort]int64{},
  123. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  124. connectionsActive: map[AddrPair]*ActiveConnection{},
  125. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  126. retransmits: map[AddrPair]int64{},
  127. l7Stats: L7Stats{},
  128. mounts: map[string]proc.MountInfo{},
  129. logParsers: map[string]*LogParser{},
  130. hostConntrack: hostConntrack,
  131. done: make(chan struct{}),
  132. }
  133. for _, n := range md.networks {
  134. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  135. if ct, err := NewConntrack(nsHandle); err != nil {
  136. klog.Warningln(err)
  137. } else {
  138. c.lbConntracks = append(c.lbConntracks, ct)
  139. }
  140. _ = nsHandle.Close()
  141. }
  142. }
  143. c.runLogParser("")
  144. go func() {
  145. ticker := time.NewTicker(gcInterval)
  146. defer ticker.Stop()
  147. for {
  148. select {
  149. case <-c.done:
  150. return
  151. case t := <-ticker.C:
  152. c.gc(t)
  153. }
  154. }
  155. }()
  156. return c, nil
  157. }
  158. func (c *Container) Close() {
  159. for _, p := range c.logParsers {
  160. p.Stop()
  161. }
  162. for _, ct := range c.lbConntracks {
  163. _ = ct.Close()
  164. }
  165. if c.nsConntrack != nil {
  166. _ = c.nsConntrack.Close()
  167. }
  168. close(c.done)
  169. }
  170. func (c *Container) Dead(now time.Time) bool {
  171. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  172. }
  173. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  174. // some fixed metric description is required here to register/unregister the collector correctly
  175. ch <- prometheus.NewDesc("container", "", nil, nil)
  176. }
  177. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  178. c.lock.RLock()
  179. defer c.lock.RUnlock()
  180. if c.metadata.image != "" {
  181. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image)
  182. }
  183. ch <- counter(metrics.Restarts, float64(c.restarts))
  184. if cpu, err := c.cgroup.CpuStat(); err == nil {
  185. if cpu.LimitCores > 0 {
  186. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  187. }
  188. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  189. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  190. }
  191. if taskstatsClient != nil {
  192. c.updateDelays()
  193. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  194. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  195. }
  196. if s, err := c.cgroup.MemoryStat(); err == nil {
  197. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  198. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  199. if s.Limit > 0 {
  200. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  201. }
  202. }
  203. if c.oomKills > 0 {
  204. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  205. }
  206. if disks, err := node.GetDisks(); err == nil {
  207. ioStat, _ := c.cgroup.IOStat()
  208. for majorMinor, mounts := range c.getMounts() {
  209. dev := disks.GetParentBlockDevice(majorMinor)
  210. if dev == nil {
  211. continue
  212. }
  213. for mountPoint, fsStat := range mounts {
  214. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  215. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  216. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  217. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  218. if io, ok := ioStat[majorMinor]; ok {
  219. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  220. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  221. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  222. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  223. }
  224. }
  225. }
  226. }
  227. for addr, open := range c.getListens() {
  228. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  229. }
  230. for proxy, addrs := range c.getProxiedListens() {
  231. for addr := range addrs {
  232. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  233. }
  234. }
  235. for d, count := range c.connectsSuccessful {
  236. ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
  237. }
  238. for dst, count := range c.connectsFailed {
  239. ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
  240. }
  241. for d, count := range c.retransmits {
  242. ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
  243. }
  244. connections := map[AddrPair]int{}
  245. for addrPair, conn := range c.connectionsActive {
  246. if !conn.Closed.IsZero() {
  247. continue
  248. }
  249. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  250. }
  251. for d, count := range connections {
  252. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  253. }
  254. for source, p := range c.logParsers {
  255. for _, c := range p.parser.GetCounters() {
  256. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  257. }
  258. }
  259. appTypes := map[string]struct{}{}
  260. seenJvms := map[string]bool{}
  261. for pid, process := range c.processes {
  262. cmdline := proc.GetCmdline(pid)
  263. if len(cmdline) == 0 {
  264. continue
  265. }
  266. appType := guessApplicationType(cmdline)
  267. if appType != "" {
  268. appTypes[appType] = struct{}{}
  269. }
  270. switch {
  271. case isJvm(cmdline):
  272. jvm, jMetrics := jvmMetrics(pid)
  273. if len(jMetrics) > 0 && !seenJvms[jvm] {
  274. seenJvms[jvm] = true
  275. for _, m := range jMetrics {
  276. ch <- m
  277. }
  278. }
  279. case process.dotNetMonitor != nil:
  280. appTypes["dotnet"] = struct{}{}
  281. process.dotNetMonitor.Collect(ch)
  282. }
  283. }
  284. for appType := range appTypes {
  285. ch <- gauge(metrics.ApplicationType, 1, appType)
  286. }
  287. c.l7Stats.collect(ch)
  288. if !*flags.DisablePinger {
  289. for ip, rtt := range c.ping() {
  290. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  291. }
  292. }
  293. }
  294. func (c *Container) onProcessStart(pid uint32) *Process {
  295. c.lock.Lock()
  296. defer c.lock.Unlock()
  297. stats, err := TaskstatsPID(pid)
  298. if err != nil {
  299. return nil
  300. }
  301. c.zombieAt = time.Time{}
  302. p := NewProcess(pid, stats)
  303. if p == nil {
  304. return nil
  305. }
  306. c.processes[pid] = p
  307. if c.startedAt.IsZero() {
  308. c.startedAt = stats.BeginTime
  309. } else {
  310. min := stats.BeginTime
  311. for _, p := range c.processes {
  312. if p.StartedAt.Before(min) {
  313. min = p.StartedAt
  314. }
  315. }
  316. if min.After(c.startedAt) {
  317. c.restarts++
  318. c.startedAt = min
  319. }
  320. }
  321. return p
  322. }
  323. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  324. c.lock.Lock()
  325. defer c.lock.Unlock()
  326. if p := c.processes[pid]; p != nil {
  327. p.Close()
  328. }
  329. delete(c.processes, pid)
  330. if len(c.processes) == 0 {
  331. c.zombieAt = time.Now()
  332. }
  333. delete(c.delaysByPid, pid)
  334. if oomKill {
  335. c.oomKills++
  336. }
  337. }
  338. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  339. mntId, logPath := resolveFd(pid, fd)
  340. func() {
  341. if mntId == "" {
  342. return
  343. }
  344. c.lock.Lock()
  345. _, ok := c.mounts[mntId]
  346. c.lock.Unlock()
  347. if ok {
  348. return
  349. }
  350. byMountId := proc.GetMountInfo(pid)
  351. if byMountId == nil {
  352. return
  353. }
  354. if mi, ok := byMountId[mntId]; ok {
  355. c.lock.Lock()
  356. c.mounts[mntId] = mi
  357. c.lock.Unlock()
  358. }
  359. }()
  360. if logPath != "" {
  361. c.lock.Lock()
  362. c.runLogParser(logPath)
  363. c.lock.Unlock()
  364. }
  365. }
  366. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  367. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  368. return
  369. }
  370. if !safe {
  371. c.lock.Lock()
  372. defer c.lock.Unlock()
  373. }
  374. if _, ok := c.listens[addr]; !ok {
  375. c.listens[addr] = map[uint32]*ListenDetails{}
  376. }
  377. details := &ListenDetails{}
  378. c.listens[addr][pid] = details
  379. if addr.IP().IsUnspecified() {
  380. ns, err := proc.GetNetNs(pid)
  381. if err != nil {
  382. if !common.IsNotExist(err) {
  383. klog.Warningln(err)
  384. }
  385. return
  386. }
  387. defer ns.Close()
  388. nsId := ns.UniqueId()
  389. ips, ok := c.ipsByNs[nsId]
  390. if !ok {
  391. if ips, err = proc.GetNsIps(ns); err != nil {
  392. klog.Warningln(err)
  393. } else {
  394. c.ipsByNs[nsId] = ips
  395. }
  396. }
  397. details.NsIPs = ips
  398. }
  399. }
  400. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  401. c.lock.Lock()
  402. defer c.lock.Unlock()
  403. if _, byAddr := c.listens[addr]; byAddr {
  404. if _, byPid := c.listens[addr][pid]; byPid {
  405. if details := c.listens[addr][pid]; details != nil {
  406. details.ClosedAt = time.Now()
  407. }
  408. }
  409. }
  410. }
  411. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
  412. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  413. return
  414. }
  415. p := c.processes[pid]
  416. if p == nil {
  417. return
  418. }
  419. if dst.IP().IsLoopback() && !p.isHostNs() {
  420. return
  421. }
  422. actualDst, err := c.getActualDestination(p, src, dst)
  423. if err != nil {
  424. if !common.IsNotExist(err) {
  425. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  426. }
  427. return
  428. }
  429. switch {
  430. case actualDst == nil:
  431. actualDst = &dst
  432. case actualDst.IP().IsLoopback() && !p.isHostNs():
  433. return
  434. }
  435. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  436. return
  437. }
  438. c.lock.Lock()
  439. defer c.lock.Unlock()
  440. if failed {
  441. c.connectsFailed[dst]++
  442. } else {
  443. c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
  444. connection := &ActiveConnection{
  445. Dest: dst,
  446. ActualDest: *actualDst,
  447. Pid: pid,
  448. Fd: fd,
  449. Timestamp: timestamp,
  450. }
  451. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  452. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  453. }
  454. c.connectLastAttempt[dst] = time.Now()
  455. }
  456. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  457. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  458. return actualDst, nil
  459. }
  460. for _, lb := range c.lbConntracks {
  461. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  462. return actualDst, nil
  463. }
  464. }
  465. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  466. if actualDst != nil {
  467. return actualDst, nil
  468. }
  469. if !p.isHostNs() {
  470. if c.nsConntrack == nil {
  471. netNs, err := proc.GetNetNs(p.Pid)
  472. if err != nil {
  473. return nil, err
  474. }
  475. defer netNs.Close()
  476. c.nsConntrack, err = NewConntrack(netNs)
  477. if err != nil {
  478. return nil, err
  479. }
  480. }
  481. return c.nsConntrack.GetActualDestination(src, dst), nil
  482. }
  483. return nil, nil
  484. }
  485. func (c *Container) onConnectionClose(srcDst AddrPair) bool {
  486. c.lock.Lock()
  487. defer c.lock.Unlock()
  488. conn := c.connectionsActive[srcDst]
  489. if conn == nil {
  490. return false
  491. }
  492. conn.Closed = time.Now()
  493. return true
  494. }
  495. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
  496. c.lock.Lock()
  497. defer c.lock.Unlock()
  498. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  499. if conn == nil {
  500. return
  501. }
  502. if timestamp != 0 && conn.Timestamp != timestamp {
  503. return
  504. }
  505. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  506. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  507. switch r.Protocol {
  508. case l7.ProtocolHTTP:
  509. stats.observe(r.Status.Http(), "", r.Duration)
  510. method, path := l7.ParseHttp(r.Payload)
  511. trace.HttpRequest(method, path, r.Status, r.Duration)
  512. case l7.ProtocolHTTP2:
  513. if conn.http2Parser == nil {
  514. conn.http2Parser = l7.NewHttp2Parser()
  515. }
  516. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  517. for _, req := range requests {
  518. stats.observe(req.Status.Http(), "", req.Duration)
  519. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  520. }
  521. case l7.ProtocolPostgres:
  522. if r.Method != l7.MethodStatementClose {
  523. stats.observe(r.Status.String(), "", r.Duration)
  524. }
  525. if conn.postgresParser == nil {
  526. conn.postgresParser = l7.NewPostgresParser()
  527. }
  528. query := conn.postgresParser.Parse(r.Payload)
  529. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  530. case l7.ProtocolMysql:
  531. if r.Method != l7.MethodStatementClose {
  532. stats.observe(r.Status.String(), "", r.Duration)
  533. }
  534. if conn.mysqlParser == nil {
  535. conn.mysqlParser = l7.NewMysqlParser()
  536. }
  537. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  538. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  539. case l7.ProtocolMemcached:
  540. stats.observe(r.Status.String(), "", r.Duration)
  541. cmd, items := l7.ParseMemcached(r.Payload)
  542. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  543. case l7.ProtocolRedis:
  544. stats.observe(r.Status.String(), "", r.Duration)
  545. cmd, args := l7.ParseRedis(r.Payload)
  546. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  547. case l7.ProtocolMongo:
  548. stats.observe(r.Status.String(), "", r.Duration)
  549. query := l7.ParseMongo(r.Payload)
  550. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  551. case l7.ProtocolKafka, l7.ProtocolCassandra:
  552. stats.observe(r.Status.String(), "", r.Duration)
  553. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  554. stats.observe(r.Status.String(), r.Method.String(), 0)
  555. case l7.ProtocolDubbo2:
  556. stats.observe(r.Status.String(), "", r.Duration)
  557. }
  558. }
  559. func (c *Container) onRetransmit(srcDst AddrPair) bool {
  560. c.lock.Lock()
  561. defer c.lock.Unlock()
  562. conn, ok := c.connectionsActive[srcDst]
  563. if !ok {
  564. return false
  565. }
  566. c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
  567. return true
  568. }
  569. func (c *Container) updateDelays() {
  570. c.delaysLock.Lock()
  571. defer c.delaysLock.Unlock()
  572. for pid := range c.processes {
  573. stats, err := TaskstatsTGID(pid)
  574. if err != nil {
  575. continue
  576. }
  577. d := c.delaysByPid[pid]
  578. c.delays.cpu += stats.CPUDelay - d.cpu
  579. c.delays.disk += stats.BlockIODelay - d.disk
  580. d.cpu = stats.CPUDelay
  581. d.disk = stats.BlockIODelay
  582. c.delaysByPid[pid] = d
  583. }
  584. }
  585. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  586. if len(c.mounts) == 0 {
  587. return nil
  588. }
  589. res := map[string]map[string]*proc.FSStat{}
  590. for _, mi := range c.mounts {
  591. var stat *proc.FSStat
  592. for pid := range c.processes {
  593. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  594. if err == nil {
  595. stat = &s
  596. break
  597. }
  598. }
  599. if stat == nil {
  600. continue
  601. }
  602. if _, ok := res[mi.MajorMinor]; !ok {
  603. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  604. }
  605. res[mi.MajorMinor][mi.MountPoint] = stat
  606. }
  607. return res
  608. }
  609. func (c *Container) getListens() map[netaddr.IPPort]int {
  610. res := map[netaddr.IPPort]int{}
  611. for addr, byPid := range c.listens {
  612. open := 0
  613. isHostNs := false
  614. ips := map[netaddr.IP]bool{}
  615. for pid, details := range byPid {
  616. p := c.processes[pid]
  617. if p == nil {
  618. continue
  619. }
  620. if p.isHostNs() {
  621. isHostNs = true
  622. }
  623. if details.ClosedAt.IsZero() {
  624. open = 1
  625. }
  626. for _, ip := range details.NsIPs {
  627. ips[ip] = true
  628. }
  629. }
  630. if !addr.IP().IsUnspecified() {
  631. ips = map[netaddr.IP]bool{addr.IP(): true}
  632. }
  633. for ip := range ips {
  634. if ip.IsLoopback() && !isHostNs {
  635. continue
  636. }
  637. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  638. }
  639. }
  640. return res
  641. }
  642. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  643. if len(c.metadata.hostListens) == 0 {
  644. return nil
  645. }
  646. hasUnspecified := false
  647. for _, addrs := range c.metadata.hostListens {
  648. for _, addr := range addrs {
  649. if addr.IP().IsUnspecified() {
  650. hasUnspecified = true
  651. break
  652. }
  653. }
  654. }
  655. var hostIps []netaddr.IP
  656. if hasUnspecified {
  657. if ns, err := proc.GetHostNetNs(); err != nil {
  658. klog.Warningln(err)
  659. } else {
  660. ips, err := proc.GetNsIps(ns)
  661. _ = ns.Close()
  662. if err != nil {
  663. klog.Warningln(err)
  664. } else {
  665. hostIps = ips
  666. }
  667. }
  668. }
  669. res := map[string]map[netaddr.IPPort]struct{}{}
  670. for proxy, addrs := range c.metadata.hostListens {
  671. res[proxy] = map[netaddr.IPPort]struct{}{}
  672. for _, addr := range addrs {
  673. if addr.IP().IsUnspecified() {
  674. for _, ip := range hostIps {
  675. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  676. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  677. }
  678. }
  679. } else {
  680. res[proxy][addr] = struct{}{}
  681. }
  682. }
  683. }
  684. return res
  685. }
  686. func (c *Container) ping() map[netaddr.IP]float64 {
  687. netNs := netns.None()
  688. for pid := range c.processes {
  689. if pid == agentPid {
  690. netNs = selfNetNs
  691. break
  692. }
  693. ns, err := proc.GetNetNs(pid)
  694. if err != nil {
  695. if !common.IsNotExist(err) {
  696. klog.Warningln(err)
  697. }
  698. continue
  699. }
  700. netNs = ns
  701. defer netNs.Close()
  702. break
  703. }
  704. if !netNs.IsOpen() {
  705. return nil
  706. }
  707. ips := map[netaddr.IP]struct{}{}
  708. for d := range c.connectsSuccessful {
  709. ips[d.dst.IP()] = struct{}{}
  710. }
  711. for dst := range c.connectsFailed {
  712. ips[dst.IP()] = struct{}{}
  713. }
  714. if len(ips) == 0 {
  715. return nil
  716. }
  717. targets := make([]netaddr.IP, 0, len(ips))
  718. for ip := range ips {
  719. targets = append(targets, ip)
  720. }
  721. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  722. if err != nil {
  723. klog.Warningln(err)
  724. return nil
  725. }
  726. return rtt
  727. }
  728. func (c *Container) runLogParser(logPath string) {
  729. if *flags.DisableLogParsing {
  730. return
  731. }
  732. containerId := string(c.id)
  733. if logPath != "" {
  734. if c.logParsers[logPath] != nil {
  735. return
  736. }
  737. ch := make(chan logparser.LogEntry)
  738. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  739. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  740. if err != nil {
  741. klog.Warningln(err)
  742. parser.Stop()
  743. return
  744. }
  745. klog.InfoS("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  746. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  747. return
  748. }
  749. switch c.cgroup.ContainerType {
  750. case cgroup.ContainerTypeSystemdService:
  751. ch := make(chan logparser.LogEntry)
  752. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  753. klog.Warningln(err)
  754. return
  755. }
  756. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  757. stop := func() {
  758. JournaldUnsubscribe(c.cgroup)
  759. }
  760. klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
  761. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  762. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  763. if c.metadata.logPath == "" {
  764. return
  765. }
  766. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  767. parser.Stop()
  768. delete(c.logParsers, "stdout/stderr")
  769. }
  770. ch := make(chan logparser.LogEntry)
  771. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  772. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  773. if err != nil {
  774. klog.Warningln(err)
  775. parser.Stop()
  776. return
  777. }
  778. klog.InfoS("started container logparser", "cg", c.cgroup.Id)
  779. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  780. }
  781. }
  782. func (c *Container) gc(now time.Time) {
  783. c.lock.Lock()
  784. defer c.lock.Unlock()
  785. established := map[AddrPair]struct{}{}
  786. establishedDst := map[netaddr.IPPort]struct{}{}
  787. listens := map[netaddr.IPPort]string{}
  788. seenNamespaces := map[string]bool{}
  789. for _, p := range c.processes {
  790. if seenNamespaces[p.NetNsId] {
  791. continue
  792. }
  793. sockets, err := proc.GetSockets(p.Pid)
  794. if err != nil {
  795. continue
  796. }
  797. for _, s := range sockets {
  798. if s.Listen {
  799. listens[s.SAddr] = s.Inode
  800. } else {
  801. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  802. establishedDst[s.DAddr] = struct{}{}
  803. }
  804. }
  805. seenNamespaces[p.NetNsId] = true
  806. }
  807. for ns := range c.ipsByNs {
  808. if !seenNamespaces[ns] {
  809. delete(c.ipsByNs, ns)
  810. }
  811. }
  812. c.revalidateListens(now, listens)
  813. for srcDst, conn := range c.connectionsActive {
  814. if _, ok := established[srcDst]; !ok {
  815. delete(c.connectionsActive, srcDst)
  816. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  817. continue
  818. }
  819. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  820. delete(c.connectionsActive, srcDst)
  821. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  822. }
  823. }
  824. for dst, at := range c.connectLastAttempt {
  825. _, active := establishedDst[dst]
  826. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  827. delete(c.connectLastAttempt, dst)
  828. delete(c.connectsFailed, dst)
  829. for d := range c.connectsSuccessful {
  830. if d.src == dst {
  831. delete(c.connectsSuccessful, d)
  832. }
  833. }
  834. for d := range c.retransmits {
  835. if d.src == dst {
  836. delete(c.retransmits, d)
  837. }
  838. }
  839. c.l7Stats.delete(dst)
  840. }
  841. }
  842. }
  843. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  844. for addr, byPid := range c.listens {
  845. if _, open := actualListens[addr]; open {
  846. continue
  847. }
  848. klog.Warningln("deleting the outdated listen:", addr)
  849. for _, details := range byPid {
  850. if details.ClosedAt.IsZero() {
  851. details.ClosedAt = now
  852. }
  853. }
  854. }
  855. missingListens := map[netaddr.IPPort]string{}
  856. for addr, inode := range actualListens {
  857. byPids, found := c.listens[addr]
  858. if !found {
  859. missingListens[addr] = inode
  860. continue
  861. }
  862. open := false
  863. for _, details := range byPids {
  864. if details.ClosedAt.IsZero() {
  865. open = true
  866. break
  867. }
  868. }
  869. if !open {
  870. missingListens[addr] = inode
  871. }
  872. }
  873. if len(missingListens) > 0 {
  874. inodeToPid := map[string]uint32{}
  875. for pid := range c.processes {
  876. fds, err := proc.ReadFds(pid)
  877. if err != nil {
  878. continue
  879. }
  880. for _, fd := range fds {
  881. if fd.SocketInode != "" {
  882. inodeToPid[fd.SocketInode] = pid
  883. }
  884. }
  885. }
  886. for addr, inode := range missingListens {
  887. pid, found := inodeToPid[inode]
  888. if !found {
  889. continue
  890. }
  891. klog.Warningln("missing listen found:", addr, pid)
  892. c.onListenOpen(pid, addr, true)
  893. }
  894. }
  895. for addr, pids := range c.listens {
  896. for pid, details := range pids {
  897. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  898. delete(c.listens[addr], pid)
  899. }
  900. }
  901. if len(c.listens[addr]) == 0 {
  902. delete(c.listens, addr)
  903. }
  904. }
  905. }
  906. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) {
  907. p := c.processes[pid]
  908. if p == nil {
  909. return
  910. }
  911. if !p.openSslUprobesChecked {
  912. p.uprobes = append(p.uprobes, tracer.AttachOpenSslUprobes(pid)...)
  913. p.openSslUprobesChecked = true
  914. }
  915. if !p.goTlsUprobesChecked {
  916. p.uprobes = append(p.uprobes, tracer.AttachGoTlsUprobes(pid)...)
  917. p.goTlsUprobesChecked = true
  918. }
  919. }
  920. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  921. info := proc.GetFdInfo(pid, fd)
  922. if info == nil {
  923. return
  924. }
  925. switch {
  926. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  927. !strings.HasPrefix(info.Dest, "/"),
  928. strings.HasPrefix(info.Dest, "/proc/"),
  929. strings.HasPrefix(info.Dest, "/dev/"),
  930. strings.HasPrefix(info.Dest, "/sys/"),
  931. strings.HasSuffix(info.Dest, "(deleted)"):
  932. return
  933. }
  934. mntId = info.MntId
  935. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  936. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  937. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  938. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  939. logPath = info.Dest
  940. }
  941. return
  942. }
  943. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  944. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  945. }
  946. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  947. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  948. }