container.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058
  1. package containers
  2. import (
  3. "github.com/cilium/ebpf/link"
  4. "github.com/coroot/coroot-node-agent/cgroup"
  5. "github.com/coroot/coroot-node-agent/common"
  6. "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/flags"
  9. "github.com/coroot/coroot-node-agent/logs"
  10. "github.com/coroot/coroot-node-agent/node"
  11. "github.com/coroot/coroot-node-agent/pinger"
  12. "github.com/coroot/coroot-node-agent/proc"
  13. "github.com/coroot/coroot-node-agent/tracing"
  14. "github.com/coroot/logparser"
  15. "github.com/prometheus/client_golang/prometheus"
  16. "github.com/vishvananda/netns"
  17. "inet.af/netaddr"
  18. "k8s.io/klog/v2"
  19. "os"
  20. "strings"
  21. "sync"
  22. "time"
  23. )
  // Timing parameters shared by every Container.
var (
	// gcInterval is the period of the per-container gc loop started by
	// NewContainer. It is also the age after which gc drops closed
	// connections, closed listens and stale per-destination counters, and
	// after which Dead reports a zombie container as dead.
	gcInterval = 10 * time.Minute

	// pingTimeout is handed to pinger.Ping for the latency probes issued
	// from Collect.
	pingTimeout = 300 * time.Millisecond
)
  // Identity and network attachment of a container.
type (
	// ContainerID identifies a container. It is converted to a plain string
	// when traces and log emitters are created for the container.
	ContainerID string

	// ContainerNetwork is one network the container is attached to.
	ContainerNetwork struct {
		// NetworkID is used by NewContainer to look up the network's
		// load-balancer namespace.
		NetworkID string
	}
)
  32. type ContainerMetadata struct {
  33. name string
  34. labels map[string]string
  35. volumes map[string]string
  36. logPath string
  37. image string
  38. logDecoder logparser.Decoder
  39. hostListens map[string][]netaddr.IPPort
  40. networks map[string]ContainerNetwork
  41. }
  // Delays holds taskstats delay totals: cpu comes from CPUDelay and disk from
