// container.go
  1. package containers
  2. import (
  3. "github.com/coroot/coroot-node-agent/cgroup"
  4. "github.com/coroot/coroot-node-agent/common"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/logs"
  7. "github.com/coroot/coroot-node-agent/node"
  8. "github.com/coroot/coroot-node-agent/pinger"
  9. "github.com/coroot/coroot-node-agent/proc"
  10. "github.com/coroot/logparser"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/vishvananda/netns"
  13. "inet.af/netaddr"
  14. "k8s.io/klog/v2"
  15. "os"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  // gcInterval is the period of each container's background gc run. It is also
  // the grace period after which closed listens, idle destinations and zombie
  // containers are considered expired.
  var gcInterval = 10 * time.Minute

  // pingTimeout is the timeout handed to pinger.Ping when measuring latency.
  var pingTimeout = 300 * time.Millisecond
  // ContainerID is a distinct string type.
  // NOTE(review): nothing in this part of the file uses it; presumably it
  // identifies a container elsewhere in the package.
  type ContainerID string
  25. type ContainerMetadata struct {
  26. name string
  27. labels map[string]string
  28. volumes map[string]Volume
  29. logPath string
  30. logDecoder logparser.Decoder
  31. hostListens map[string][]netaddr.IPPort
  32. }
  // Volume holds the provisioner and volume names that Collect attaches as
  // labels to the disk metrics of a mount point.
  type Volume struct {
  	provisioner, volume string
  }
  // Delays holds CPU and disk (block I/O) delay durations, either accumulated
  // for a whole container or last observed for a single pid.
  type Delays struct {
  	cpu, disk time.Duration
  }
  41. type LogParser struct {
  42. parser *logparser.Parser
  43. stop func()
  44. }
  45. func (p *LogParser) Stop() {
  46. if p.stop != nil {
  47. p.stop()
  48. }
  49. p.parser.Stop()
  50. }
  51. type AddrPair struct {
  52. src netaddr.IPPort
  53. dst netaddr.IPPort
  54. }
  55. type Container struct {
  56. cgroup *cgroup.Cgroup
  57. metadata *ContainerMetadata
  58. pids map[uint32]time.Time // pid -> start time
  59. startedAt time.Time
  60. zombieAt time.Time
  61. restarts int
  62. delays Delays
  63. delaysByPid map[uint32]Delays
  64. delaysLock sync.Mutex
  65. listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
  66. connectsSuccessful map[AddrPair]int // dst:actual_dst -> count
  67. connectsFailed map[netaddr.IPPort]int // dst -> count
  68. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  69. connectionsActive map[AddrPair]netaddr.IPPort // src:dst -> actual_dst
  70. retransmits map[AddrPair]int // dst:actual_dst -> count
  71. oomKills int
  72. mountIds map[string]struct{}
  73. logParsers map[string]*LogParser
  74. lock sync.RWMutex
  75. done chan struct{}
  76. }
  77. func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
  78. c := &Container{
  79. cgroup: cg,
  80. metadata: md,
  81. pids: map[uint32]time.Time{},
  82. delaysByPid: map[uint32]Delays{},
  83. listens: map[netaddr.IPPort]map[uint32]time.Time{},
  84. connectsSuccessful: map[AddrPair]int{},
  85. connectsFailed: map[netaddr.IPPort]int{},
  86. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  87. connectionsActive: map[AddrPair]netaddr.IPPort{},
  88. retransmits: map[AddrPair]int{},
  89. mountIds: map[string]struct{}{},
  90. logParsers: map[string]*LogParser{},
  91. done: make(chan struct{}),
  92. }
  93. c.runLogParser("")
  94. go func() {
  95. ticker := time.NewTicker(gcInterval)
  96. defer ticker.Stop()
  97. for {
  98. select {
  99. case <-c.done:
  100. return
  101. case t := <-ticker.C:
  102. c.gc(t)
  103. }
  104. }
  105. }()
  106. return c
  107. }
  108. func (c *Container) Close() {
  109. for _, p := range c.logParsers {
  110. p.Stop()
  111. }
  112. close(c.done)
  113. }
  114. func (c *Container) Dead(now time.Time) bool {
  115. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  116. }
  117. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  118. for _, m := range metricsList {
  119. ch <- m
  120. }
  121. }
  122. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  123. c.lock.RLock()
  124. defer c.lock.RUnlock()
  125. ch <- counter(metrics.Restarts, float64(c.restarts))
  126. if cpu, err := c.cgroup.CpuStat(); err == nil {
  127. if cpu.LimitCores > 0 {
  128. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  129. }
  130. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  131. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  132. }
  133. if taskstatsClient != nil {
  134. c.updateDelays()
  135. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  136. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  137. }
  138. if s, err := c.cgroup.MemoryStat(); err == nil {
  139. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  140. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  141. if s.Limit > 0 {
  142. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  143. }
  144. }
  145. if c.oomKills > 0 {
  146. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  147. }
  148. if disks, err := node.GetDisks(); err == nil {
  149. ioStat, _ := c.cgroup.IOStat()
  150. for majorMinor, mounts := range c.getMounts() {
  151. dev := disks.GetParentBlockDevice(majorMinor)
  152. if dev == nil {
  153. continue
  154. }
  155. for mountPoint, fsStat := range mounts {
  156. v := c.metadata.volumes[mountPoint]
  157. dls := []string{mountPoint, dev.Name, v.provisioner, v.volume}
  158. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  159. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  160. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  161. if io, ok := ioStat[majorMinor]; ok {
  162. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  163. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  164. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  165. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  166. }
  167. }
  168. }
  169. }
  170. netNs := netns.None()
  171. for pid := range c.pids {
  172. if pid == agentPid {
  173. netNs = selfNetNs
  174. break
  175. }
  176. ns, err := proc.GetNetNs(pid)
  177. if err != nil {
  178. if !common.IsNotExist(err) {
  179. klog.Warningln(err)
  180. }
  181. continue
  182. }
  183. netNs = ns
  184. defer netNs.Close()
  185. break
  186. }
  187. listens := c.getListens(netNs)
  188. for addr, open := range listens {
  189. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  190. }
  191. for proxy, addrs := range c.getProxiedListens() {
  192. for addr := range addrs {
  193. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  194. }
  195. }
  196. for d, count := range c.connectsSuccessful {
  197. ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
  198. }
  199. for dst, count := range c.connectsFailed {
  200. ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
  201. }
  202. for d, count := range c.retransmits {
  203. ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
  204. }
  205. connections := map[AddrPair]int{}
  206. for c, actualDst := range c.connectionsActive {
  207. connections[AddrPair{src: c.dst, dst: actualDst}]++
  208. }
  209. for d, count := range connections {
  210. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  211. }
  212. for source, p := range c.logParsers {
  213. for _, c := range p.parser.GetCounters() {
  214. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  215. }
  216. }
  217. appTypes := map[string]struct{}{}
  218. for pid := range c.pids {
  219. cmdline := proc.GetCmdline(pid)
  220. if len(cmdline) == 0 {
  221. continue
  222. }
  223. appType := guessApplicationType(cmdline)
  224. if appType == "" {
  225. continue
  226. }
  227. appTypes[appType] = struct{}{}
  228. }
  229. for appType := range appTypes {
  230. ch <- gauge(metrics.ApplicationType, 1, appType)
  231. }
  232. if !*flags.NoPingUpstreams {
  233. for ip, rtt := range c.ping(netNs) {
  234. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  235. }
  236. }
  237. }
  238. func (c *Container) onProcessStart(pid uint32) {
  239. c.lock.Lock()
  240. defer c.lock.Unlock()
  241. stats, err := TaskstatsPID(pid)
  242. if err != nil {
  243. return
  244. }
  245. c.zombieAt = time.Time{}
  246. c.pids[pid] = stats.BeginTime
  247. if c.startedAt.IsZero() {
  248. c.startedAt = stats.BeginTime
  249. } else {
  250. min := stats.BeginTime
  251. for _, t := range c.pids {
  252. if t.Before(min) {
  253. min = t
  254. }
  255. }
  256. if min.After(c.startedAt) {
  257. c.restarts++
  258. c.startedAt = min
  259. }
  260. }
  261. }
  262. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  263. c.lock.Lock()
  264. defer c.lock.Unlock()
  265. delete(c.pids, pid)
  266. if len(c.pids) == 0 {
  267. c.zombieAt = time.Now()
  268. }
  269. delete(c.delaysByPid, pid)
  270. if oomKill {
  271. c.oomKills++
  272. }
  273. }
  274. func (c *Container) onFileOpen(pid uint32, fd uint32) {
  275. mntId, logPath := resolveFd(pid, fd)
  276. c.lock.Lock()
  277. defer c.lock.Unlock()
  278. if mntId != "" {
  279. c.mountIds[mntId] = struct{}{}
  280. }
  281. if logPath != "" {
  282. c.runLogParser(logPath)
  283. }
  284. }
  285. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  286. if !safe {
  287. c.lock.Lock()
  288. defer c.lock.Unlock()
  289. }
  290. if _, ok := c.listens[addr]; !ok {
  291. c.listens[addr] = map[uint32]time.Time{}
  292. }
  293. c.listens[addr][pid] = time.Time{}
  294. }
  295. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  296. c.lock.Lock()
  297. defer c.lock.Unlock()
  298. if _, byAddr := c.listens[addr]; byAddr {
  299. if _, byPid := c.listens[addr][pid]; byPid {
  300. c.listens[addr][pid] = time.Now()
  301. }
  302. }
  303. }
  304. func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed bool) {
  305. if dst.IP().IsLoopback() {
  306. netNs, err := proc.GetNetNs(pid)
  307. isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
  308. netNs.Close()
  309. if !isHostNs {
  310. return
  311. }
  312. } else {
  313. whitelisted := false
  314. for _, prefix := range flags.ExternalNetworksWhitelist {
  315. if prefix.Contains(dst.IP()) {
  316. whitelisted = true
  317. break
  318. }
  319. }
  320. if !whitelisted && !common.IsIpPrivate(dst.IP()) {
  321. return
  322. }
  323. }
  324. c.lock.Lock()
  325. defer c.lock.Unlock()
  326. if failed {
  327. c.connectsFailed[dst]++
  328. } else {
  329. actualDst, err := ConntrackGetActualDestination(src, dst)
  330. if err != nil {
  331. klog.Errorf("failed to resolve actual destination for %s->%s: %s", src, dst, err)
  332. } else if actualDst.IsValid() {
  333. c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
  334. c.connectionsActive[AddrPair{src: src, dst: dst}] = actualDst
  335. } else {
  336. klog.Errorf("invalid actual destination for %s->%s: %s", src, dst, actualDst)
  337. }
  338. }
  339. c.connectLastAttempt[dst] = time.Now()
  340. }
  341. func (c *Container) onConnectionClose(srcDst AddrPair) bool {
  342. c.lock.Lock()
  343. defer c.lock.Unlock()
  344. if _, ok := c.connectionsActive[srcDst]; !ok {
  345. return false
  346. }
  347. delete(c.connectionsActive, srcDst)
  348. return true
  349. }
  350. func (c *Container) onRetransmit(srcDst AddrPair) bool {
  351. c.lock.Lock()
  352. defer c.lock.Unlock()
  353. actualDst, ok := c.connectionsActive[srcDst]
  354. if !ok {
  355. return false
  356. }
  357. c.retransmits[AddrPair{src: srcDst.dst, dst: actualDst}]++
  358. return true
  359. }
  360. func (c *Container) updateDelays() {
  361. c.delaysLock.Lock()
  362. defer c.delaysLock.Unlock()
  363. for pid := range c.pids {
  364. stats, err := TaskstatsTGID(pid)
  365. if err != nil {
  366. continue
  367. }
  368. d := c.delaysByPid[pid]
  369. c.delays.cpu += stats.CPUDelay - d.cpu
  370. c.delays.disk += stats.BlockIODelay - d.disk
  371. d.cpu = stats.CPUDelay
  372. d.disk = stats.BlockIODelay
  373. c.delaysByPid[pid] = d
  374. }
  375. }
  376. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  377. mounts := map[string]proc.MountInfo{}
  378. for p := range c.pids {
  379. mi := proc.GetMountInfo(p)
  380. if mi != nil {
  381. mounts = mi
  382. break
  383. }
  384. }
  385. for mountId := range mounts {
  386. if _, ok := c.mountIds[mountId]; !ok {
  387. delete(mounts, mountId)
  388. }
  389. }
  390. if len(mounts) == 0 {
  391. return nil
  392. }
  393. res := map[string]map[string]*proc.FSStat{}
  394. for _, mi := range mounts {
  395. var stat *proc.FSStat
  396. for pid := range c.pids {
  397. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  398. if err == nil {
  399. stat = &s
  400. break
  401. }
  402. }
  403. if stat == nil {
  404. continue
  405. }
  406. if _, ok := res[mi.MajorMinor]; !ok {
  407. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  408. }
  409. res[mi.MajorMinor][mi.MountPoint] = stat
  410. }
  411. return res
  412. }
  413. func (c *Container) getListens(netNs netns.NsHandle) map[netaddr.IPPort]int {
  414. if !netNs.IsOpen() {
  415. return nil
  416. }
  417. isHostNs := hostNetNsId == netNs.UniqueId()
  418. res := map[netaddr.IPPort]int{}
  419. for addr, byPid := range c.listens {
  420. open := 0
  421. for _, closedAt := range byPid {
  422. if closedAt.IsZero() {
  423. open = 1
  424. break
  425. }
  426. }
  427. var ips []netaddr.IP
  428. if addr.IP().IsUnspecified() {
  429. if nsIps, err := proc.GetNsIps(netNs); err != nil {
  430. klog.Warningln(err)
  431. } else {
  432. ips = nsIps
  433. }
  434. } else {
  435. ips = []netaddr.IP{addr.IP()}
  436. }
  437. for _, ip := range ips {
  438. if ip.IsLoopback() && !isHostNs {
  439. continue
  440. }
  441. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  442. }
  443. }
  444. return res
  445. }
  446. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  447. if len(c.metadata.hostListens) == 0 {
  448. return nil
  449. }
  450. hasUnspecified := false
  451. for _, addrs := range c.metadata.hostListens {
  452. for _, addr := range addrs {
  453. if addr.IP().IsUnspecified() {
  454. hasUnspecified = true
  455. break
  456. }
  457. }
  458. }
  459. var hostIps []netaddr.IP
  460. if hasUnspecified {
  461. if ns, err := proc.GetHostNetNs(); err != nil {
  462. klog.Warningln(err)
  463. } else {
  464. ips, err := proc.GetNsIps(ns)
  465. _ = ns.Close()
  466. if err != nil {
  467. klog.Warningln(err)
  468. } else {
  469. hostIps = ips
  470. }
  471. }
  472. }
  473. res := map[string]map[netaddr.IPPort]struct{}{}
  474. for proxy, addrs := range c.metadata.hostListens {
  475. res[proxy] = map[netaddr.IPPort]struct{}{}
  476. for _, addr := range addrs {
  477. if addr.IP().IsUnspecified() {
  478. for _, ip := range hostIps {
  479. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  480. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  481. }
  482. }
  483. } else {
  484. res[proxy][addr] = struct{}{}
  485. }
  486. }
  487. }
  488. return res
  489. }
  490. func (c *Container) ping(netNs netns.NsHandle) map[netaddr.IP]float64 {
  491. if !netNs.IsOpen() {
  492. return nil
  493. }
  494. ips := map[netaddr.IP]struct{}{}
  495. for d := range c.connectsSuccessful {
  496. ips[d.dst.IP()] = struct{}{}
  497. }
  498. for dst := range c.connectsFailed {
  499. ips[dst.IP()] = struct{}{}
  500. }
  501. if len(ips) == 0 {
  502. return nil
  503. }
  504. targets := make([]netaddr.IP, 0, len(ips))
  505. for ip := range ips {
  506. targets = append(targets, ip)
  507. }
  508. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  509. if err != nil {
  510. klog.Warningln(err)
  511. return nil
  512. }
  513. return rtt
  514. }
  515. func (c *Container) runLogParser(logPath string) {
  516. if *flags.NoParseLogs {
  517. return
  518. }
  519. switch {
  520. case logPath != "":
  521. if c.logParsers[logPath] != nil {
  522. return
  523. }
  524. ch := make(chan logparser.LogEntry)
  525. parser := logparser.NewParser(ch, nil)
  526. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  527. if err != nil {
  528. klog.Warningln(err)
  529. parser.Stop()
  530. return
  531. }
  532. klog.InfoS("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  533. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  534. case c.cgroup.ContainerType == cgroup.ContainerTypeSystemdService:
  535. ch := make(chan logparser.LogEntry)
  536. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  537. klog.Warningln(err)
  538. return
  539. }
  540. parser := logparser.NewParser(ch, nil)
  541. stop := func() {
  542. JournaldUnsubscribe(c.cgroup)
  543. }
  544. klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
  545. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  546. case c.cgroup.ContainerType == cgroup.ContainerTypeDocker:
  547. if c.metadata.logPath == "" {
  548. return
  549. }
  550. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  551. parser.Stop()
  552. delete(c.logParsers, "stdout/stderr")
  553. }
  554. ch := make(chan logparser.LogEntry)
  555. parser := logparser.NewParser(ch, c.metadata.logDecoder)
  556. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  557. if err != nil {
  558. klog.Warningln(err)
  559. parser.Stop()
  560. return
  561. }
  562. klog.InfoS("started container logparser", "cg", c.cgroup.Id)
  563. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  564. }
  565. }
  566. func (c *Container) gc(now time.Time) {
  567. c.lock.Lock()
  568. defer c.lock.Unlock()
  569. established := map[AddrPair]struct{}{}
  570. establishedDst := map[netaddr.IPPort]struct{}{}
  571. listens := map[netaddr.IPPort]string{}
  572. for pid := range c.pids {
  573. sockets, err := proc.GetSockets(pid)
  574. if err != nil {
  575. continue
  576. }
  577. for _, s := range sockets {
  578. if s.Listen {
  579. listens[s.SAddr] = s.Inode
  580. } else {
  581. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  582. establishedDst[s.DAddr] = struct{}{}
  583. }
  584. }
  585. break
  586. }
  587. c.revalidateListens(now, listens)
  588. for srcDst := range c.connectionsActive {
  589. if _, ok := established[srcDst]; !ok {
  590. delete(c.connectionsActive, srcDst)
  591. }
  592. }
  593. for dst, at := range c.connectLastAttempt {
  594. _, active := establishedDst[dst]
  595. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  596. delete(c.connectLastAttempt, dst)
  597. delete(c.connectsFailed, dst)
  598. for d := range c.connectsSuccessful {
  599. if d.src == dst {
  600. delete(c.connectsSuccessful, d)
  601. }
  602. }
  603. for d := range c.retransmits {
  604. if d.src == dst {
  605. delete(c.retransmits, d)
  606. }
  607. }
  608. }
  609. }
  610. }
  611. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  612. for addr, byPid := range c.listens {
  613. if _, open := actualListens[addr]; open {
  614. continue
  615. }
  616. klog.Warningln("deleting the outdated listen:", addr)
  617. for pid, closedAt := range byPid {
  618. if closedAt.IsZero() {
  619. byPid[pid] = now
  620. }
  621. }
  622. }
  623. missingListens := map[netaddr.IPPort]string{}
  624. for addr, inode := range actualListens {
  625. byPids, found := c.listens[addr]
  626. if !found {
  627. missingListens[addr] = inode
  628. continue
  629. }
  630. open := false
  631. for _, closedAt := range byPids {
  632. if closedAt.IsZero() {
  633. open = true
  634. break
  635. }
  636. }
  637. if !open {
  638. missingListens[addr] = inode
  639. }
  640. }
  641. if len(missingListens) > 0 {
  642. inodeToPid := map[string]uint32{}
  643. for pid := range c.pids {
  644. fds, err := proc.ReadFds(pid)
  645. if err != nil {
  646. continue
  647. }
  648. for _, fd := range fds {
  649. if fd.SocketInode != "" {
  650. inodeToPid[fd.SocketInode] = pid
  651. }
  652. }
  653. }
  654. for addr, inode := range missingListens {
  655. pid, found := inodeToPid[inode]
  656. if !found {
  657. klog.Errorln("failed to determine pid for listen:", addr)
  658. continue
  659. }
  660. klog.Warningln("missing listen found:", addr, pid)
  661. c.onListenOpen(pid, addr, true)
  662. }
  663. }
  664. for addr, pids := range c.listens {
  665. for pid, closedAt := range pids {
  666. if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval {
  667. delete(c.listens[addr], pid)
  668. }
  669. }
  670. if len(c.listens[addr]) == 0 {
  671. delete(c.listens, addr)
  672. }
  673. }
  674. }
  675. func resolveFd(pid uint32, fd uint32) (mntId string, logPath string) {
  676. info := proc.GetFdInfo(pid, fd)
  677. if info == nil {
  678. return
  679. }
  680. switch {
  681. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  682. !strings.HasPrefix(info.Dest, "/"),
  683. strings.HasPrefix(info.Dest, "/proc/"),
  684. strings.HasPrefix(info.Dest, "/dev/"),
  685. strings.HasPrefix(info.Dest, "/sys/"),
  686. strings.HasSuffix(info.Dest, "(deleted)"):
  687. return
  688. }
  689. mntId = info.MntId
  690. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  691. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  692. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  693. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  694. logPath = info.Dest
  695. }
  696. return
  697. }
  698. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  699. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  700. }
  701. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  702. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  703. }