container.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. . "github.com/coroot/coroot-node-agent/utils/modelse"
  5. "os"
  6. "sort"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/coroot/coroot-node-agent/cgroup"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/ebpftracer"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  14. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/logs"
  17. "github.com/coroot/coroot-node-agent/node"
  18. "github.com/coroot/coroot-node-agent/pinger"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/logparser"
  22. "github.com/prometheus/client_golang/prometheus"
  23. klog "github.com/sirupsen/logrus"
  24. "github.com/vishvananda/netns"
  25. "golang.org/x/exp/maps"
  26. "inet.af/netaddr"
  27. )
  // gcInterval is the period of the background gc loop started by
  // NewContainer; Dead also uses it as the grace period after the container's
  // last process has exited.
  var gcInterval = 10 * time.Second

  // pingTimeout is handed to pinger.Ping when probing the latency of the
  // container's peers.
  var pingTimeout = 300 * time.Millisecond
  // Basic identifier types used by the container registry.
  type (
  	// ContainerID identifies a container tracked by this package.
  	ContainerID string

  	// ContainerNetwork is one network a container is attached to.
  	// NewContainer looks up the network's load-balancer namespace by NetworkID.
  	ContainerNetwork struct {
  		NetworkID string
  	}
  )
  36. type ContainerMetadata struct {
  37. name string
  38. labels map[string]string
  39. volumes map[string]string
  40. logPath string
  41. image string
  42. logDecoder logparser.Decoder
  43. hostListens map[string][]netaddr.IPPort
  44. networks map[string]ContainerNetwork
  45. env map[string]string
  46. systemdTriggeredBy string
  47. }
  // Delays accumulates CPU-scheduling and block-IO delay as read from
  // taskstats (CPUDelay / BlockIODelay); see updateDelays.
  type Delays struct {
  	cpu, disk time.Duration
  }
  52. type LogParser struct {
  53. parser *logparser.Parser
  54. stop func()
  55. }
  56. func (p *LogParser) Stop() {
  57. if p.stop != nil {
  58. p.stop()
  59. }
  60. p.parser.Stop()
  61. }
  62. type AddrPair struct {
  63. src netaddr.IPPort
  64. dst netaddr.IPPort
  65. }
  66. type ActiveConnection struct {
  67. Dest netaddr.IPPort
  68. ActualDest netaddr.IPPort
  69. Pid uint32
  70. Fd uint64
  71. Timestamp uint64
  72. Closed time.Time
  73. BytesSent uint64
  74. BytesReceived uint64
  75. http2Parser *l7.Http2Parser
  76. postgresParser *l7.PostgresParser
  77. mysqlParser *l7.MysqlParser
  78. dmParser *l7.DmParser
  79. }
  80. type ListenDetails struct {
  81. ClosedAt time.Time
  82. NsIPs []netaddr.IP
  83. }
  // PidFd identifies a socket by its owning process and file descriptor. It is
  // used as a map key (see Container.connectionsByPidFd).
  type PidFd struct {
  	Pid uint32
  	Fd  uint64
  }
  // K8sContainer carries Kubernetes identity for a container and is embedded
  // in Container. Field meanings are inferred from their names (ns is
  // presumably the namespace, pid is kept as a string) — confirm with the
  // code that populates them.
  //
  // The struct is now explicitly closed. Previously it was unterminated, so the
  // following ConnectionStats declaration was parsed as part of its body and
  // the file did not compile.
  type K8sContainer struct {
  	ns            string
  	podName       string
  	podId         string
  	workload      string
  	containerName string
  	pid           string
  }
  // ConnectionStats aggregates outbound connection activity for one
  // (destination, actual destination) pair.
  type ConnectionStats struct {
  	// Count of successful connects and the summed connect duration.
  	Count     uint64
  	TotalTime time.Duration
  	// Retransmissions counted by onRetransmission.
  	Retransmissions uint64
  	// Byte totals accumulated by updateConnectionTrafficStats.
  	BytesSent, BytesReceived uint64
  }
  102. type Container struct {
  103. id ContainerID
  104. cgroup *cgroup.Cgroup
  105. metadata *ContainerMetadata
  106. K8sContainer
  107. processes map[uint32]*Process
  108. startedAt time.Time
  109. zombieAt time.Time
  110. restarts int
  111. delays Delays
  112. delaysByPid map[uint32]Delays
  113. delaysLock sync.Mutex
  114. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  115. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  116. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  117. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  118. connectionsActive map[AddrPair]*ActiveConnection
  119. connectionsByPidFd map[PidFd]*ActiveConnection
  120. l7Stats L7Stats
  121. dnsStats *L7Metrics
  122. oomKills int
  123. pythonThreadLockWaitTime time.Duration
  124. mounts map[string]proc.MountInfo
  125. logParsers map[string]*LogParser
  126. hostConntrack *Conntrack
  127. nsConntrack *Conntrack
  128. lbConntracks []*Conntrack
  129. registry *Registry
  130. lock sync.RWMutex
  131. done chan struct{}
  132. traceMap map[uint64]*tracing.Trace
  133. //instanceID utils.ID
  134. Symbols []debugelf.Symbol
  135. Uprobes []tracer.Uprobe
  136. UprobesMap map[string]tracer.Uprobe
  137. l7EventReady bool
  138. l7Attach bool
  139. // 白名单详情
  140. WhiteSettingInfo WhiteSettingInfo
  141. // 应用详情
  142. AppInfo AppInfo
  143. }
  144. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  145. netNs, err := proc.GetNetNs(pid)
  146. if err != nil {
  147. return nil, err
  148. }
  149. defer netNs.Close()
  150. c := &Container{
  151. id: id,
  152. cgroup: cg,
  153. metadata: md,
  154. processes: map[uint32]*Process{},
  155. delaysByPid: map[uint32]Delays{},
  156. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  157. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  158. connectsFailed: map[netaddr.IPPort]int64{},
  159. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  160. connectionsActive: map[AddrPair]*ActiveConnection{},
  161. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  162. l7Stats: L7Stats{},
  163. dnsStats: &L7Metrics{},
  164. mounts: map[string]proc.MountInfo{},
  165. logParsers: map[string]*LogParser{},
  166. hostConntrack: hostConntrack,
  167. done: make(chan struct{}),
  168. traceMap: make(map[uint64]*tracing.Trace),
  169. registry: registry,
  170. }
  171. for _, n := range md.networks {
  172. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  173. if ct, err := NewConntrack(nsHandle); err != nil {
  174. klog.Warningln(err)
  175. } else {
  176. c.lbConntracks = append(c.lbConntracks, ct)
  177. }
  178. _ = nsHandle.Close()
  179. }
  180. }
  181. c.runLogParser("")
  182. go func() {
  183. ticker := time.NewTicker(gcInterval)
  184. defer ticker.Stop()
  185. for {
  186. select {
  187. case <-c.done:
  188. return
  189. case t := <-ticker.C:
  190. c.gc(t)
  191. }
  192. }
  193. }()
  194. return c, nil
  195. }
  196. func (c *Container) Close() {
  197. for _, p := range c.logParsers {
  198. p.Stop()
  199. }
  200. for _, ct := range c.lbConntracks {
  201. _ = ct.Close()
  202. }
  203. if c.nsConntrack != nil {
  204. _ = c.nsConntrack.Close()
  205. }
  206. close(c.done)
  207. }
  208. func (c *Container) Dead(now time.Time) bool {
  209. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  210. }
  211. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  212. // some fixed metric description is required here to register/unregister the collector correctly
  213. ch <- prometheus.NewDesc("container", "", nil, nil)
  214. }
  215. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  216. c.registry.updateTrafficStatsIfNecessary()
  217. c.lock.RLock()
  218. defer c.lock.RUnlock()
  219. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  220. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  221. }
  222. ch <- counter(metrics.Restarts, float64(c.restarts))
  223. if cpu, err := c.cgroup.CpuStat(); err == nil {
  224. if cpu.LimitCores > 0 {
  225. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  226. }
  227. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  228. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  229. }
  230. if taskstatsClient != nil {
  231. c.updateDelays()
  232. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  233. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  234. }
  235. if s, err := c.cgroup.MemoryStat(); err == nil {
  236. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  237. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  238. if s.Limit > 0 {
  239. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  240. }
  241. }
  242. if c.oomKills > 0 {
  243. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  244. }
  245. if disks, err := node.GetDisks(); err == nil {
  246. ioStat, _ := c.cgroup.IOStat()
  247. for majorMinor, mounts := range c.getMounts() {
  248. dev := disks.GetParentBlockDevice(majorMinor)
  249. if dev == nil {
  250. continue
  251. }
  252. for mountPoint, fsStat := range mounts {
  253. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  254. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  255. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  256. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  257. if io, ok := ioStat[majorMinor]; ok {
  258. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  259. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  260. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  261. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  262. }
  263. }
  264. }
  265. }
  266. for addr, open := range c.getListens() {
  267. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String())
  268. }
  269. for proxy, addrs := range c.getProxiedListens() {
  270. for addr := range addrs {
  271. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  272. }
  273. }
  274. for d, stats := range c.connectsSuccessful {
  275. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
  276. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
  277. if stats.Retransmissions > 0 {
  278. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
  279. }
  280. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  281. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  282. }
  283. for dst, count := range c.connectsFailed {
  284. ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
  285. }
  286. connections := map[AddrPair]int{}
  287. for addrPair, conn := range c.connectionsActive {
  288. if !conn.Closed.IsZero() {
  289. continue
  290. }
  291. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  292. }
  293. for d, count := range connections {
  294. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  295. }
  296. for source, p := range c.logParsers {
  297. for _, c := range p.parser.GetCounters() {
  298. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  299. }
  300. }
  301. appTypes := map[string]struct{}{}
  302. seenJvms := map[string]bool{}
  303. seenDotNetApps := map[string]bool{}
  304. pids := maps.Keys(c.processes)
  305. sort.Slice(pids, func(i, j int) bool {
  306. return pids[i] < pids[j]
  307. })
  308. for _, pid := range pids {
  309. process := c.processes[pid]
  310. cmdline := proc.GetCmdline(pid)
  311. if len(cmdline) == 0 {
  312. continue
  313. }
  314. appType := guessApplicationType(cmdline)
  315. if appType != "" {
  316. appTypes[appType] = struct{}{}
  317. }
  318. if process.isGolangApp {
  319. appTypes["golang"] = struct{}{}
  320. }
  321. switch {
  322. case isJvm(cmdline):
  323. jvm, jMetrics := jvmMetrics(pid)
  324. if len(jMetrics) > 0 && !seenJvms[jvm] {
  325. seenJvms[jvm] = true
  326. for _, m := range jMetrics {
  327. ch <- m
  328. }
  329. }
  330. case process.dotNetMonitor != nil:
  331. appTypes["dotnet"] = struct{}{}
  332. appName := process.dotNetMonitor.AppName()
  333. if !seenDotNetApps[appName] {
  334. seenDotNetApps[appName] = true
  335. process.dotNetMonitor.Collect(ch)
  336. }
  337. }
  338. }
  339. for appType := range appTypes {
  340. ch <- gauge(metrics.ApplicationType, 1, appType)
  341. }
  342. if c.pythonThreadLockWaitTime > 0 {
  343. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  344. }
  345. if c.dnsStats.Requests != nil {
  346. c.dnsStats.Requests.Collect(ch)
  347. }
  348. if c.dnsStats.Latency != nil {
  349. c.dnsStats.Latency.Collect(ch)
  350. }
  351. c.l7Stats.collect(ch)
  352. if !*flags.DisablePinger {
  353. for ip, rtt := range c.ping() {
  354. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  355. }
  356. }
  357. }
  358. func (c *Container) onProcessStart(pid uint32) *Process {
  359. c.lock.Lock()
  360. defer c.lock.Unlock()
  361. stats, err := TaskstatsPID(pid)
  362. if err != nil {
  363. return nil
  364. }
  365. c.zombieAt = time.Time{}
  366. p := NewProcess(pid, stats, c.registry.tracer)
  367. if p == nil {
  368. return nil
  369. }
  370. c.processes[pid] = p
  371. if c.startedAt.IsZero() {
  372. c.startedAt = stats.BeginTime
  373. } else {
  374. min := stats.BeginTime
  375. for _, p := range c.processes {
  376. if p.StartedAt.Before(min) {
  377. min = p.StartedAt
  378. }
  379. }
  380. if min.After(c.startedAt) {
  381. c.restarts++
  382. c.startedAt = min
  383. }
  384. }
  385. return p
  386. }
  387. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  388. c.lock.Lock()
  389. defer c.lock.Unlock()
  390. if p := c.processes[pid]; p != nil {
  391. p.Close()
  392. }
  393. delete(c.processes, pid)
  394. if len(c.processes) == 0 {
  395. c.zombieAt = time.Now()
  396. }
  397. delete(c.delaysByPid, pid)
  398. if oomKill {
  399. c.oomKills++
  400. }
  401. }
  402. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  403. mntId, logPath := resolveFd(pid, fd)
  404. func() {
  405. if mntId == "" {
  406. return
  407. }
  408. c.lock.Lock()
  409. _, ok := c.mounts[mntId]
  410. c.lock.Unlock()
  411. if ok {
  412. return
  413. }
  414. byMountId := proc.GetMountInfo(pid)
  415. if byMountId == nil {
  416. return
  417. }
  418. if mi, ok := byMountId[mntId]; ok {
  419. c.lock.Lock()
  420. c.mounts[mntId] = mi
  421. c.lock.Unlock()
  422. }
  423. }()
  424. if logPath != "" {
  425. c.lock.Lock()
  426. c.runLogParser(logPath)
  427. c.lock.Unlock()
  428. }
  429. }
  430. // set
  431. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  432. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  433. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  434. return
  435. }
  436. if !safe {
  437. c.lock.Lock()
  438. defer c.lock.Unlock()
  439. }
  440. if _, ok := c.listens[addr]; !ok {
  441. c.listens[addr] = map[uint32]*ListenDetails{}
  442. }
  443. details := &ListenDetails{}
  444. c.listens[addr][pid] = details
  445. if addr.IP().IsUnspecified() {
  446. ns, err := proc.GetNetNs(pid)
  447. if err != nil {
  448. if !common.IsNotExist(err) {
  449. klog.Warningln(err)
  450. }
  451. return
  452. }
  453. defer ns.Close()
  454. ips, err := proc.GetNsIps(ns)
  455. if err != nil {
  456. klog.Warningln(err)
  457. return
  458. }
  459. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  460. details.NsIPs = ips
  461. }
  462. }
  463. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  464. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  465. c.lock.Lock()
  466. defer c.lock.Unlock()
  467. if _, byAddr := c.listens[addr]; byAddr {
  468. if _, byPid := c.listens[addr][pid]; byPid {
  469. if details := c.listens[addr][pid]; details != nil {
  470. details.ClosedAt = time.Now()
  471. }
  472. }
  473. }
  474. }
  475. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  476. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  477. return
  478. }
  479. p := c.processes[pid]
  480. if p == nil {
  481. return
  482. }
  483. if dst.IP().IsLoopback() && !p.isHostNs() {
  484. return
  485. }
  486. actualDst, err := c.getActualDestination(p, src, dst)
  487. if err != nil {
  488. if !common.IsNotExist(err) {
  489. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  490. }
  491. return
  492. }
  493. switch {
  494. case actualDst == nil:
  495. actualDst = &dst
  496. case actualDst.IP().IsLoopback() && !p.isHostNs():
  497. return
  498. }
  499. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  500. return
  501. }
  502. c.lock.Lock()
  503. defer c.lock.Unlock()
  504. if failed {
  505. c.connectsFailed[dst]++
  506. } else {
  507. key := AddrPair{src: dst, dst: *actualDst}
  508. stats := c.connectsSuccessful[key]
  509. if stats == nil {
  510. stats = &ConnectionStats{}
  511. c.connectsSuccessful[key] = stats
  512. }
  513. stats.Count++
  514. stats.TotalTime += duration
  515. connection := &ActiveConnection{
  516. Dest: dst,
  517. ActualDest: *actualDst,
  518. Pid: pid,
  519. Fd: fd,
  520. Timestamp: timestamp,
  521. }
  522. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  523. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  524. }
  525. c.connectLastAttempt[dst] = time.Now()
  526. }
  527. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  528. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  529. return actualDst, nil
  530. }
  531. for _, lb := range c.lbConntracks {
  532. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  533. return actualDst, nil
  534. }
  535. }
  536. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  537. if actualDst != nil {
  538. return actualDst, nil
  539. }
  540. if !p.isHostNs() {
  541. if c.nsConntrack == nil {
  542. netNs, err := proc.GetNetNs(p.Pid)
  543. if err != nil {
  544. return nil, err
  545. }
  546. defer netNs.Close()
  547. c.nsConntrack, err = NewConntrack(netNs)
  548. if err != nil {
  549. return nil, err
  550. }
  551. }
  552. return c.nsConntrack.GetActualDestination(src, dst), nil
  553. }
  554. return nil, nil
  555. }
  556. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  557. c.lock.Lock()
  558. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  559. c.lock.Unlock()
  560. if conn != nil {
  561. if conn.Closed.IsZero() {
  562. if e.TrafficStats != nil {
  563. c.lock.Lock()
  564. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  565. c.lock.Unlock()
  566. }
  567. conn.Closed = time.Now()
  568. }
  569. }
  570. }
  571. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  572. if u == nil {
  573. return
  574. }
  575. c.lock.Lock()
  576. defer c.lock.Unlock()
  577. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived)
  578. }
  579. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received uint64) {
  580. if ac == nil {
  581. return
  582. }
  583. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  584. stats := c.connectsSuccessful[key]
  585. if stats == nil {
  586. stats = &ConnectionStats{}
  587. c.connectsSuccessful[key] = stats
  588. }
  589. if sent > ac.BytesSent {
  590. stats.BytesSent += sent - ac.BytesSent
  591. }
  592. if received > ac.BytesReceived {
  593. stats.BytesReceived += received - ac.BytesReceived
  594. }
  595. ac.BytesSent = sent
  596. ac.BytesReceived = received
  597. }
  598. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  599. status := r.Status.DNS()
  600. if status == "" {
  601. return nil, "", ""
  602. }
  603. t, fqdn, ips := l7.ParseDns(r.Payload)
  604. if t == "" {
  605. return nil, "", ""
  606. }
  607. if c.dnsStats.Requests == nil {
  608. dnsReq := L7Requests[l7.ProtocolDNS]
  609. c.dnsStats.Requests = prometheus.NewCounterVec(
  610. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  611. []string{"request_type", "domain", "status"},
  612. )
  613. }
  614. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  615. m.Inc()
  616. }
  617. if r.Duration != 0 {
  618. if c.dnsStats.Latency == nil {
  619. dnsLatency := L7Latency[l7.ProtocolDNS]
  620. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  621. }
  622. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  623. }
  624. ip2fqdn := map[netaddr.IP]string{}
  625. if fqdn != "" {
  626. for _, ip := range ips {
  627. ip2fqdn[ip] = fqdn
  628. }
  629. }
  630. return ip2fqdn, t, fqdn
  631. }
  632. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  633. c.lock.Lock()
  634. defer c.lock.Unlock()
  635. if r.Protocol == l7.ProtocolDNS {
  636. //return c.onDNSRequest(r)
  637. }
  638. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  639. if conn == nil {
  640. return nil
  641. }
  642. if timestamp != 0 && conn.Timestamp != timestamp {
  643. return nil
  644. }
  645. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  646. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  647. switch r.Protocol {
  648. case l7.ProtocolHTTP:
  649. stats.observe(r.Status.Http(), "", r.Duration)
  650. method, path := l7.ParseHttp(r.Payload)
  651. trace.HttpRequest(method, path, r.Status, r.Duration)
  652. case l7.ProtocolHTTP2:
  653. if conn.http2Parser == nil {
  654. conn.http2Parser = l7.NewHttp2Parser()
  655. }
  656. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  657. for _, req := range requests {
  658. stats.observe(req.Status.Http(), "", req.Duration)
  659. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  660. }
  661. case l7.ProtocolPostgres:
  662. if r.Method != l7.MethodStatementClose {
  663. stats.observe(r.Status.String(), "", r.Duration)
  664. }
  665. if conn.postgresParser == nil {
  666. conn.postgresParser = l7.NewPostgresParser()
  667. }
  668. query := conn.postgresParser.Parse(r.Payload)
  669. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  670. case l7.ProtocolMysql:
  671. if r.Method != l7.MethodStatementClose {
  672. stats.observe(r.Status.String(), "", r.Duration)
  673. }
  674. if conn.mysqlParser == nil {
  675. conn.mysqlParser = l7.NewMysqlParser()
  676. }
  677. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  678. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  679. case l7.ProtocolMemcached:
  680. stats.observe(r.Status.String(), "", r.Duration)
  681. cmd, items := l7.ParseMemcached(r.Payload)
  682. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  683. case l7.ProtocolRedis:
  684. stats.observe(r.Status.String(), "", r.Duration)
  685. cmd, args := l7.ParseRedis(r.Payload)
  686. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  687. case l7.ProtocolMongo:
  688. stats.observe(r.Status.String(), "", r.Duration)
  689. query := l7.ParseMongo(r.Payload)
  690. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  691. case l7.ProtocolKafka, l7.ProtocolCassandra:
  692. stats.observe(r.Status.String(), "", r.Duration)
  693. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  694. stats.observe(r.Status.String(), r.Method.String(), 0)
  695. case l7.ProtocolDubbo2:
  696. stats.observe(r.Status.String(), "", r.Duration)
  697. }
  698. return nil
  699. }
  700. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  701. c.lock.Lock()
  702. defer c.lock.Unlock()
  703. conn, ok := c.connectionsActive[srcDst]
  704. if !ok {
  705. return false
  706. }
  707. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  708. stats := c.connectsSuccessful[key]
  709. if stats == nil {
  710. stats = &ConnectionStats{}
  711. c.connectsSuccessful[key] = stats
  712. }
  713. stats.Retransmissions++
  714. return true
  715. }
  716. func (c *Container) updateDelays() {
  717. c.delaysLock.Lock()
  718. defer c.delaysLock.Unlock()
  719. for pid := range c.processes {
  720. stats, err := TaskstatsTGID(pid)
  721. if err != nil {
  722. continue
  723. }
  724. d := c.delaysByPid[pid]
  725. c.delays.cpu += stats.CPUDelay - d.cpu
  726. c.delays.disk += stats.BlockIODelay - d.disk
  727. d.cpu = stats.CPUDelay
  728. d.disk = stats.BlockIODelay
  729. c.delaysByPid[pid] = d
  730. }
  731. }
  732. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  733. if len(c.mounts) == 0 {
  734. return nil
  735. }
  736. res := map[string]map[string]*proc.FSStat{}
  737. for _, mi := range c.mounts {
  738. var stat *proc.FSStat
  739. for pid := range c.processes {
  740. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  741. if err == nil {
  742. stat = &s
  743. break
  744. }
  745. }
  746. if stat == nil {
  747. continue
  748. }
  749. if _, ok := res[mi.MajorMinor]; !ok {
  750. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  751. }
  752. res[mi.MajorMinor][mi.MountPoint] = stat
  753. }
  754. return res
  755. }
  756. func (c *Container) getListens() map[netaddr.IPPort]int {
  757. res := map[netaddr.IPPort]int{}
  758. for addr, byPid := range c.listens {
  759. open := 0
  760. isHostNs := false
  761. ips := map[netaddr.IP]bool{}
  762. for pid, details := range byPid {
  763. p := c.processes[pid]
  764. if p == nil {
  765. continue
  766. }
  767. if p.isHostNs() {
  768. isHostNs = true
  769. }
  770. if details.ClosedAt.IsZero() {
  771. open = 1
  772. }
  773. for _, ip := range details.NsIPs {
  774. ips[ip] = true
  775. }
  776. }
  777. if !addr.IP().IsUnspecified() {
  778. ips = map[netaddr.IP]bool{addr.IP(): true}
  779. }
  780. for ip := range ips {
  781. if ip.IsLoopback() && !isHostNs {
  782. continue
  783. }
  784. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  785. }
  786. }
  787. return res
  788. }
  789. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  790. if len(c.metadata.hostListens) == 0 {
  791. return nil
  792. }
  793. hasUnspecified := false
  794. for _, addrs := range c.metadata.hostListens {
  795. for _, addr := range addrs {
  796. if addr.IP().IsUnspecified() {
  797. hasUnspecified = true
  798. break
  799. }
  800. }
  801. }
  802. var hostIps []netaddr.IP
  803. if hasUnspecified {
  804. if ns, err := proc.GetHostNetNs(); err != nil {
  805. klog.Warningln(err)
  806. } else {
  807. ips, err := proc.GetNsIps(ns)
  808. _ = ns.Close()
  809. if err != nil {
  810. klog.Warningln(err)
  811. } else {
  812. hostIps = ips
  813. }
  814. }
  815. }
  816. res := map[string]map[netaddr.IPPort]struct{}{}
  817. for proxy, addrs := range c.metadata.hostListens {
  818. res[proxy] = map[netaddr.IPPort]struct{}{}
  819. for _, addr := range addrs {
  820. if addr.IP().IsUnspecified() {
  821. for _, ip := range hostIps {
  822. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  823. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  824. }
  825. }
  826. } else {
  827. res[proxy][addr] = struct{}{}
  828. }
  829. }
  830. }
  831. return res
  832. }
  833. func (c *Container) ping() map[netaddr.IP]float64 {
  834. netNs := netns.None()
  835. for pid := range c.processes {
  836. if pid == agentPid {
  837. netNs = selfNetNs
  838. break
  839. }
  840. ns, err := proc.GetNetNs(pid)
  841. if err != nil {
  842. if !common.IsNotExist(err) {
  843. klog.Warningln(err)
  844. }
  845. continue
  846. }
  847. netNs = ns
  848. defer netNs.Close()
  849. break
  850. }
  851. if !netNs.IsOpen() {
  852. return nil
  853. }
  854. ips := map[netaddr.IP]struct{}{}
  855. for d := range c.connectsSuccessful {
  856. ips[d.dst.IP()] = struct{}{}
  857. }
  858. for dst := range c.connectsFailed {
  859. ips[dst.IP()] = struct{}{}
  860. }
  861. if len(ips) == 0 {
  862. return nil
  863. }
  864. targets := make([]netaddr.IP, 0, len(ips))
  865. for ip := range ips {
  866. if ip.IsLoopback() {
  867. continue
  868. }
  869. if !ip.Is4() { // pinger doesn't support IPv6 yet
  870. continue
  871. }
  872. targets = append(targets, ip)
  873. }
  874. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  875. if err != nil {
  876. klog.Warningln(err)
  877. return nil
  878. }
  879. return rtt
  880. }
  881. func (c *Container) runLogParser(logPath string) {
  882. if *flags.DisableLogParsing {
  883. return
  884. }
  885. containerId := string(c.id)
  886. if logPath != "" {
  887. if c.logParsers[logPath] != nil {
  888. return
  889. }
  890. ch := make(chan logparser.LogEntry)
  891. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  892. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  893. if err != nil {
  894. klog.Warningln(err)
  895. parser.Stop()
  896. return
  897. }
  898. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  899. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  900. return
  901. }
  902. switch c.cgroup.ContainerType {
  903. case cgroup.ContainerTypeSystemdService:
  904. ch := make(chan logparser.LogEntry)
  905. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  906. klog.Warningln(err)
  907. return
  908. }
  909. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  910. stop := func() {
  911. JournaldUnsubscribe(c.cgroup)
  912. }
  913. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  914. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  915. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  916. if c.metadata.logPath == "" {
  917. return
  918. }
  919. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  920. parser.Stop()
  921. delete(c.logParsers, "stdout/stderr")
  922. }
  923. ch := make(chan logparser.LogEntry)
  924. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  925. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  926. if err != nil {
  927. klog.Warningln(err)
  928. parser.Stop()
  929. return
  930. }
  931. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  932. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  933. }
  934. }
  935. func (c *Container) gc(now time.Time) {
  936. c.lock.Lock()
  937. defer c.lock.Unlock()
  938. established := map[AddrPair]struct{}{}
  939. establishedDst := map[netaddr.IPPort]struct{}{}
  940. listens := map[netaddr.IPPort]string{}
  941. seenNamespaces := map[string]bool{}
  942. fdMap := map[uint64]struct{}{}
  943. for _, p := range c.processes {
  944. if seenNamespaces[p.NetNsId()] {
  945. continue
  946. }
  947. sockets, err := proc.GetSockets(p.Pid)
  948. if err != nil {
  949. continue
  950. }
  951. fds, err := proc.ReadFds(p.Pid)
  952. if err == nil {
  953. for _, fd := range fds {
  954. fdMap[fd.Fd] = struct{}{}
  955. }
  956. }
  957. for _, s := range sockets {
  958. if s.Listen {
  959. listens[s.SAddr] = s.Inode
  960. } else {
  961. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  962. establishedDst[s.DAddr] = struct{}{}
  963. }
  964. }
  965. seenNamespaces[p.NetNsId()] = true
  966. }
  967. for ns := range c.ipsByNs {
  968. if !seenNamespaces[ns] {
  969. delete(c.ipsByNs, ns)
  970. }
  971. }
  972. c.revalidateListens(now, listens)
  973. for srcDst, conn := range c.connectionsActive {
  974. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  975. if _, ok := established[srcDst]; !ok {
  976. delete(c.connectionsActive, srcDst)
  977. if conn == c.connectionsByPidFd[pidFd] {
  978. delete(c.connectionsByPidFd, pidFd)
  979. }
  980. continue
  981. }
  982. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  983. delete(c.connectionsActive, srcDst)
  984. if conn == c.connectionsByPidFd[pidFd] {
  985. delete(c.connectionsByPidFd, pidFd)
  986. }
  987. }
  988. }
  989. for _, conn := range c.connectionsByPidFd {
  990. if _, ok := fdMap[conn.Fd]; !ok {
  991. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  992. }
  993. }
  994. for dst, at := range c.connectLastAttempt {
  995. _, active := establishedDst[dst]
  996. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  997. delete(c.connectLastAttempt, dst)
  998. delete(c.connectsFailed, dst)
  999. for d := range c.connectsSuccessful {
  1000. if d.src == dst {
  1001. delete(c.connectsSuccessful, d)
  1002. }
  1003. }
  1004. c.l7Stats.delete(dst)
  1005. }
  1006. }
  1007. }
  1008. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1009. for addr, byPid := range c.listens {
  1010. if _, open := actualListens[addr]; open {
  1011. continue
  1012. }
  1013. klog.Warningln("deleting the outdated listen:", addr)
  1014. for _, details := range byPid {
  1015. if details.ClosedAt.IsZero() {
  1016. details.ClosedAt = now
  1017. }
  1018. }
  1019. }
  1020. missingListens := map[netaddr.IPPort]string{}
  1021. for addr, inode := range actualListens {
  1022. byPids, found := c.listens[addr]
  1023. if !found {
  1024. missingListens[addr] = inode
  1025. continue
  1026. }
  1027. open := false
  1028. for _, details := range byPids {
  1029. if details.ClosedAt.IsZero() {
  1030. open = true
  1031. break
  1032. }
  1033. }
  1034. if !open {
  1035. missingListens[addr] = inode
  1036. }
  1037. }
  1038. if len(missingListens) > 0 {
  1039. inodeToPid := map[string]uint32{}
  1040. for pid := range c.processes {
  1041. fds, err := proc.ReadFds(pid)
  1042. if err != nil {
  1043. klog.Warningln(err)
  1044. continue
  1045. }
  1046. for _, fd := range fds {
  1047. if fd.SocketInode != "" {
  1048. inodeToPid[fd.SocketInode] = pid
  1049. }
  1050. }
  1051. }
  1052. for addr, inode := range missingListens {
  1053. pid, found := inodeToPid[inode]
  1054. if !found {
  1055. continue
  1056. }
  1057. //klog.Warningln("missing listen found:", addr, pid)
  1058. c.onListenOpen(pid, addr, true)
  1059. }
  1060. }
  1061. for addr, pids := range c.listens {
  1062. for pid, details := range pids {
  1063. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1064. delete(c.listens[addr], pid)
  1065. }
  1066. }
  1067. if len(c.listens[addr]) == 0 {
  1068. delete(c.listens, addr)
  1069. }
  1070. }
  1071. }
  1072. func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1073. klog.Infoln("[attach] attachUprobes start")
  1074. if tracer.DisableL7Tracing() {
  1075. return nil
  1076. }
  1077. codeType := c.GetCodeTypeFromCache(pid)
  1078. if codeType.IsUnknownCode() {
  1079. return nil
  1080. }
  1081. var err error
  1082. switch codeType {
  1083. case CodeTypeJava:
  1084. err = c.attachJVMUprobes(tracer, pid)
  1085. case CodeTypeJavaAot:
  1086. err = c.attachJVMUprobes(tracer, pid)
  1087. case CodeTypeGo:
  1088. err = c.attachTlsUprobes(tracer, pid)
  1089. case CodeTypeNetCoreAot:
  1090. err = c.attachNetCoreUprobes(tracer, pid)
  1091. }
  1092. if err != nil {
  1093. c.errorClose(pid)
  1094. return err
  1095. }
  1096. c.l7AttachSuccess()
  1097. return nil
  1098. }
  1099. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1100. p := c.processes[pid]
  1101. if p == nil {
  1102. return CodeTypeUnknown
  1103. }
  1104. if p.codeType.IsWaitCheck() {
  1105. p.codeType = GetExeType(pid)
  1106. }
  1107. return p.codeType
  1108. }
  1109. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1110. p := c.processes[pid]
  1111. if p == nil {
  1112. return nil
  1113. }
  1114. if !p.openSslUprobesChecked {
  1115. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1116. if err != nil {
  1117. return err
  1118. }
  1119. p.uprobes = append(p.uprobes, sslProbes...)
  1120. p.openSslUprobesChecked = true
  1121. }
  1122. if !p.goTlsUprobesChecked {
  1123. uprobes, isGolangApp := tracer.AttachGoTlsUprobes(pid)
  1124. p.isGolangApp = isGolangApp
  1125. p.uprobes = append(p.uprobes, uprobes...)
  1126. p.goTlsUprobesChecked = true
  1127. }
  1128. return nil
  1129. }
  1130. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1131. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1132. return nil
  1133. }
  1134. p := c.processes[pid]
  1135. if p == nil {
  1136. return nil
  1137. }
  1138. codeType := c.GetCodeTypeFromCache(pid)
  1139. if !p.jvmUprobesChecked {
  1140. tracer.InitKProcInfo(pid, &c.AppInfo)
  1141. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType)
  1142. if err != nil {
  1143. klog.Error(err)
  1144. return err
  1145. }
  1146. p.uprobes = append(p.uprobes, libNioProbes...)
  1147. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid)
  1148. if err != nil {
  1149. klog.Error(err)
  1150. return err
  1151. }
  1152. p.uprobes = append(p.uprobes, libNetProbes...)
  1153. p.jvmUprobesChecked = true
  1154. } else {
  1155. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1156. }
  1157. return nil
  1158. }
  1159. func (c *Container) errorClose(pid uint32) {
  1160. p := c.processes[pid]
  1161. if p != nil {
  1162. p.DynamicClose()
  1163. }
  1164. }
  1165. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1166. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1167. return nil
  1168. }
  1169. p := c.processes[pid]
  1170. if p == nil {
  1171. return nil
  1172. }
  1173. if !p.jvmUprobesChecked {
  1174. //codeType := c.GetCodeTypeFromCache(pid)
  1175. tracer.InitKProcInfo(pid, &c.AppInfo)
  1176. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1177. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1178. if err != nil {
  1179. klog.Error(err)
  1180. return err
  1181. }
  1182. p.uprobes = append(p.uprobes, readProbes...)
  1183. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1184. if err != nil {
  1185. klog.Error(err)
  1186. return err
  1187. }
  1188. p.uprobes = append(p.uprobes, WriteProbes...)
  1189. p.jvmUprobesChecked = true
  1190. }
  1191. return nil
  1192. }
  1193. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1194. info := proc.GetFdInfo(pid, fd)
  1195. if info == nil {
  1196. return
  1197. }
  1198. switch {
  1199. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1200. !strings.HasPrefix(info.Dest, "/"),
  1201. strings.HasPrefix(info.Dest, "/proc/"),
  1202. strings.HasPrefix(info.Dest, "/dev/"),
  1203. strings.HasPrefix(info.Dest, "/sys/"),
  1204. strings.HasSuffix(info.Dest, "(deleted)"):
  1205. return
  1206. }
  1207. mntId = info.MntId
  1208. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1209. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1210. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1211. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1212. logPath = info.Dest
  1213. }
  1214. return
  1215. }
  1216. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1217. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1218. }
  1219. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1220. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1221. }