// BlockIODelay.
type Delays struct {
	cpu, disk time.Duration
}
  46. type LogParser struct {
  47. parser *logparser.Parser
  48. stop func()
  49. }
  50. func (p *LogParser) Stop() {
  51. if p.stop != nil {
  52. p.stop()
  53. }
  54. p.parser.Stop()
  55. }
  56. type AddrPair struct {
  57. src netaddr.IPPort
  58. dst netaddr.IPPort
  59. }
  60. type ActiveConnection struct {
  61. Dest netaddr.IPPort
  62. ActualDest netaddr.IPPort
  63. Pid uint32
  64. Fd uint64
  65. Timestamp uint64
  66. Closed time.Time
  67. http2Parser *l7.Http2Parser
  68. postgresParser *l7.PostgresParser
  69. mysqlParser *l7.MysqlParser
  70. }
  71. type ListenDetails struct {
  72. ClosedAt time.Time
  73. NsIPs []netaddr.IP
  74. }
  75. type Process struct {
  76. Pid uint32
  77. StartedAt time.Time
  78. NetNsId string
  79. uprobes []link.Link
  80. goTlsUprobesChecked bool
  81. openSslUprobesChecked bool
  82. }
  83. func (p *Process) isHostNs() bool {
  84. return p.NetNsId == hostNetNsId
  85. }
  86. type PidFd struct {
  87. Pid uint32
  88. Fd uint64
  89. }
  90. type Container struct {
  91. id ContainerID
  92. cgroup *cgroup.Cgroup
  93. metadata *ContainerMetadata
  94. processes map[uint32]*Process
  95. startedAt time.Time
  96. zombieAt time.Time
  97. restarts int
  98. delays Delays
  99. delaysByPid map[uint32]Delays
  100. delaysLock sync.Mutex
  101. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  102. ipsByNs map[string][]netaddr.IP
  103. connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
  104. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  105. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  106. connectionsActive map[AddrPair]*ActiveConnection
  107. connectionsByPidFd map[PidFd]*ActiveConnection
  108. retransmits map[AddrPair]int64 // dst:actual_dst -> count
  109. l7Stats L7Stats
  110. oomKills int
  111. mounts map[string]proc.MountInfo
  112. logParsers map[string]*LogParser
  113. hostConntrack *Conntrack
  114. nsConntrack *Conntrack
  115. lbConntracks []*Conntrack
  116. lock sync.RWMutex
  117. done chan struct{}
  118. }
  119. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
  120. netNs, err := proc.GetNetNs(pid)
  121. if err != nil {
  122. return nil, err
  123. }
  124. defer netNs.Close()
  125. c := &Container{
  126. id: id,
  127. cgroup: cg,
  128. metadata: md,
  129. processes: map[uint32]*Process{},
  130. delaysByPid: map[uint32]Delays{},
  131. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  132. ipsByNs: map[string][]netaddr.IP{},
  133. connectsSuccessful: map[AddrPair]int64{},
  134. connectsFailed: map[netaddr.IPPort]int64{},
  135. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  136. connectionsActive: map[AddrPair]*ActiveConnection{},
  137. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  138. retransmits: map[AddrPair]int64{},
  139. l7Stats: L7Stats{},
  140. mounts: map[string]proc.MountInfo{},
  141. logParsers: map[string]*LogParser{},
  142. hostConntrack: hostConntrack,
  143. done: make(chan struct{}),
  144. }
  145. for _, n := range md.networks {
  146. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  147. if ct, err := NewConntrack(nsHandle); err != nil {
  148. klog.Warningln(err)
  149. } else {
  150. c.lbConntracks = append(c.lbConntracks, ct)
  151. }
  152. _ = nsHandle.Close()
  153. }
  154. }
  155. c.runLogParser("")
  156. go func() {
  157. ticker := time.NewTicker(gcInterval)
  158. defer ticker.Stop()
  159. for {
  160. select {
  161. case <-c.done:
  162. return
  163. case t := <-ticker.C:
  164. c.gc(t)
  165. }
  166. }
  167. }()
  168. return c, nil
  169. }
  170. func (c *Container) Close() {
  171. for _, p := range c.logParsers {
  172. p.Stop()
  173. }
  174. for _, ct := range c.lbConntracks {
  175. _ = ct.Close()
  176. }
  177. if c.nsConntrack != nil {
  178. _ = c.nsConntrack.Close()
  179. }
  180. close(c.done)
  181. }
  182. func (c *Container) Dead(now time.Time) bool {
  183. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  184. }
  185. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  186. // some fixed metric description is required here to register/unregister the collector correctly
  187. ch <- prometheus.NewDesc("container", "", nil, nil)
  188. }
  189. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  190. c.lock.RLock()
  191. defer c.lock.RUnlock()
  192. if c.metadata.image != "" {
  193. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image)
  194. }
  195. ch <- counter(metrics.Restarts, float64(c.restarts))
  196. if cpu, err := c.cgroup.CpuStat(); err == nil {
  197. if cpu.LimitCores > 0 {
  198. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  199. }
  200. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  201. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  202. }
  203. if taskstatsClient != nil {
  204. c.updateDelays()
  205. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  206. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  207. }
  208. if s, err := c.cgroup.MemoryStat(); err == nil {
  209. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  210. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  211. if s.Limit > 0 {
  212. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  213. }
  214. }
  215. if c.oomKills > 0 {
  216. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  217. }
  218. if disks, err := node.GetDisks(); err == nil {
  219. ioStat, _ := c.cgroup.IOStat()
  220. for majorMinor, mounts := range c.getMounts() {
  221. dev := disks.GetParentBlockDevice(majorMinor)
  222. if dev == nil {
  223. continue
  224. }
  225. for mountPoint, fsStat := range mounts {
  226. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  227. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  228. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  229. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  230. if io, ok := ioStat[majorMinor]; ok {
  231. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  232. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  233. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  234. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  235. }
  236. }
  237. }
  238. }
  239. for addr, open := range c.getListens() {
  240. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  241. }
  242. for proxy, addrs := range c.getProxiedListens() {
  243. for addr := range addrs {
  244. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  245. }
  246. }
  247. for d, count := range c.connectsSuccessful {
  248. ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
  249. }
  250. for dst, count := range c.connectsFailed {
  251. ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
  252. }
  253. for d, count := range c.retransmits {
  254. ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
  255. }
  256. connections := map[AddrPair]int{}
  257. for addrPair, conn := range c.connectionsActive {
  258. if !conn.Closed.IsZero() {
  259. continue
  260. }
  261. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  262. }
  263. for d, count := range connections {
  264. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  265. }
  266. for source, p := range c.logParsers {
  267. for _, c := range p.parser.GetCounters() {
  268. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  269. }
  270. }
  271. appTypes := map[string]struct{}{}
  272. seenJvms := map[string]bool{}
  273. for pid := range c.processes {
  274. cmdline := proc.GetCmdline(pid)
  275. if len(cmdline) == 0 {
  276. continue
  277. }
  278. appType := guessApplicationType(cmdline)
  279. if appType != "" {
  280. appTypes[appType] = struct{}{}
  281. }
  282. if isJvm(cmdline) {
  283. jvm, jMetrics := jvmMetrics(pid)
  284. if len(jMetrics) > 0 && !seenJvms[jvm] {
  285. seenJvms[jvm] = true
  286. for _, m := range jMetrics {
  287. ch <- m
  288. }
  289. }
  290. }
  291. }
  292. for appType := range appTypes {
  293. ch <- gauge(metrics.ApplicationType, 1, appType)
  294. }
  295. c.l7Stats.collect(ch)
  296. if !*flags.DisablePinger {
  297. for ip, rtt := range c.ping() {
  298. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  299. }
  300. }
  301. }
  302. func (c *Container) onProcessStart(pid uint32) {
  303. c.lock.Lock()
  304. defer c.lock.Unlock()
  305. stats, err := TaskstatsPID(pid)
  306. if err != nil {
  307. return
  308. }
  309. ns, err := proc.GetNetNs(pid)
  310. if err != nil {
  311. return
  312. }
  313. defer ns.Close()
  314. c.zombieAt = time.Time{}
  315. c.processes[pid] = &Process{Pid: pid, StartedAt: stats.BeginTime, NetNsId: ns.UniqueId()}
  316. if c.startedAt.IsZero() {
  317. c.startedAt = stats.BeginTime
  318. } else {
  319. min := stats.BeginTime
  320. for _, p := range c.processes {
  321. if p.StartedAt.Before(min) {
  322. min = p.StartedAt
  323. }
  324. }
  325. if min.After(c.startedAt) {
  326. c.restarts++
  327. c.startedAt = min
  328. }
  329. }
  330. }
  331. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  332. c.lock.Lock()
  333. defer c.lock.Unlock()
  334. if p := c.processes[pid]; p != nil {
  335. for _, u := range p.uprobes {
  336. _ = u.Close()
  337. }
  338. }
  339. delete(c.processes, pid)
  340. if len(c.processes) == 0 {
  341. c.zombieAt = time.Now()
  342. }
  343. delete(c.delaysByPid, pid)
  344. if oomKill {
  345. c.oomKills++
  346. }
  347. }
  348. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  349. mntId, logPath := resolveFd(pid, fd)
  350. func() {
  351. if mntId == "" {
  352. return
  353. }
  354. c.lock.Lock()
  355. _, ok := c.mounts[mntId]
  356. c.lock.Unlock()
  357. if ok {
  358. return
  359. }
  360. byMountId := proc.GetMountInfo(pid)
  361. if byMountId == nil {
  362. return
  363. }
  364. if mi, ok := byMountId[mntId]; ok {
  365. c.lock.Lock()
  366. c.mounts[mntId] = mi
  367. c.lock.Unlock()
  368. }
  369. }()
  370. if logPath != "" {
  371. c.lock.Lock()
  372. c.runLogParser(logPath)
  373. c.lock.Unlock()
  374. }
  375. }
  376. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  377. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  378. return
  379. }
  380. if !safe {
  381. c.lock.Lock()
  382. defer c.lock.Unlock()
  383. }
  384. if _, ok := c.listens[addr]; !ok {
  385. c.listens[addr] = map[uint32]*ListenDetails{}
  386. }
  387. details := &ListenDetails{}
  388. c.listens[addr][pid] = details
  389. if addr.IP().IsUnspecified() {
  390. ns, err := proc.GetNetNs(pid)
  391. if err != nil {
  392. if !common.IsNotExist(err) {
  393. klog.Warningln(err)
  394. }
  395. return
  396. }
  397. defer ns.Close()
  398. nsId := ns.UniqueId()
  399. ips, ok := c.ipsByNs[nsId]
  400. if !ok {
  401. if ips, err = proc.GetNsIps(ns); err != nil {
  402. klog.Warningln(err)
  403. } else {
  404. c.ipsByNs[nsId] = ips
  405. }
  406. }
  407. details.NsIPs = ips
  408. }
  409. }
  410. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  411. c.lock.Lock()
  412. defer c.lock.Unlock()
  413. if _, byAddr := c.listens[addr]; byAddr {
  414. if _, byPid := c.listens[addr][pid]; byPid {
  415. if details := c.listens[addr][pid]; details != nil {
  416. details.ClosedAt = time.Now()
  417. }
  418. }
  419. }
  420. }
  421. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
  422. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  423. return
  424. }
  425. p := c.processes[pid]
  426. if p == nil {
  427. return
  428. }
  429. if dst.IP().IsLoopback() && !p.isHostNs() {
  430. return
  431. }
  432. actualDst, err := c.getActualDestination(p, src, dst)
  433. if err != nil {
  434. if !common.IsNotExist(err) {
  435. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  436. }
  437. return
  438. }
  439. switch {
  440. case actualDst == nil:
  441. actualDst = &dst
  442. case actualDst.IP().IsLoopback() && !p.isHostNs():
  443. return
  444. }
  445. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  446. return
  447. }
  448. c.lock.Lock()
  449. defer c.lock.Unlock()
  450. if failed {
  451. c.connectsFailed[dst]++
  452. } else {
  453. c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
  454. connection := &ActiveConnection{
  455. Dest: dst,
  456. ActualDest: *actualDst,
  457. Pid: pid,
  458. Fd: fd,
  459. Timestamp: timestamp,
  460. }
  461. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  462. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  463. }
  464. c.connectLastAttempt[dst] = time.Now()
  465. }
  466. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  467. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  468. return actualDst, nil
  469. }
  470. for _, lb := range c.lbConntracks {
  471. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  472. return actualDst, nil
  473. }
  474. }
  475. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  476. if actualDst != nil {
  477. return actualDst, nil
  478. }
  479. if !p.isHostNs() {
  480. if c.nsConntrack == nil {
  481. netNs, err := proc.GetNetNs(p.Pid)
  482. if err != nil {
  483. return nil, err
  484. }
  485. defer netNs.Close()
  486. c.nsConntrack, err = NewConntrack(netNs)
  487. if err != nil {
  488. return nil, err
  489. }
  490. }
  491. return c.nsConntrack.GetActualDestination(src, dst), nil
  492. }
  493. return nil, nil
  494. }
  495. func (c *Container) onConnectionClose(srcDst AddrPair) bool {
  496. c.lock.Lock()
  497. defer c.lock.Unlock()
  498. conn := c.connectionsActive[srcDst]
  499. if conn == nil {
  500. return false
  501. }
  502. conn.Closed = time.Now()
  503. return true
  504. }
  // onL7Request accounts an L7 request observed on the socket pid/fd. It
