container.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783
  1. package containers
  2. import (
  3. "github.com/coroot/coroot-node-agent/cgroup"
  4. "github.com/coroot/coroot-node-agent/common"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/logs"
  7. "github.com/coroot/coroot-node-agent/node"
  8. "github.com/coroot/coroot-node-agent/pinger"
  9. "github.com/coroot/coroot-node-agent/proc"
  10. "github.com/coroot/logparser"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/vishvananda/netns"
  13. "inet.af/netaddr"
  14. "k8s.io/klog/v2"
  15. "os"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. gcInterval = 10 * time.Minute
  22. pingTimeout = 300 * time.Millisecond
  23. )
  24. type ContainerID string
  25. type ContainerMetadata struct {
  26. name string
  27. labels map[string]string
  28. volumes map[string]string
  29. logPath string
  30. logDecoder logparser.Decoder
  31. hostListens map[string][]netaddr.IPPort
  32. }
  33. type Delays struct {
  34. cpu time.Duration
  35. disk time.Duration
  36. }
  37. type LogParser struct {
  38. parser *logparser.Parser
  39. stop func()
  40. }
  41. func (p *LogParser) Stop() {
  42. if p.stop != nil {
  43. p.stop()
  44. }
  45. p.parser.Stop()
  46. }
  47. type AddrPair struct {
  48. src netaddr.IPPort
  49. dst netaddr.IPPort
  50. }
  51. type ActiveConnection struct {
  52. ActualDest netaddr.IPPort
  53. Pid uint32
  54. Fd uint64
  55. }
  56. type Container struct {
  57. cgroup *cgroup.Cgroup
  58. metadata *ContainerMetadata
  59. pids map[uint32]time.Time // pid -> start time
  60. startedAt time.Time
  61. zombieAt time.Time
  62. restarts int
  63. delays Delays
  64. delaysByPid map[uint32]Delays
  65. delaysLock sync.Mutex
  66. listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
  67. connectsSuccessful map[AddrPair]int // dst:actual_dst -> count
  68. connectsFailed map[netaddr.IPPort]int // dst -> count
  69. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  70. connectionsActive map[AddrPair]ActiveConnection
  71. retransmits map[AddrPair]int // dst:actual_dst -> count
  72. oomKills int
  73. mountIds map[string]struct{}
  74. logParsers map[string]*LogParser
  75. lock sync.RWMutex
  76. done chan struct{}
  77. }
  78. func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
  79. c := &Container{
  80. cgroup: cg,
  81. metadata: md,
  82. pids: map[uint32]time.Time{},
  83. delaysByPid: map[uint32]Delays{},
  84. listens: map[netaddr.IPPort]map[uint32]time.Time{},
  85. connectsSuccessful: map[AddrPair]int{},
  86. connectsFailed: map[netaddr.IPPort]int{},
  87. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  88. connectionsActive: map[AddrPair]ActiveConnection{},
  89. retransmits: map[AddrPair]int{},
  90. mountIds: map[string]struct{}{},
  91. logParsers: map[string]*LogParser{},
  92. done: make(chan struct{}),
  93. }
  94. c.runLogParser("")
  95. go func() {
  96. ticker := time.NewTicker(gcInterval)
  97. defer ticker.Stop()
  98. for {
  99. select {
  100. case <-c.done:
  101. return
  102. case t := <-ticker.C:
  103. c.gc(t)
  104. }
  105. }
  106. }()
  107. return c
  108. }
  109. func (c *Container) Close() {
  110. for _, p := range c.logParsers {
  111. p.Stop()
  112. }
  113. close(c.done)
  114. }
  115. func (c *Container) Dead(now time.Time) bool {
  116. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  117. }
  118. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  119. for _, m := range metricsList {
  120. ch <- m
  121. }
  122. }
  123. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  124. c.lock.RLock()
  125. defer c.lock.RUnlock()
  126. ch <- counter(metrics.Restarts, float64(c.restarts))
  127. if cpu, err := c.cgroup.CpuStat(); err == nil {
  128. if cpu.LimitCores > 0 {
  129. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  130. }
  131. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  132. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  133. }
  134. if taskstatsClient != nil {
  135. c.updateDelays()
  136. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  137. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  138. }
  139. if s, err := c.cgroup.MemoryStat(); err == nil {
  140. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  141. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  142. if s.Limit > 0 {
  143. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  144. }
  145. }
  146. if c.oomKills > 0 {
  147. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  148. }
  149. if disks, err := node.GetDisks(); err == nil {
  150. ioStat, _ := c.cgroup.IOStat()
  151. for majorMinor, mounts := range c.getMounts() {
  152. dev := disks.GetParentBlockDevice(majorMinor)
  153. if dev == nil {
  154. continue
  155. }
  156. for mountPoint, fsStat := range mounts {
  157. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  158. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  159. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  160. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  161. if io, ok := ioStat[majorMinor]; ok {
  162. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  163. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  164. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  165. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  166. }
  167. }
  168. }
  169. }
  170. netNs := netns.None()
  171. for pid := range c.pids {
  172. if pid == agentPid {
  173. netNs = selfNetNs
  174. break
  175. }
  176. ns, err := proc.GetNetNs(pid)
  177. if err != nil {
  178. if !common.IsNotExist(err) {
  179. klog.Warningln(err)
  180. }
  181. continue
  182. }
  183. netNs = ns
  184. defer netNs.Close()
  185. break
  186. }
  187. listens := c.getListens(netNs)
  188. for addr, open := range listens {
  189. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  190. }
  191. for proxy, addrs := range c.getProxiedListens() {
  192. for addr := range addrs {
  193. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  194. }
  195. }
  196. for d, count := range c.connectsSuccessful {
  197. ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
  198. }
  199. for dst, count := range c.connectsFailed {
  200. ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
  201. }
  202. for d, count := range c.retransmits {
  203. ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
  204. }
  205. connections := map[AddrPair]int{}
  206. for c, conn := range c.connectionsActive {
  207. connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
  208. }
  209. for d, count := range connections {
  210. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  211. }
  212. for source, p := range c.logParsers {
  213. for _, c := range p.parser.GetCounters() {
  214. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  215. }
  216. }
  217. appTypes := map[string]struct{}{}
  218. for pid := range c.pids {
  219. cmdline := proc.GetCmdline(pid)
  220. if len(cmdline) == 0 {
  221. continue
  222. }
  223. appType := guessApplicationType(cmdline)
  224. if appType == "" {
  225. continue
  226. }
  227. appTypes[appType] = struct{}{}
  228. }
  229. for appType := range appTypes {
  230. ch <- gauge(metrics.ApplicationType, 1, appType)
  231. }
  232. if !*flags.NoPingUpstreams {
  233. for ip, rtt := range c.ping(netNs) {
  234. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  235. }
  236. }
  237. }
  238. func (c *Container) onProcessStart(pid uint32) {
  239. c.lock.Lock()
  240. defer c.lock.Unlock()
  241. stats, err := TaskstatsPID(pid)
  242. if err != nil {
  243. return
  244. }
  245. c.zombieAt = time.Time{}
  246. c.pids[pid] = stats.BeginTime
  247. if c.startedAt.IsZero() {
  248. c.startedAt = stats.BeginTime
  249. } else {
  250. min := stats.BeginTime
  251. for _, t := range c.pids {
  252. if t.Before(min) {
  253. min = t
  254. }
  255. }
  256. if min.After(c.startedAt) {
  257. c.restarts++
  258. c.startedAt = min
  259. }
  260. }
  261. }
  262. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  263. c.lock.Lock()
  264. defer c.lock.Unlock()
  265. delete(c.pids, pid)
  266. if len(c.pids) == 0 {
  267. c.zombieAt = time.Now()
  268. }
  269. delete(c.delaysByPid, pid)
  270. if oomKill {
  271. c.oomKills++
  272. }
  273. }
  274. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  275. mntId, logPath := resolveFd(pid, fd)
  276. c.lock.Lock()
  277. defer c.lock.Unlock()
  278. if mntId != "" {
  279. c.mountIds[mntId] = struct{}{}
  280. }
  281. if logPath != "" {
  282. c.runLogParser(logPath)
  283. }
  284. }
  285. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  286. if !safe {
  287. c.lock.Lock()
  288. defer c.lock.Unlock()
  289. }
  290. if _, ok := c.listens[addr]; !ok {
  291. c.listens[addr] = map[uint32]time.Time{}
  292. }
  293. c.listens[addr][pid] = time.Time{}
  294. }
  295. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  296. c.lock.Lock()
  297. defer c.lock.Unlock()
  298. if _, byAddr := c.listens[addr]; byAddr {
  299. if _, byPid := c.listens[addr][pid]; byPid {
  300. c.listens[addr][pid] = time.Now()
  301. }
  302. }
  303. }
  304. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, failed bool) {
  305. if dst.IP().IsLoopback() {
  306. netNs, err := proc.GetNetNs(pid)
  307. isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
  308. netNs.Close()
  309. if !isHostNs {
  310. return
  311. }
  312. } else {
  313. whitelisted := false
  314. for _, prefix := range flags.ExternalNetworksWhitelist {
  315. if prefix.Contains(dst.IP()) {
  316. whitelisted = true
  317. break
  318. }
  319. }
  320. if !whitelisted && !common.IsIpPrivate(dst.IP()) {
  321. return
  322. }
  323. }
  324. c.lock.Lock()
  325. defer c.lock.Unlock()
  326. if failed {
  327. c.connectsFailed[dst]++
  328. } else {
  329. actualDst := ConntrackGetActualDestination(src, dst)
  330. c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
  331. c.connectionsActive[AddrPair{src: src, dst: dst}] = ActiveConnection{
  332. ActualDest: actualDst,
  333. Pid: pid,
  334. Fd: fd,
  335. }
  336. }
  337. c.connectLastAttempt[dst] = time.Now()
  338. }
  339. func (c *Container) onConnectionClose(srcDst AddrPair) bool {
  340. c.lock.Lock()
  341. defer c.lock.Unlock()
  342. if _, ok := c.connectionsActive[srcDst]; !ok {
  343. return false
  344. }
  345. delete(c.connectionsActive, srcDst)
  346. return true
  347. }
  348. func (c *Container) onRetransmit(srcDst AddrPair) bool {
  349. c.lock.Lock()
  350. defer c.lock.Unlock()
  351. conn, ok := c.connectionsActive[srcDst]
  352. if !ok {
  353. return false
  354. }
  355. c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
  356. return true
  357. }
  358. func (c *Container) updateDelays() {
  359. c.delaysLock.Lock()
  360. defer c.delaysLock.Unlock()
  361. for pid := range c.pids {
  362. stats, err := TaskstatsTGID(pid)
  363. if err != nil {
  364. continue
  365. }
  366. d := c.delaysByPid[pid]
  367. c.delays.cpu += stats.CPUDelay - d.cpu
  368. c.delays.disk += stats.BlockIODelay - d.disk
  369. d.cpu = stats.CPUDelay
  370. d.disk = stats.BlockIODelay
  371. c.delaysByPid[pid] = d
  372. }
  373. }
  374. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  375. mounts := map[string]proc.MountInfo{}
  376. for p := range c.pids {
  377. mi := proc.GetMountInfo(p)
  378. if mi != nil {
  379. mounts = mi
  380. break
  381. }
  382. }
  383. for mountId := range mounts {
  384. if _, ok := c.mountIds[mountId]; !ok {
  385. delete(mounts, mountId)
  386. }
  387. }
  388. if len(mounts) == 0 {
  389. return nil
  390. }
  391. res := map[string]map[string]*proc.FSStat{}
  392. for _, mi := range mounts {
  393. var stat *proc.FSStat
  394. for pid := range c.pids {
  395. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  396. if err == nil {
  397. stat = &s
  398. break
  399. }
  400. }
  401. if stat == nil {
  402. continue
  403. }
  404. if _, ok := res[mi.MajorMinor]; !ok {
  405. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  406. }
  407. res[mi.MajorMinor][mi.MountPoint] = stat
  408. }
  409. return res
  410. }
  411. func (c *Container) getListens(netNs netns.NsHandle) map[netaddr.IPPort]int {
  412. if !netNs.IsOpen() {
  413. return nil
  414. }
  415. isHostNs := hostNetNsId == netNs.UniqueId()
  416. res := map[netaddr.IPPort]int{}
  417. for addr, byPid := range c.listens {
  418. open := 0
  419. for _, closedAt := range byPid {
  420. if closedAt.IsZero() {
  421. open = 1
  422. break
  423. }
  424. }
  425. var ips []netaddr.IP
  426. if addr.IP().IsUnspecified() {
  427. if nsIps, err := proc.GetNsIps(netNs); err != nil {
  428. klog.Warningln(err)
  429. } else {
  430. ips = nsIps
  431. }
  432. } else {
  433. ips = []netaddr.IP{addr.IP()}
  434. }
  435. for _, ip := range ips {
  436. if ip.IsLoopback() && !isHostNs {
  437. continue
  438. }
  439. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  440. }
  441. }
  442. return res
  443. }
  444. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  445. if len(c.metadata.hostListens) == 0 {
  446. return nil
  447. }
  448. hasUnspecified := false
  449. for _, addrs := range c.metadata.hostListens {
  450. for _, addr := range addrs {
  451. if addr.IP().IsUnspecified() {
  452. hasUnspecified = true
  453. break
  454. }
  455. }
  456. }
  457. var hostIps []netaddr.IP
  458. if hasUnspecified {
  459. if ns, err := proc.GetHostNetNs(); err != nil {
  460. klog.Warningln(err)
  461. } else {
  462. ips, err := proc.GetNsIps(ns)
  463. _ = ns.Close()
  464. if err != nil {
  465. klog.Warningln(err)
  466. } else {
  467. hostIps = ips
  468. }
  469. }
  470. }
  471. res := map[string]map[netaddr.IPPort]struct{}{}
  472. for proxy, addrs := range c.metadata.hostListens {
  473. res[proxy] = map[netaddr.IPPort]struct{}{}
  474. for _, addr := range addrs {
  475. if addr.IP().IsUnspecified() {
  476. for _, ip := range hostIps {
  477. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  478. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  479. }
  480. }
  481. } else {
  482. res[proxy][addr] = struct{}{}
  483. }
  484. }
  485. }
  486. return res
  487. }
  488. func (c *Container) ping(netNs netns.NsHandle) map[netaddr.IP]float64 {
  489. if !netNs.IsOpen() {
  490. return nil
  491. }
  492. ips := map[netaddr.IP]struct{}{}
  493. for d := range c.connectsSuccessful {
  494. ips[d.dst.IP()] = struct{}{}
  495. }
  496. for dst := range c.connectsFailed {
  497. ips[dst.IP()] = struct{}{}
  498. }
  499. if len(ips) == 0 {
  500. return nil
  501. }
  502. targets := make([]netaddr.IP, 0, len(ips))
  503. for ip := range ips {
  504. targets = append(targets, ip)
  505. }
  506. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  507. if err != nil {
  508. klog.Warningln(err)
  509. return nil
  510. }
  511. return rtt
  512. }
  513. func (c *Container) runLogParser(logPath string) {
  514. if *flags.NoParseLogs {
  515. return
  516. }
  517. if logPath != "" {
  518. if c.logParsers[logPath] != nil {
  519. return
  520. }
  521. ch := make(chan logparser.LogEntry)
  522. parser := logparser.NewParser(ch, nil)
  523. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  524. if err != nil {
  525. klog.Warningln(err)
  526. parser.Stop()
  527. return
  528. }
  529. klog.InfoS("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  530. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  531. return
  532. }
  533. switch c.cgroup.ContainerType {
  534. case cgroup.ContainerTypeSystemdService:
  535. ch := make(chan logparser.LogEntry)
  536. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  537. klog.Warningln(err)
  538. return
  539. }
  540. parser := logparser.NewParser(ch, nil)
  541. stop := func() {
  542. JournaldUnsubscribe(c.cgroup)
  543. }
  544. klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
  545. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  546. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd:
  547. if c.metadata.logPath == "" {
  548. return
  549. }
  550. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  551. parser.Stop()
  552. delete(c.logParsers, "stdout/stderr")
  553. }
  554. ch := make(chan logparser.LogEntry)
  555. parser := logparser.NewParser(ch, c.metadata.logDecoder)
  556. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  557. if err != nil {
  558. klog.Warningln(err)
  559. parser.Stop()
  560. return
  561. }
  562. klog.InfoS("started container logparser", "cg", c.cgroup.Id)
  563. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  564. }
  565. }
  566. func (c *Container) gc(now time.Time) {
  567. c.lock.Lock()
  568. defer c.lock.Unlock()
  569. established := map[AddrPair]struct{}{}
  570. establishedDst := map[netaddr.IPPort]struct{}{}
  571. listens := map[netaddr.IPPort]string{}
  572. for pid := range c.pids {
  573. sockets, err := proc.GetSockets(pid)
  574. if err != nil {
  575. continue
  576. }
  577. for _, s := range sockets {
  578. if s.Listen {
  579. listens[s.SAddr] = s.Inode
  580. } else {
  581. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  582. establishedDst[s.DAddr] = struct{}{}
  583. }
  584. }
  585. break
  586. }
  587. c.revalidateListens(now, listens)
  588. for srcDst := range c.connectionsActive {
  589. if _, ok := established[srcDst]; !ok {
  590. delete(c.connectionsActive, srcDst)
  591. }
  592. }
  593. for dst, at := range c.connectLastAttempt {
  594. _, active := establishedDst[dst]
  595. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  596. delete(c.connectLastAttempt, dst)
  597. delete(c.connectsFailed, dst)
  598. for d := range c.connectsSuccessful {
  599. if d.src == dst {
  600. delete(c.connectsSuccessful, d)
  601. }
  602. }
  603. for d := range c.retransmits {
  604. if d.src == dst {
  605. delete(c.retransmits, d)
  606. }
  607. }
  608. }
  609. }
  610. }
  611. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  612. for addr, byPid := range c.listens {
  613. if _, open := actualListens[addr]; open {
  614. continue
  615. }
  616. klog.Warningln("deleting the outdated listen:", addr)
  617. for pid, closedAt := range byPid {
  618. if closedAt.IsZero() {
  619. byPid[pid] = now
  620. }
  621. }
  622. }
  623. missingListens := map[netaddr.IPPort]string{}
  624. for addr, inode := range actualListens {
  625. byPids, found := c.listens[addr]
  626. if !found {
  627. missingListens[addr] = inode
  628. continue
  629. }
  630. open := false
  631. for _, closedAt := range byPids {
  632. if closedAt.IsZero() {
  633. open = true
  634. break
  635. }
  636. }
  637. if !open {
  638. missingListens[addr] = inode
  639. }
  640. }
  641. if len(missingListens) > 0 {
  642. inodeToPid := map[string]uint32{}
  643. for pid := range c.pids {
  644. fds, err := proc.ReadFds(pid)
  645. if err != nil {
  646. continue
  647. }
  648. for _, fd := range fds {
  649. if fd.SocketInode != "" {
  650. inodeToPid[fd.SocketInode] = pid
  651. }
  652. }
  653. }
  654. for addr, inode := range missingListens {
  655. pid, found := inodeToPid[inode]
  656. if !found {
  657. continue
  658. }
  659. klog.Warningln("missing listen found:", addr, pid)
  660. c.onListenOpen(pid, addr, true)
  661. }
  662. }
  663. for addr, pids := range c.listens {
  664. for pid, closedAt := range pids {
  665. if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval {
  666. delete(c.listens[addr], pid)
  667. }
  668. }
  669. if len(c.listens[addr]) == 0 {
  670. delete(c.listens, addr)
  671. }
  672. }
  673. }
  674. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  675. info := proc.GetFdInfo(pid, fd)
  676. if info == nil {
  677. return
  678. }
  679. switch {
  680. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  681. !strings.HasPrefix(info.Dest, "/"),
  682. strings.HasPrefix(info.Dest, "/proc/"),
  683. strings.HasPrefix(info.Dest, "/dev/"),
  684. strings.HasPrefix(info.Dest, "/sys/"),
  685. strings.HasSuffix(info.Dest, "(deleted)"):
  686. return
  687. }
  688. mntId = info.MntId
  689. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  690. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  691. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  692. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  693. logPath = info.Dest
  694. }
  695. return
  696. }
  697. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  698. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  699. }
  700. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  701. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  702. }