container.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. package containers
  2. import (
  3. "github.com/coroot/coroot-node-agent/cgroup"
  4. "github.com/coroot/coroot-node-agent/common"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/coroot/coroot-node-agent/logs"
  7. "github.com/coroot/coroot-node-agent/node"
  8. "github.com/coroot/coroot-node-agent/pinger"
  9. "github.com/coroot/coroot-node-agent/proc"
  10. "github.com/coroot/logparser"
  11. "github.com/prometheus/client_golang/prometheus"
  12. "github.com/vishvananda/netns"
  13. "inet.af/netaddr"
  14. "k8s.io/klog/v2"
  15. "os"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. gcInterval = 10 * time.Minute
  22. pingTimeout = 300 * time.Millisecond
  23. )
  24. type ContainerID string
  25. type ContainerMetadata struct {
  26. name string
  27. labels map[string]string
  28. volumes map[string]Volume
  29. logPath string
  30. logDecoder logparser.Decoder
  31. hostListens map[string][]netaddr.IPPort
  32. }
  33. type Volume struct {
  34. provisioner string
  35. volume string
  36. }
  37. type Delays struct {
  38. cpu time.Duration
  39. disk time.Duration
  40. }
  41. type LogParser struct {
  42. parser *logparser.Parser
  43. stop func()
  44. }
  45. func (p *LogParser) Stop() {
  46. if p.stop != nil {
  47. p.stop()
  48. }
  49. p.parser.Stop()
  50. }
  51. type AddrPair struct {
  52. src netaddr.IPPort
  53. dst netaddr.IPPort
  54. }
  55. type Container struct {
  56. cgroup *cgroup.Cgroup
  57. metadata *ContainerMetadata
  58. pids map[uint32]time.Time // pid -> start time
  59. startedAt time.Time
  60. zombieAt time.Time
  61. restarts int
  62. delays Delays
  63. delaysByPid map[uint32]Delays
  64. delaysLock sync.Mutex
  65. listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
  66. connectsSuccessful map[AddrPair]int // dst:actual_dst -> count
  67. connectsFailed map[netaddr.IPPort]int // dst -> count
  68. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  69. connectionsActive map[AddrPair]netaddr.IPPort // src:dst -> actual_dst
  70. retransmits map[AddrPair]int // dst:actual_dst -> count
  71. oomKills int
  72. mountIds map[string]struct{}
  73. logParsers map[string]*LogParser
  74. lock sync.RWMutex
  75. done chan struct{}
  76. }
  77. func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
  78. c := &Container{
  79. cgroup: cg,
  80. metadata: md,
  81. pids: map[uint32]time.Time{},
  82. delaysByPid: map[uint32]Delays{},
  83. listens: map[netaddr.IPPort]map[uint32]time.Time{},
  84. connectsSuccessful: map[AddrPair]int{},
  85. connectsFailed: map[netaddr.IPPort]int{},
  86. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  87. connectionsActive: map[AddrPair]netaddr.IPPort{},
  88. retransmits: map[AddrPair]int{},
  89. mountIds: map[string]struct{}{},
  90. logParsers: map[string]*LogParser{},
  91. done: make(chan struct{}),
  92. }
  93. c.runLogParser("")
  94. go func() {
  95. ticker := time.NewTicker(gcInterval)
  96. defer ticker.Stop()
  97. for {
  98. select {
  99. case <-c.done:
  100. return
  101. case t := <-ticker.C:
  102. c.gc(t)
  103. }
  104. }
  105. }()
  106. return c
  107. }
  108. func (c *Container) Close() {
  109. for _, p := range c.logParsers {
  110. p.Stop()
  111. }
  112. close(c.done)
  113. }
  114. func (c *Container) Dead(now time.Time) bool {
  115. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  116. }
  117. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  118. for _, m := range metricsList {
  119. ch <- m
  120. }
  121. }
  122. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  123. c.lock.RLock()
  124. defer c.lock.RUnlock()
  125. ch <- counter(metrics.Restarts, float64(c.restarts))
  126. if v, err := c.cgroup.CpuQuotaCores(); err == nil && v > 0 {
  127. ch <- gauge(metrics.CPULimit, v)
  128. }
  129. if v, err := c.cgroup.CpuUsageSeconds(); err == nil {
  130. ch <- counter(metrics.CPUUsage, v)
  131. }
  132. if v, err := c.cgroup.ThrottledTimeSeconds(); err == nil {
  133. ch <- counter(metrics.ThrottledTime, v)
  134. }
  135. if taskstatsClient != nil {
  136. c.updateDelays()
  137. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  138. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  139. }
  140. if v, err := c.cgroup.MemoryLimitBytes(); err == nil && v > 0 {
  141. ch <- gauge(metrics.MemoryLimit, float64(v))
  142. }
  143. if s, err := c.cgroup.MemoryStat(); err == nil {
  144. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  145. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  146. }
  147. if c.oomKills > 0 {
  148. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  149. }
  150. if disks, err := node.GetDisks(); err == nil {
  151. ioStat, _ := c.cgroup.BlkioStat()
  152. for majorMinor, mounts := range c.getMounts() {
  153. dev := disks.GetParentBlockDevice(majorMinor)
  154. if dev == nil {
  155. continue
  156. }
  157. for mountPoint, fsStat := range mounts {
  158. v := c.metadata.volumes[mountPoint]
  159. dls := []string{mountPoint, dev.Name, v.provisioner, v.volume}
  160. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  161. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  162. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  163. if io, ok := ioStat[majorMinor]; ok {
  164. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  165. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  166. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  167. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  168. }
  169. }
  170. }
  171. }
  172. netNs := netns.None()
  173. for pid := range c.pids {
  174. if pid == agentPid {
  175. netNs = selfNetNs
  176. break
  177. }
  178. ns, err := proc.GetNetNs(pid)
  179. if err != nil {
  180. if !common.IsNotExist(err) {
  181. klog.Warningln(err)
  182. }
  183. continue
  184. }
  185. netNs = ns
  186. defer netNs.Close()
  187. break
  188. }
  189. listens := c.getListens(netNs)
  190. klog.Infof("got listens for %s ns=%s: %v", c.cgroup.Id, netNs.UniqueId(), listens)
  191. for addr, open := range listens {
  192. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  193. }
  194. for proxy, addrs := range c.getProxiedListens() {
  195. for addr := range addrs {
  196. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  197. }
  198. }
  199. for d, count := range c.connectsSuccessful {
  200. ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
  201. }
  202. for dst, count := range c.connectsFailed {
  203. ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
  204. }
  205. for d, count := range c.retransmits {
  206. ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
  207. }
  208. connections := map[AddrPair]int{}
  209. for c, actualDst := range c.connectionsActive {
  210. connections[AddrPair{src: c.dst, dst: actualDst}]++
  211. }
  212. for d, count := range connections {
  213. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  214. }
  215. for source, p := range c.logParsers {
  216. for _, c := range p.parser.GetCounters() {
  217. ch <- counter(metrics.LogMessages, float64(c.Messages), source, string(c.Level), c.Hash, c.Sample)
  218. }
  219. }
  220. appTypes := map[string]struct{}{}
  221. for pid := range c.pids {
  222. cmdline := proc.GetCmdline(pid)
  223. if len(cmdline) == 0 {
  224. continue
  225. }
  226. appType := guessApplicationType(cmdline)
  227. if appType == "" {
  228. continue
  229. }
  230. appTypes[appType] = struct{}{}
  231. }
  232. for appType := range appTypes {
  233. ch <- gauge(metrics.ApplicationType, 1, appType)
  234. }
  235. if !*flags.NoPingUpstreams {
  236. for ip, rtt := range c.ping(netNs) {
  237. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  238. }
  239. }
  240. }
  241. func (c *Container) onProcessStart(pid uint32) {
  242. c.lock.Lock()
  243. defer c.lock.Unlock()
  244. stats, err := TaskstatsPID(pid)
  245. if err != nil {
  246. return
  247. }
  248. c.zombieAt = time.Time{}
  249. c.pids[pid] = stats.BeginTime
  250. if c.startedAt.IsZero() {
  251. c.startedAt = stats.BeginTime
  252. } else {
  253. min := stats.BeginTime
  254. for _, t := range c.pids {
  255. if t.Before(min) {
  256. min = t
  257. }
  258. }
  259. if min.After(c.startedAt) {
  260. c.restarts++
  261. c.startedAt = min
  262. }
  263. }
  264. }
  265. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  266. c.lock.Lock()
  267. defer c.lock.Unlock()
  268. delete(c.pids, pid)
  269. if len(c.pids) == 0 {
  270. c.zombieAt = time.Now()
  271. }
  272. delete(c.delaysByPid, pid)
  273. if oomKill {
  274. c.oomKills++
  275. }
  276. }
  277. func (c *Container) onFileOpen(pid uint32, fd uint32) {
  278. mntId, logPath := resolveFd(pid, fd)
  279. c.lock.Lock()
  280. defer c.lock.Unlock()
  281. if mntId != "" {
  282. c.mountIds[mntId] = struct{}{}
  283. }
  284. if logPath != "" {
  285. c.runLogParser(logPath)
  286. }
  287. }
  288. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort) {
  289. netNs, err := proc.GetNetNs(pid)
  290. isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
  291. _ = netNs.Close()
  292. if addr.IP().IsLoopback() && !isHostNs {
  293. return
  294. }
  295. c.lock.Lock()
  296. defer c.lock.Unlock()
  297. if _, ok := c.listens[addr]; !ok {
  298. c.listens[addr] = map[uint32]time.Time{}
  299. }
  300. c.listens[addr][pid] = time.Time{}
  301. }
  302. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  303. c.lock.Lock()
  304. defer c.lock.Unlock()
  305. if _, byAddr := c.listens[addr]; byAddr {
  306. if _, byPid := c.listens[addr][pid]; byPid {
  307. c.listens[addr][pid] = time.Now()
  308. }
  309. }
  310. }
  311. func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed bool) {
  312. if dst.IP().IsLoopback() {
  313. netNs, err := proc.GetNetNs(pid)
  314. isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
  315. netNs.Close()
  316. if !isHostNs {
  317. return
  318. }
  319. } else {
  320. whitelisted := false
  321. for _, prefix := range flags.ExternalNetworksWhitelist {
  322. if prefix.Contains(dst.IP()) {
  323. whitelisted = true
  324. break
  325. }
  326. }
  327. if !whitelisted && !common.IsIpPrivate(dst.IP()) {
  328. return
  329. }
  330. }
  331. c.lock.Lock()
  332. defer c.lock.Unlock()
  333. if failed {
  334. c.connectsFailed[dst]++
  335. } else {
  336. actualDst, err := ConntrackGetActualDestination(src, dst)
  337. if err != nil {
  338. klog.Errorf("failed to resolve actual destination for %s->%s: %s", src, dst, err)
  339. } else if actualDst.IsValid() {
  340. c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
  341. c.connectionsActive[AddrPair{src: src, dst: dst}] = actualDst
  342. } else {
  343. klog.Errorf("invalid actual destination for %s->%s: %s", src, dst, actualDst)
  344. }
  345. }
  346. c.connectLastAttempt[dst] = time.Now()
  347. }
  348. func (c *Container) onConnectionClose(srcDst AddrPair) bool {
  349. c.lock.Lock()
  350. defer c.lock.Unlock()
  351. if _, ok := c.connectionsActive[srcDst]; !ok {
  352. return false
  353. }
  354. delete(c.connectionsActive, srcDst)
  355. return true
  356. }
  357. func (c *Container) onRetransmit(srcDst AddrPair) bool {
  358. c.lock.Lock()
  359. defer c.lock.Unlock()
  360. actualDst, ok := c.connectionsActive[srcDst]
  361. if !ok {
  362. return false
  363. }
  364. c.retransmits[AddrPair{src: srcDst.dst, dst: actualDst}]++
  365. return true
  366. }
  367. func (c *Container) updateDelays() {
  368. c.delaysLock.Lock()
  369. defer c.delaysLock.Unlock()
  370. for pid := range c.pids {
  371. stats, err := TaskstatsTGID(pid)
  372. if err != nil {
  373. continue
  374. }
  375. d := c.delaysByPid[pid]
  376. c.delays.cpu += stats.CPUDelay - d.cpu
  377. c.delays.disk += stats.BlockIODelay - d.disk
  378. d.cpu = stats.CPUDelay
  379. d.disk = stats.BlockIODelay
  380. c.delaysByPid[pid] = d
  381. }
  382. }
  383. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  384. mounts := map[string]proc.MountInfo{}
  385. for p := range c.pids {
  386. mi := proc.GetMountInfo(p)
  387. if mi != nil {
  388. mounts = mi
  389. break
  390. }
  391. }
  392. for mountId := range mounts {
  393. if _, ok := c.mountIds[mountId]; !ok {
  394. delete(mounts, mountId)
  395. }
  396. }
  397. if len(mounts) == 0 {
  398. return nil
  399. }
  400. res := map[string]map[string]*proc.FSStat{}
  401. for _, mi := range mounts {
  402. var stat *proc.FSStat
  403. for pid := range c.pids {
  404. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  405. if err == nil {
  406. stat = &s
  407. break
  408. }
  409. }
  410. if stat == nil {
  411. continue
  412. }
  413. if _, ok := res[mi.MajorMinor]; !ok {
  414. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  415. }
  416. res[mi.MajorMinor][mi.MountPoint] = stat
  417. }
  418. return res
  419. }
  420. func (c *Container) getListens(netNs netns.NsHandle) map[netaddr.IPPort]int {
  421. if !netNs.IsOpen() {
  422. return nil
  423. }
  424. isHostNs := hostNetNsId == netNs.UniqueId()
  425. res := map[netaddr.IPPort]int{}
  426. for addr, byPid := range c.listens {
  427. open := 0
  428. for _, closedAt := range byPid {
  429. if closedAt.IsZero() {
  430. open = 1
  431. break
  432. }
  433. }
  434. var ips []netaddr.IP
  435. if addr.IP().IsUnspecified() {
  436. if nsIps, err := proc.GetNsIps(netNs); err != nil {
  437. klog.Warningln(err)
  438. } else {
  439. ips = nsIps
  440. }
  441. } else {
  442. ips = []netaddr.IP{addr.IP()}
  443. }
  444. for _, ip := range ips {
  445. if ip.IsLoopback() && !isHostNs {
  446. continue
  447. }
  448. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  449. }
  450. }
  451. return res
  452. }
  453. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  454. if len(c.metadata.hostListens) == 0 {
  455. return nil
  456. }
  457. hasUnspecified := false
  458. for _, addrs := range c.metadata.hostListens {
  459. for _, addr := range addrs {
  460. if addr.IP().IsUnspecified() {
  461. hasUnspecified = true
  462. break
  463. }
  464. }
  465. }
  466. var hostIps []netaddr.IP
  467. if hasUnspecified {
  468. if ns, err := proc.GetHostNetNs(); err != nil {
  469. klog.Warningln(err)
  470. } else {
  471. ips, err := proc.GetNsIps(ns)
  472. _ = ns.Close()
  473. if err != nil {
  474. klog.Warningln(err)
  475. } else {
  476. hostIps = ips
  477. }
  478. }
  479. }
  480. res := map[string]map[netaddr.IPPort]struct{}{}
  481. for proxy, addrs := range c.metadata.hostListens {
  482. res[proxy] = map[netaddr.IPPort]struct{}{}
  483. for _, addr := range addrs {
  484. if addr.IP().IsUnspecified() {
  485. for _, ip := range hostIps {
  486. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  487. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  488. }
  489. }
  490. } else {
  491. res[proxy][addr] = struct{}{}
  492. }
  493. }
  494. }
  495. return res
  496. }
  497. func (c *Container) ping(netNs netns.NsHandle) map[netaddr.IP]float64 {
  498. if !netNs.IsOpen() {
  499. return nil
  500. }
  501. ips := map[netaddr.IP]struct{}{}
  502. for d := range c.connectsSuccessful {
  503. ips[d.dst.IP()] = struct{}{}
  504. }
  505. for dst := range c.connectsFailed {
  506. ips[dst.IP()] = struct{}{}
  507. }
  508. if len(ips) == 0 {
  509. return nil
  510. }
  511. targets := make([]netaddr.IP, 0, len(ips))
  512. for ip := range ips {
  513. targets = append(targets, ip)
  514. }
  515. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  516. if err != nil {
  517. klog.Warningln(err)
  518. return nil
  519. }
  520. return rtt
  521. }
  522. func (c *Container) runLogParser(logPath string) {
  523. if *flags.NoParseLogs {
  524. return
  525. }
  526. switch {
  527. case logPath != "":
  528. if c.logParsers[logPath] != nil {
  529. return
  530. }
  531. ch := make(chan logparser.LogEntry)
  532. parser := logparser.NewParser(ch, nil)
  533. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  534. if err != nil {
  535. klog.Warningln(err)
  536. parser.Stop()
  537. return
  538. }
  539. klog.InfoS("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  540. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  541. case c.cgroup.ContainerType == cgroup.ContainerTypeSystemdService:
  542. ch := make(chan logparser.LogEntry)
  543. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  544. klog.Warningln(err)
  545. return
  546. }
  547. parser := logparser.NewParser(ch, nil)
  548. stop := func() {
  549. JournaldUnsubscribe(c.cgroup)
  550. }
  551. klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
  552. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  553. case c.cgroup.ContainerType == cgroup.ContainerTypeDocker:
  554. if c.metadata.logPath == "" {
  555. return
  556. }
  557. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  558. parser.Stop()
  559. delete(c.logParsers, "stdout/stderr")
  560. }
  561. ch := make(chan logparser.LogEntry)
  562. parser := logparser.NewParser(ch, c.metadata.logDecoder)
  563. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  564. if err != nil {
  565. klog.Warningln(err)
  566. parser.Stop()
  567. return
  568. }
  569. klog.InfoS("started container logparser", "cg", c.cgroup.Id)
  570. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  571. }
  572. }
  573. func (c *Container) gc(now time.Time) {
  574. c.lock.Lock()
  575. defer c.lock.Unlock()
  576. established := map[AddrPair]struct{}{}
  577. establishedDst := map[netaddr.IPPort]struct{}{}
  578. listens := map[netaddr.IPPort]struct{}{}
  579. for pid := range c.pids {
  580. sockets, err := proc.GetSockets(pid)
  581. if err != nil {
  582. continue
  583. }
  584. for _, s := range sockets {
  585. if s.Listen {
  586. listens[s.SAddr] = struct{}{}
  587. } else {
  588. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  589. establishedDst[s.DAddr] = struct{}{}
  590. }
  591. }
  592. break
  593. }
  594. for addr, byPid := range c.listens {
  595. _, open := listens[addr]
  596. if open {
  597. continue
  598. }
  599. for pid, closedAt := range byPid {
  600. if closedAt.IsZero() {
  601. byPid[pid] = now
  602. }
  603. }
  604. }
  605. for pid, addrs := range c.listens {
  606. for addr, closedAt := range addrs {
  607. if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval {
  608. delete(c.listens[pid], addr)
  609. }
  610. }
  611. if len(c.listens[pid]) == 0 {
  612. delete(c.listens, pid)
  613. }
  614. }
  615. for srcDst := range c.connectionsActive {
  616. if _, ok := established[srcDst]; !ok {
  617. delete(c.connectionsActive, srcDst)
  618. }
  619. }
  620. for dst, at := range c.connectLastAttempt {
  621. _, active := establishedDst[dst]
  622. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  623. delete(c.connectLastAttempt, dst)
  624. delete(c.connectsFailed, dst)
  625. for d := range c.connectsSuccessful {
  626. if d.src == dst {
  627. delete(c.connectsSuccessful, d)
  628. }
  629. }
  630. for d := range c.retransmits {
  631. if d.src == dst {
  632. delete(c.retransmits, d)
  633. }
  634. }
  635. }
  636. }
  637. }
  638. func resolveFd(pid uint32, fd uint32) (mntId string, logPath string) {
  639. info := proc.GetFdInfo(pid, fd)
  640. if info == nil {
  641. return
  642. }
  643. switch {
  644. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  645. !strings.HasPrefix(info.Dest, "/"),
  646. strings.HasPrefix(info.Dest, "/proc/"),
  647. strings.HasPrefix(info.Dest, "/dev/"),
  648. strings.HasPrefix(info.Dest, "/sys/"),
  649. strings.HasSuffix(info.Dest, "(deleted)"):
  650. return
  651. }
  652. mntId = info.MntId
  653. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  654. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  655. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  656. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  657. logPath = info.Dest
  658. }
  659. return
  660. }
  661. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  662. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  663. }
  664. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  665. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  666. }