// updates the per-(protocol, destination) stats and records a trace span.
// Requests on sockets that are not tracked as active connections are ignored.
//
// A non-zero timestamp must equal the one recorded when the connection was
// opened. NOTE(review): presumably this guards against a reused fd belonging
// to a different connection — confirm.
//
// Protocol parsers (HTTP/2, Postgres, MySQL) are created lazily and stored on
// the connection, so they see every request of that connection. Postgres and
// MySQL statement-close messages are traced but not counted in stats.
// RabbitMQ and NATS requests are counted per method with a zero duration.
func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
	c.lock.Lock()
	defer c.lock.Unlock()
	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
	if conn == nil {
		return
	}
	if timestamp != 0 && conn.Timestamp != timestamp {
		return
	}
	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
	switch r.Protocol {
	case l7.ProtocolHTTP:
		stats.observe(r.Status.Http(), "", r.Duration)
		method, path := l7.ParseHttp(r.Payload)
		trace.HttpRequest(method, path, r.Status, r.Duration)
	case l7.ProtocolHTTP2:
		if conn.http2Parser == nil {
			conn.http2Parser = l7.NewHttp2Parser()
		}
		// One event may carry several HTTP/2 requests; each is observed and
		// traced separately. The event duration is passed as the parser's
		// third argument (its meaning is defined by l7.Http2Parser).
		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
		for _, req := range requests {
			stats.observe(req.Status.Http(), "", req.Duration)
			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
		}
	case l7.ProtocolPostgres:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if conn.postgresParser == nil {
			conn.postgresParser = l7.NewPostgresParser()
		}
		query := conn.postgresParser.Parse(r.Payload)
		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolMysql:
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if conn.mysqlParser == nil {
			conn.mysqlParser = l7.NewMysqlParser()
		}
		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
		trace.MysqlQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolMemcached:
		stats.observe(r.Status.String(), "", r.Duration)
		cmd, items := l7.ParseMemcached(r.Payload)
		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
	case l7.ProtocolRedis:
		stats.observe(r.Status.String(), "", r.Duration)
		cmd, args := l7.ParseRedis(r.Payload)
		trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
	case l7.ProtocolMongo:
		stats.observe(r.Status.String(), "", r.Duration)
		query := l7.ParseMongo(r.Payload)
		trace.MongoQuery(query, r.Status.Error(), r.Duration)
	case l7.ProtocolKafka, l7.ProtocolCassandra:
		// Stats only; no trace span is recorded for these protocols.
		stats.observe(r.Status.String(), "", r.Duration)
	case l7.ProtocolRabbitmq, l7.ProtocolNats:
		stats.observe(r.Status.String(), r.Method.String(), 0)
	}
}
  567. func (c *Container) onRetransmit(srcDst AddrPair) bool {
  568. c.lock.Lock()
  569. defer c.lock.Unlock()
  570. conn, ok := c.connectionsActive[srcDst]
  571. if !ok {
  572. return false
  573. }
  574. c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
  575. return true
  576. }
  577. func (c *Container) updateDelays() {
  578. c.delaysLock.Lock()
  579. defer c.delaysLock.Unlock()
  580. for pid := range c.processes {
  581. stats, err := TaskstatsTGID(pid)
  582. if err != nil {
  583. continue
  584. }
  585. d := c.delaysByPid[pid]
  586. c.delays.cpu += stats.CPUDelay - d.cpu
  587. c.delays.disk += stats.BlockIODelay - d.disk
  588. d.cpu = stats.CPUDelay
  589. d.disk = stats.BlockIODelay
  590. c.delaysByPid[pid] = d
  591. }
  592. }
  593. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  594. if len(c.mounts) == 0 {
  595. return nil
  596. }
  597. res := map[string]map[string]*proc.FSStat{}
  598. for _, mi := range c.mounts {
  599. var stat *proc.FSStat
  600. for pid := range c.processes {
  601. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  602. if err == nil {
  603. stat = &s
  604. break
  605. }
  606. }
  607. if stat == nil {
  608. continue
  609. }
  610. if _, ok := res[mi.MajorMinor]; !ok {
  611. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  612. }
  613. res[mi.MajorMinor][mi.MountPoint] = stat
  614. }
  615. return res
  616. }
  617. func (c *Container) getListens() map[netaddr.IPPort]int {
  618. res := map[netaddr.IPPort]int{}
  619. for addr, byPid := range c.listens {
  620. open := 0
  621. isHostNs := false
  622. ips := map[netaddr.IP]bool{}
  623. for pid, details := range byPid {
  624. p := c.processes[pid]
  625. if p == nil {
  626. continue
  627. }
  628. if p.isHostNs() {
  629. isHostNs = true
  630. }
  631. if details.ClosedAt.IsZero() {
  632. open = 1
  633. }
  634. for _, ip := range details.NsIPs {
  635. ips[ip] = true
  636. }
  637. }
  638. if !addr.IP().IsUnspecified() {
  639. ips = map[netaddr.IP]bool{addr.IP(): true}
  640. }
  641. for ip := range ips {
  642. if ip.IsLoopback() && !isHostNs {
  643. continue
  644. }
  645. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  646. }
  647. }
  648. return res
  649. }
  650. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  651. if len(c.metadata.hostListens) == 0 {
  652. return nil
  653. }
  654. hasUnspecified := false
  655. for _, addrs := range c.metadata.hostListens {
  656. for _, addr := range addrs {
  657. if addr.IP().IsUnspecified() {
  658. hasUnspecified = true
  659. break
  660. }
  661. }
  662. }
  663. var hostIps []netaddr.IP
  664. if hasUnspecified {
  665. if ns, err := proc.GetHostNetNs(); err != nil {
  666. klog.Warningln(err)
  667. } else {
  668. ips, err := proc.GetNsIps(ns)
  669. _ = ns.Close()
  670. if err != nil {
  671. klog.Warningln(err)
  672. } else {
  673. hostIps = ips
  674. }
  675. }
  676. }
  677. res := map[string]map[netaddr.IPPort]struct{}{}
  678. for proxy, addrs := range c.metadata.hostListens {
  679. res[proxy] = map[netaddr.IPPort]struct{}{}
  680. for _, addr := range addrs {
  681. if addr.IP().IsUnspecified() {
  682. for _, ip := range hostIps {
  683. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  684. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  685. }
  686. }
  687. } else {
  688. res[proxy][addr] = struct{}{}
  689. }
  690. }
  691. }
  692. return res
  693. }
  694. func (c *Container) ping() map[netaddr.IP]float64 {
  695. netNs := netns.None()
  696. for pid := range c.processes {
  697. if pid == agentPid {
  698. netNs = selfNetNs
  699. break
  700. }
  701. ns, err := proc.GetNetNs(pid)
  702. if err != nil {
  703. if !common.IsNotExist(err) {
  704. klog.Warningln(err)
  705. }
  706. continue
  707. }
  708. netNs = ns
  709. defer netNs.Close()
  710. break
  711. }
  712. if !netNs.IsOpen() {
  713. return nil
  714. }
  715. ips := map[netaddr.IP]struct{}{}
  716. for d := range c.connectsSuccessful {
  717. ips[d.dst.IP()] = struct{}{}
  718. }
  719. for dst := range c.connectsFailed {
  720. ips[dst.IP()] = struct{}{}
  721. }
  722. if len(ips) == 0 {
  723. return nil
  724. }
  725. targets := make([]netaddr.IP, 0, len(ips))
  726. for ip := range ips {
  727. targets = append(targets, ip)
  728. }
  729. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  730. if err != nil {
  731. klog.Warningln(err)
  732. return nil
  733. }
  734. return rtt
  735. }
  736. func (c *Container) runLogParser(logPath string) {
  737. if *flags.DisableLogParsing {
  738. return
  739. }
  740. containerId := string(c.id)
  741. if logPath != "" {
  742. if c.logParsers[logPath] != nil {
  743. return
  744. }
  745. ch := make(chan logparser.LogEntry)
  746. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  747. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  748. if err != nil {
  749. klog.Warningln(err)
  750. parser.Stop()
  751. return
  752. }
  753. klog.InfoS("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  754. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  755. return
  756. }
  757. switch c.cgroup.ContainerType {
  758. case cgroup.ContainerTypeSystemdService:
  759. ch := make(chan logparser.LogEntry)
  760. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  761. klog.Warningln(err)
  762. return
  763. }
  764. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  765. stop := func() {
  766. JournaldUnsubscribe(c.cgroup)
  767. }
  768. klog.InfoS("started journald logparser", "cg", c.cgroup.Id)
  769. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  770. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd:
  771. if c.metadata.logPath == "" {
  772. return
  773. }
  774. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  775. parser.Stop()
  776. delete(c.logParsers, "stdout/stderr")
  777. }
  778. ch := make(chan logparser.LogEntry)
  779. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  780. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  781. if err != nil {
  782. klog.Warningln(err)
  783. parser.Stop()
  784. return
  785. }
  786. klog.InfoS("started container logparser", "cg", c.cgroup.Id)
  787. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  788. }
  789. }
  790. func (c *Container) gc(now time.Time) {
  791. c.lock.Lock()
  792. defer c.lock.Unlock()
  793. established := map[AddrPair]struct{}{}
  794. establishedDst := map[netaddr.IPPort]struct{}{}
  795. listens := map[netaddr.IPPort]string{}
  796. seenNamespaces := map[string]bool{}
  797. for _, p := range c.processes {
  798. if seenNamespaces[p.NetNsId] {
  799. continue
  800. }
  801. sockets, err := proc.GetSockets(p.Pid)
  802. if err != nil {
  803. continue
  804. }
  805. for _, s := range sockets {
  806. if s.Listen {
  807. listens[s.SAddr] = s.Inode
  808. } else {
  809. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  810. establishedDst[s.DAddr] = struct{}{}
  811. }
  812. }
  813. seenNamespaces[p.NetNsId] = true
  814. }
  815. for ns := range c.ipsByNs {
  816. if !seenNamespaces[ns] {
  817. delete(c.ipsByNs, ns)
  818. }
  819. }
  820. c.revalidateListens(now, listens)
  821. for srcDst, conn := range c.connectionsActive {
  822. if _, ok := established[srcDst]; !ok {
  823. delete(c.connectionsActive, srcDst)
  824. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  825. continue
  826. }
  827. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  828. delete(c.connectionsActive, srcDst)
  829. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  830. }
  831. }
  832. for dst, at := range c.connectLastAttempt {
  833. _, active := establishedDst[dst]
  834. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  835. delete(c.connectLastAttempt, dst)
  836. delete(c.connectsFailed, dst)
  837. for d := range c.connectsSuccessful {
  838. if d.src == dst {
  839. delete(c.connectsSuccessful, d)
  840. }
  841. }
  842. for d := range c.retransmits {
  843. if d.src == dst {
  844. delete(c.retransmits, d)
  845. }
  846. }
  847. c.l7Stats.delete(dst)
  848. }
  849. }
  850. }
  851. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  852. for addr, byPid := range c.listens {
  853. if _, open := actualListens[addr]; open {
  854. continue
  855. }
  856. klog.Warningln("deleting the outdated listen:", addr)
  857. for _, details := range byPid {
  858. if details.ClosedAt.IsZero() {
  859. details.ClosedAt = now
  860. }
  861. }
  862. }
  863. missingListens := map[netaddr.IPPort]string{}
  864. for addr, inode := range actualListens {
  865. byPids, found := c.listens[addr]
  866. if !found {
  867. missingListens[addr] = inode
  868. continue
  869. }
  870. open := false
  871. for _, details := range byPids {
  872. if details.ClosedAt.IsZero() {
  873. open = true
  874. break
  875. }
  876. }
  877. if !open {
  878. missingListens[addr] = inode
  879. }
  880. }
  881. if len(missingListens) > 0 {
  882. inodeToPid := map[string]uint32{}
  883. for pid := range c.processes {
  884. fds, err := proc.ReadFds(pid)
  885. if err != nil {
  886. continue
  887. }
  888. for _, fd := range fds {
  889. if fd.SocketInode != "" {
  890. inodeToPid[fd.SocketInode] = pid
  891. }
  892. }
  893. }
  894. for addr, inode := range missingListens {
  895. pid, found := inodeToPid[inode]
  896. if !found {
  897. continue
  898. }
  899. klog.Warningln("missing listen found:", addr, pid)
  900. c.onListenOpen(pid, addr, true)
  901. }
  902. }
  903. for addr, pids := range c.listens {
  904. for pid, details := range pids {
  905. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  906. delete(c.listens[addr], pid)
  907. }
  908. }
  909. if len(c.listens[addr]) == 0 {
  910. delete(c.listens, addr)
  911. }
  912. }
  913. }
  914. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) {
  915. p := c.processes[pid]
  916. if p == nil {
  917. return
  918. }
  919. if !p.openSslUprobesChecked {
  920. p.uprobes = append(p.uprobes, tracer.AttachOpenSslUprobes(pid)...)
  921. p.openSslUprobesChecked = true
  922. }
  923. if !p.goTlsUprobesChecked {
  924. p.uprobes = append(p.uprobes, tracer.AttachGoTlsUprobes(pid)...)
  925. p.goTlsUprobesChecked = true
  926. }
  927. }
  928. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  929. info := proc.GetFdInfo(pid, fd)
  930. if info == nil {
  931. return
  932. }
  933. switch {
  934. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  935. !strings.HasPrefix(info.Dest, "/"),
  936. strings.HasPrefix(info.Dest, "/proc/"),
  937. strings.HasPrefix(info.Dest, "/dev/"),
  938. strings.HasPrefix(info.Dest, "/sys/"),
  939. strings.HasSuffix(info.Dest, "(deleted)"):
  940. return
  941. }
  942. mntId = info.MntId
  943. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  944. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  945. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  946. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  947. logPath = info.Dest
  948. }
  949. return
  950. }
  951. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  952. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  953. }
  954. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  955. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  956. }