container.go 39 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "os"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. . "github.com/coroot/coroot-node-agent/utils/modelse"
  10. "github.com/coroot/coroot-node-agent/cgroup"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/ebpftracer"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  14. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/logs"
  17. "github.com/coroot/coroot-node-agent/node"
  18. "github.com/coroot/coroot-node-agent/pinger"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/logparser"
  22. "github.com/prometheus/client_golang/prometheus"
  23. klog "github.com/sirupsen/logrus"
  24. "github.com/vishvananda/netns"
  25. "golang.org/x/exp/maps"
  26. "inet.af/netaddr"
  27. )
  28. var (
  29. gcInterval = 10 * time.Second
  30. pingTimeout = 300 * time.Millisecond
  31. )
  32. type ContainerID string
  33. type ContainerNetwork struct {
  34. NetworkID string
  35. }
  36. type ContainerMetadata struct {
  37. name string
  38. labels map[string]string
  39. volumes map[string]string
  40. logPath string
  41. image string
  42. logDecoder logparser.Decoder
  43. hostListens map[string][]netaddr.IPPort
  44. networks map[string]ContainerNetwork
  45. env map[string]string
  46. systemdTriggeredBy string
  47. rootfs string
  48. }
  49. type Delays struct {
  50. cpu time.Duration
  51. disk time.Duration
  52. }
  53. type LogParser struct {
  54. parser *logparser.Parser
  55. stop func()
  56. }
  57. func (p *LogParser) Stop() {
  58. if p.stop != nil {
  59. p.stop()
  60. }
  61. p.parser.Stop()
  62. }
  63. type AddrPair struct {
  64. src netaddr.IPPort
  65. dst netaddr.IPPort
  66. }
  67. type ActiveConnection struct {
  68. Dest netaddr.IPPort
  69. Src netaddr.IPPort
  70. ActualDest netaddr.IPPort
  71. Pid uint32
  72. Fd uint64
  73. Timestamp uint64
  74. Closed time.Time
  75. BytesSent uint64
  76. BytesReceived uint64
  77. http2Parser *l7.Http2Parser
  78. postgresParser *l7.PostgresParser
  79. mysqlParser *l7.MysqlParser
  80. dmParser *l7.DmParser
  81. }
  82. type ActiveAccept struct {
  83. Dest netaddr.IPPort
  84. Src netaddr.IPPort
  85. Pid uint32
  86. Fd uint64
  87. Timestamp uint64
  88. Closed time.Time
  89. BytesSent uint64
  90. BytesReceived uint64
  91. }
  92. type ListenDetails struct {
  93. ClosedAt time.Time
  94. NsIPs []netaddr.IP
  95. }
  96. type PidFd struct {
  97. Pid uint32
  98. Fd uint64
  99. }
  100. type K8sContainer struct {
  101. ns string
  102. podName string
  103. podId string
  104. workload string
  105. containerName string
  106. pid string
  107. }
  108. type ConnectionStats struct {
  109. Count uint64
  110. TotalTime time.Duration
  111. Retransmissions uint64
  112. BytesSent uint64
  113. PerBytesSent uint64
  114. BytesReceived uint64
  115. PerBytesReceived uint64
  116. Src netaddr.IPPort
  117. ConEstTime time.Duration
  118. FirstReadTime uint64
  119. FirstWriteTime uint64
  120. NewReadTime uint64
  121. }
  122. type AcceptStats struct {
  123. BytesSent uint64
  124. BytesReceived uint64
  125. }
  126. type Container struct {
  127. id ContainerID
  128. cgroup *cgroup.Cgroup
  129. metadata *ContainerMetadata
  130. K8sContainer
  131. processes map[uint32]*Process
  132. startedAt time.Time
  133. zombieAt time.Time
  134. restarts int
  135. delays Delays
  136. delaysByPid map[uint32]Delays
  137. delaysLock sync.Mutex
  138. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  139. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  140. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  141. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  142. connectionsActive map[AddrPair]*ActiveConnection
  143. connectionsByPidFd map[PidFd]*ActiveConnection
  144. acceptsSuccessful map[AddrPair]*AcceptStats
  145. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  146. acceptsActive map[AddrPair]*ActiveAccept
  147. acceptsByPidFd map[PidFd]*ActiveAccept
  148. l7Stats L7Stats
  149. dnsStats *L7Metrics
  150. oomKills int
  151. pythonThreadLockWaitTime time.Duration
  152. mounts map[string]proc.MountInfo
  153. logParsers map[string]*LogParser
  154. hostConntrack *Conntrack
  155. nsConntrack *Conntrack
  156. lbConntracks []*Conntrack
  157. registry *Registry
  158. lock sync.RWMutex
  159. done chan struct{}
  160. traceMap map[uint64]*tracing.Trace
  161. //instanceID utils.ID
  162. Symbols []debugelf.Symbol
  163. Uprobes []tracer.Uprobe
  164. UprobesMap map[string]tracer.Uprobe
  165. l7EventReady bool
  166. l7Attach bool
  167. // 白名单详情
  168. WhiteSettingInfo WhiteSettingInfo
  169. // 应用详情
  170. AppInfo AppInfo
  171. }
  172. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  173. netNs, err := proc.GetNetNs(pid)
  174. if err != nil {
  175. return nil, err
  176. }
  177. defer netNs.Close()
  178. c := &Container{
  179. id: id,
  180. cgroup: cg,
  181. metadata: md,
  182. processes: map[uint32]*Process{},
  183. delaysByPid: map[uint32]Delays{},
  184. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  185. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  186. connectsFailed: map[netaddr.IPPort]int64{},
  187. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  188. connectionsActive: map[AddrPair]*ActiveConnection{},
  189. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  190. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  191. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  192. acceptsActive: map[AddrPair]*ActiveAccept{},
  193. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  194. l7Stats: L7Stats{},
  195. dnsStats: &L7Metrics{},
  196. mounts: map[string]proc.MountInfo{},
  197. logParsers: map[string]*LogParser{},
  198. hostConntrack: hostConntrack,
  199. done: make(chan struct{}),
  200. traceMap: make(map[uint64]*tracing.Trace),
  201. registry: registry,
  202. }
  203. for _, n := range md.networks {
  204. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  205. if ct, err := NewConntrack(nsHandle); err != nil {
  206. klog.Warningln(err)
  207. } else {
  208. c.lbConntracks = append(c.lbConntracks, ct)
  209. }
  210. _ = nsHandle.Close()
  211. }
  212. }
  213. c.runLogParser("")
  214. // go func() {
  215. // ticker := time.NewTicker(gcInterval)
  216. // defer ticker.Stop()
  217. // for {
  218. // select {
  219. // case <-c.done:
  220. // return
  221. // case t := <-ticker.C:
  222. // c.gc(t)
  223. // }
  224. // }
  225. // }()
  226. return c, nil
  227. }
  228. func (c *Container) Close() {
  229. for _, p := range c.logParsers {
  230. p.Stop()
  231. }
  232. for _, ct := range c.lbConntracks {
  233. _ = ct.Close()
  234. }
  235. if c.nsConntrack != nil {
  236. _ = c.nsConntrack.Close()
  237. }
  238. close(c.done)
  239. }
  240. func (c *Container) Dead(now time.Time) bool {
  241. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  242. }
  243. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  244. // some fixed metric description is required here to register/unregister the collector correctly
  245. ch <- prometheus.NewDesc("container", "", nil, nil)
  246. }
  247. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  248. c.registry.updateTrafficStatsIfNecessary()
  249. c.lock.RLock()
  250. defer c.lock.RUnlock()
  251. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  252. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  253. }
  254. ch <- counter(metrics.Restarts, float64(c.restarts))
  255. if cpu, err := c.cgroup.CpuStat(); err == nil {
  256. if cpu.LimitCores > 0 {
  257. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  258. }
  259. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  260. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  261. }
  262. if taskstatsClient != nil {
  263. c.updateDelays()
  264. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  265. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  266. }
  267. if s, err := c.cgroup.MemoryStat(); err == nil {
  268. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  269. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  270. if s.Limit > 0 {
  271. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  272. }
  273. }
  274. if c.oomKills > 0 {
  275. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  276. }
  277. if disks, err := node.GetDisks(); err == nil {
  278. ioStat, _ := c.cgroup.IOStat()
  279. for majorMinor, mounts := range c.getMounts() {
  280. dev := disks.GetParentBlockDevice(majorMinor)
  281. if dev == nil {
  282. continue
  283. }
  284. for mountPoint, fsStat := range mounts {
  285. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  286. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  287. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  288. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  289. if io, ok := ioStat[majorMinor]; ok {
  290. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  291. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  292. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  293. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  294. }
  295. }
  296. }
  297. }
  298. for addr, open := range c.getListens() {
  299. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  300. }
  301. for proxy, addrs := range c.getProxiedListens() {
  302. for addr := range addrs {
  303. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  304. }
  305. }
  306. for d, stats := range c.connectsSuccessful {
  307. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
  308. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
  309. if stats.Retransmissions > 0 {
  310. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
  311. }
  312. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
  313. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
  314. ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
  315. ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
  316. ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
  317. ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
  318. ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), d.dst.String(), d.dst.String(),stats.Src.String())
  319. klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  320. stats.PerBytesReceived = 0
  321. stats.PerBytesSent = 0
  322. }
  323. // for d, stats := range c.acceptsSuccessful {
  324. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  325. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  326. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  327. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  328. // }
  329. for dst, count := range c.connectsFailed {
  330. ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
  331. }
  332. connections := map[AddrPair]int{}
  333. for addrPair, conn := range c.connectionsActive {
  334. if !conn.Closed.IsZero() {
  335. continue
  336. }
  337. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  338. }
  339. for d, count := range connections {
  340. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  341. }
  342. for source, p := range c.logParsers {
  343. for _, c := range p.parser.GetCounters() {
  344. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  345. }
  346. }
  347. appTypes := map[string]struct{}{}
  348. seenJvms := map[string]bool{}
  349. seenDotNetApps := map[string]bool{}
  350. pids := maps.Keys(c.processes)
  351. sort.Slice(pids, func(i, j int) bool {
  352. return pids[i] < pids[j]
  353. })
  354. for _, pid := range pids {
  355. process := c.processes[pid]
  356. cmdline := proc.GetCmdline(pid)
  357. if len(cmdline) == 0 {
  358. continue
  359. }
  360. appType := guessApplicationType(cmdline)
  361. if appType != "" {
  362. appTypes[appType] = struct{}{}
  363. }
  364. if process.isGolangApp {
  365. appTypes["golang"] = struct{}{}
  366. }
  367. switch {
  368. case isJvm(cmdline):
  369. jvm, jMetrics := jvmMetrics(pid)
  370. if len(jMetrics) > 0 && !seenJvms[jvm] {
  371. seenJvms[jvm] = true
  372. for _, m := range jMetrics {
  373. ch <- m
  374. }
  375. }
  376. case process.dotNetMonitor != nil:
  377. appTypes["dotnet"] = struct{}{}
  378. appName := process.dotNetMonitor.AppName()
  379. if !seenDotNetApps[appName] {
  380. seenDotNetApps[appName] = true
  381. process.dotNetMonitor.Collect(ch)
  382. }
  383. }
  384. }
  385. for appType := range appTypes {
  386. ch <- gauge(metrics.ApplicationType, 1, appType)
  387. }
  388. if c.pythonThreadLockWaitTime > 0 {
  389. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  390. }
  391. if c.dnsStats.Requests != nil {
  392. c.dnsStats.Requests.Collect(ch)
  393. }
  394. if c.dnsStats.Latency != nil {
  395. c.dnsStats.Latency.Collect(ch)
  396. }
  397. c.l7Stats.collect(ch)
  398. if !*flags.DisablePinger {
  399. for ip, rtt := range c.ping() {
  400. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  401. }
  402. }
  403. c.gc(time.Now())
  404. }
  405. func (c *Container) onProcessStart(pid uint32) *Process {
  406. c.lock.Lock()
  407. defer c.lock.Unlock()
  408. stats, err := TaskstatsPID(pid)
  409. if err != nil {
  410. klog.WithError(err).Errorf("Failed onProcessStart [%d]", pid)
  411. return nil
  412. }
  413. c.zombieAt = time.Time{}
  414. p := NewProcess(pid, stats, c.registry.tracer)
  415. if p == nil {
  416. return nil
  417. }
  418. c.processes[pid] = p
  419. if c.startedAt.IsZero() {
  420. c.startedAt = stats.BeginTime
  421. } else {
  422. min := stats.BeginTime
  423. for _, p := range c.processes {
  424. if p.StartedAt.Before(min) {
  425. min = p.StartedAt
  426. }
  427. }
  428. if min.After(c.startedAt) {
  429. c.restarts++
  430. c.startedAt = min
  431. }
  432. }
  433. return p
  434. }
  435. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  436. c.lock.Lock()
  437. defer c.lock.Unlock()
  438. if p := c.processes[pid]; p != nil {
  439. p.Close()
  440. }
  441. delete(c.processes, pid)
  442. if len(c.processes) == 0 {
  443. c.zombieAt = time.Now()
  444. }
  445. delete(c.delaysByPid, pid)
  446. if oomKill {
  447. c.oomKills++
  448. }
  449. }
  450. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  451. mntId, logPath := resolveFd(pid, fd)
  452. func() {
  453. if mntId == "" {
  454. return
  455. }
  456. c.lock.Lock()
  457. _, ok := c.mounts[mntId]
  458. c.lock.Unlock()
  459. if ok {
  460. return
  461. }
  462. byMountId := proc.GetMountInfo(pid)
  463. if byMountId == nil {
  464. return
  465. }
  466. if mi, ok := byMountId[mntId]; ok {
  467. c.lock.Lock()
  468. c.mounts[mntId] = mi
  469. c.lock.Unlock()
  470. }
  471. }()
  472. if logPath != "" {
  473. c.lock.Lock()
  474. c.runLogParser(logPath)
  475. c.lock.Unlock()
  476. }
  477. }
  478. // set
  479. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  480. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  481. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  482. return
  483. }
  484. if !safe {
  485. c.lock.Lock()
  486. defer c.lock.Unlock()
  487. }
  488. if _, ok := c.listens[addr]; !ok {
  489. c.listens[addr] = map[uint32]*ListenDetails{}
  490. }
  491. details := &ListenDetails{}
  492. c.listens[addr][pid] = details
  493. if addr.IP().IsUnspecified() {
  494. ns, err := proc.GetNetNs(pid)
  495. if err != nil {
  496. if !common.IsNotExist(err) {
  497. klog.Warningln(err)
  498. }
  499. return
  500. }
  501. defer ns.Close()
  502. ips, err := proc.GetNsIps(ns)
  503. if err != nil {
  504. klog.Warningln(err)
  505. return
  506. }
  507. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  508. details.NsIPs = ips
  509. }
  510. }
  511. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  512. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  513. c.lock.Lock()
  514. defer c.lock.Unlock()
  515. if _, byAddr := c.listens[addr]; byAddr {
  516. if _, byPid := c.listens[addr][pid]; byPid {
  517. if details := c.listens[addr][pid]; details != nil {
  518. details.ClosedAt = time.Now()
  519. }
  520. }
  521. }
  522. }
  523. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  524. klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  525. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  526. // return
  527. // }
  528. p := c.processes[pid]
  529. if p == nil {
  530. return
  531. }
  532. if dst.IP().IsLoopback() && !p.isHostNs() {
  533. return
  534. }
  535. // actualDst, err := c.getActualDestination(p, src, dst)
  536. // if err != nil {
  537. // if !common.IsNotExist(err) {
  538. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  539. // }
  540. // return
  541. // }
  542. // switch {
  543. // case actualDst == nil:
  544. // actualDst = &dst
  545. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  546. // return
  547. // }
  548. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  549. // return
  550. // }
  551. c.lock.Lock()
  552. defer c.lock.Unlock()
  553. if !failed {
  554. key := AddrPair{src: dst, dst: src}
  555. stats := c.acceptsSuccessful[key]
  556. if stats == nil {
  557. stats = &AcceptStats{}
  558. c.acceptsSuccessful[key] = stats
  559. }
  560. acceptCon := &ActiveAccept{
  561. Dest: src,
  562. Src: dst,
  563. Pid: pid,
  564. Fd: fd,
  565. Timestamp: timestamp,
  566. }
  567. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  568. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  569. }
  570. c.acceptLastAttempt[dst] = time.Now()
  571. }
  572. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  573. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  574. return
  575. }
  576. p := c.processes[pid]
  577. if p == nil {
  578. return
  579. }
  580. if dst.IP().IsLoopback() && !p.isHostNs() {
  581. return
  582. }
  583. actualDst, err := c.getActualDestination(p, src, dst)
  584. if err != nil {
  585. if !common.IsNotExist(err) {
  586. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  587. }
  588. return
  589. }
  590. switch {
  591. case actualDst == nil:
  592. actualDst = &dst
  593. case actualDst.IP().IsLoopback() && !p.isHostNs():
  594. return
  595. }
  596. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  597. return
  598. }
  599. c.lock.Lock()
  600. defer c.lock.Unlock()
  601. if failed {
  602. c.connectsFailed[dst]++
  603. } else {
  604. key := AddrPair{src: dst, dst: *actualDst}
  605. stats := c.connectsSuccessful[key]
  606. if stats == nil {
  607. stats = &ConnectionStats{}
  608. c.connectsSuccessful[key] = stats
  609. }
  610. stats.Count++
  611. stats.TotalTime += duration
  612. stats.Src = src
  613. stats.ConEstTime = duration
  614. connection := &ActiveConnection{
  615. Dest: dst,
  616. Src: src,
  617. ActualDest: *actualDst,
  618. Pid: pid,
  619. Fd: fd,
  620. Timestamp: timestamp,
  621. }
  622. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  623. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  624. }
  625. c.connectLastAttempt[dst] = time.Now()
  626. }
  627. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  628. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  629. return actualDst, nil
  630. }
  631. for _, lb := range c.lbConntracks {
  632. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  633. return actualDst, nil
  634. }
  635. }
  636. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  637. if actualDst != nil {
  638. return actualDst, nil
  639. }
  640. if !p.isHostNs() {
  641. if c.nsConntrack == nil {
  642. netNs, err := proc.GetNetNs(p.Pid)
  643. if err != nil {
  644. return nil, err
  645. }
  646. defer netNs.Close()
  647. c.nsConntrack, err = NewConntrack(netNs)
  648. if err != nil {
  649. return nil, err
  650. }
  651. }
  652. return c.nsConntrack.GetActualDestination(src, dst), nil
  653. }
  654. return nil, nil
  655. }
  656. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  657. c.lock.Lock()
  658. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  659. c.lock.Unlock()
  660. if conn != nil {
  661. if conn.Closed.IsZero() {
  662. if e.TrafficStats != nil {
  663. c.lock.Lock()
  664. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  665. c.lock.Unlock()
  666. }
  667. conn.Closed = time.Now()
  668. }
  669. }
  670. }
  671. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  672. c.lock.Lock()
  673. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  674. c.lock.Unlock()
  675. if conn != nil {
  676. if conn.Closed.IsZero() {
  677. if e.TrafficStats != nil {
  678. c.lock.Lock()
  679. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  680. c.lock.Unlock()
  681. }
  682. conn.Closed = time.Now()
  683. }
  684. }
  685. }
  686. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  687. if u == nil {
  688. return
  689. }
  690. c.lock.Lock()
  691. defer c.lock.Unlock()
  692. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  693. }
  694. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  695. if ac == nil {
  696. return
  697. }
  698. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  699. stats := c.connectsSuccessful[key]
  700. if stats == nil {
  701. stats = &ConnectionStats{}
  702. c.connectsSuccessful[key] = stats
  703. }
  704. if sent > ac.BytesSent {
  705. stats.BytesSent += sent - ac.BytesSent
  706. stats.PerBytesSent = sent - ac.BytesSent
  707. }
  708. if received > ac.BytesReceived {
  709. stats.BytesReceived += received - ac.BytesReceived
  710. stats.PerBytesReceived = received - ac.BytesReceived
  711. }
  712. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  713. stats.FirstReadTime = firstreadtime
  714. stats.FirstWriteTime = firstwritetime
  715. stats.NewReadTime = newreadtime
  716. }
  717. ac.BytesSent = sent
  718. ac.BytesReceived = received
  719. }
  720. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  721. if ac == nil {
  722. return
  723. }
  724. klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  725. key := AddrPair{src: ac.Src, dst: ac.Dest}
  726. stats := c.acceptsSuccessful[key]
  727. if stats == nil {
  728. stats = &AcceptStats{}
  729. c.acceptsSuccessful[key] = stats
  730. }
  731. if sent > ac.BytesSent {
  732. stats.BytesSent += sent - ac.BytesSent
  733. }
  734. if received > ac.BytesReceived {
  735. stats.BytesReceived += received - ac.BytesReceived
  736. }
  737. ac.BytesSent = sent
  738. ac.BytesReceived = received
  739. }
  740. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  741. status := r.Status.DNS()
  742. if status == "" {
  743. return nil, "", ""
  744. }
  745. t, fqdn, ips := l7.ParseDns(r.Payload)
  746. if t == "" {
  747. return nil, "", ""
  748. }
  749. if c.dnsStats.Requests == nil {
  750. dnsReq := L7Requests[l7.ProtocolDNS]
  751. c.dnsStats.Requests = prometheus.NewCounterVec(
  752. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  753. []string{"request_type", "domain", "status"},
  754. )
  755. }
  756. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  757. m.Inc()
  758. }
  759. if r.Duration != 0 {
  760. if c.dnsStats.Latency == nil {
  761. dnsLatency := L7Latency[l7.ProtocolDNS]
  762. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  763. }
  764. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  765. }
  766. ip2fqdn := map[netaddr.IP]string{}
  767. if fqdn != "" {
  768. for _, ip := range ips {
  769. ip2fqdn[ip] = fqdn
  770. }
  771. }
  772. return ip2fqdn, t, fqdn
  773. }
  774. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  775. c.lock.Lock()
  776. defer c.lock.Unlock()
  777. if r.Protocol == l7.ProtocolDNS {
  778. //return c.onDNSRequest(r)
  779. }
  780. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  781. if conn == nil {
  782. return nil
  783. }
  784. if timestamp != 0 && conn.Timestamp != timestamp {
  785. return nil
  786. }
  787. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  788. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  789. switch r.Protocol {
  790. case l7.ProtocolHTTP:
  791. stats.observe(r.Status.Http(), "", r.Duration)
  792. method, path := l7.ParseHttp(r.Payload)
  793. trace.HttpRequest(method, path, r.Status, r.Duration)
  794. case l7.ProtocolHTTP2:
  795. if conn.http2Parser == nil {
  796. conn.http2Parser = l7.NewHttp2Parser()
  797. }
  798. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  799. for _, req := range requests {
  800. stats.observe(req.Status.Http(), "", req.Duration)
  801. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  802. }
  803. case l7.ProtocolPostgres:
  804. if r.Method != l7.MethodStatementClose {
  805. stats.observe(r.Status.String(), "", r.Duration)
  806. }
  807. if conn.postgresParser == nil {
  808. conn.postgresParser = l7.NewPostgresParser()
  809. }
  810. query := conn.postgresParser.Parse(r.Payload)
  811. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  812. case l7.ProtocolMysql:
  813. if r.Method != l7.MethodStatementClose {
  814. stats.observe(r.Status.String(), "", r.Duration)
  815. }
  816. if conn.mysqlParser == nil {
  817. conn.mysqlParser = l7.NewMysqlParser()
  818. }
  819. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  820. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  821. case l7.ProtocolMemcached:
  822. stats.observe(r.Status.String(), "", r.Duration)
  823. cmd, items := l7.ParseMemcached(r.Payload)
  824. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  825. case l7.ProtocolRedis:
  826. stats.observe(r.Status.String(), "", r.Duration)
  827. cmd, args := l7.ParseRedis(r.Payload)
  828. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  829. case l7.ProtocolMongo:
  830. stats.observe(r.Status.String(), "", r.Duration)
  831. query := l7.ParseMongo(r.Payload)
  832. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  833. case l7.ProtocolKafka, l7.ProtocolCassandra:
  834. stats.observe(r.Status.String(), "", r.Duration)
  835. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  836. stats.observe(r.Status.String(), r.Method.String(), 0)
  837. case l7.ProtocolDubbo2:
  838. stats.observe(r.Status.String(), "", r.Duration)
  839. }
  840. return nil
  841. }
  842. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  843. c.lock.Lock()
  844. defer c.lock.Unlock()
  845. conn, ok := c.connectionsActive[srcDst]
  846. if !ok {
  847. return false
  848. }
  849. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  850. stats := c.connectsSuccessful[key]
  851. if stats == nil {
  852. stats = &ConnectionStats{}
  853. c.connectsSuccessful[key] = stats
  854. }
  855. stats.Retransmissions++
  856. return true
  857. }
  858. func (c *Container) updateDelays() {
  859. c.delaysLock.Lock()
  860. defer c.delaysLock.Unlock()
  861. for pid := range c.processes {
  862. stats, err := TaskstatsTGID(pid)
  863. if err != nil {
  864. continue
  865. }
  866. d := c.delaysByPid[pid]
  867. c.delays.cpu += stats.CPUDelay - d.cpu
  868. c.delays.disk += stats.BlockIODelay - d.disk
  869. d.cpu = stats.CPUDelay
  870. d.disk = stats.BlockIODelay
  871. c.delaysByPid[pid] = d
  872. }
  873. }
  874. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  875. if len(c.mounts) == 0 {
  876. return nil
  877. }
  878. res := map[string]map[string]*proc.FSStat{}
  879. for _, mi := range c.mounts {
  880. var stat *proc.FSStat
  881. for pid := range c.processes {
  882. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  883. if err == nil {
  884. stat = &s
  885. break
  886. }
  887. }
  888. if stat == nil {
  889. continue
  890. }
  891. if _, ok := res[mi.MajorMinor]; !ok {
  892. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  893. }
  894. res[mi.MajorMinor][mi.MountPoint] = stat
  895. }
  896. return res
  897. }
  898. func (c *Container) getListens() map[netaddr.IPPort]int {
  899. res := map[netaddr.IPPort]int{}
  900. for addr, byPid := range c.listens {
  901. open := 0
  902. isHostNs := false
  903. ips := map[netaddr.IP]bool{}
  904. for pid, details := range byPid {
  905. p := c.processes[pid]
  906. if p == nil {
  907. continue
  908. }
  909. if p.isHostNs() {
  910. isHostNs = true
  911. }
  912. if details.ClosedAt.IsZero() {
  913. open = 1
  914. }
  915. for _, ip := range details.NsIPs {
  916. ips[ip] = true
  917. }
  918. }
  919. if !addr.IP().IsUnspecified() {
  920. ips = map[netaddr.IP]bool{addr.IP(): true}
  921. }
  922. for ip := range ips {
  923. if ip.IsLoopback() && !isHostNs {
  924. continue
  925. }
  926. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  927. }
  928. }
  929. return res
  930. }
  931. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  932. if len(c.metadata.hostListens) == 0 {
  933. return nil
  934. }
  935. hasUnspecified := false
  936. for _, addrs := range c.metadata.hostListens {
  937. for _, addr := range addrs {
  938. if addr.IP().IsUnspecified() {
  939. hasUnspecified = true
  940. break
  941. }
  942. }
  943. }
  944. var hostIps []netaddr.IP
  945. if hasUnspecified {
  946. if ns, err := proc.GetHostNetNs(); err != nil {
  947. klog.Warningln(err)
  948. } else {
  949. ips, err := proc.GetNsIps(ns)
  950. _ = ns.Close()
  951. if err != nil {
  952. klog.Warningln(err)
  953. } else {
  954. hostIps = ips
  955. }
  956. }
  957. }
  958. res := map[string]map[netaddr.IPPort]struct{}{}
  959. for proxy, addrs := range c.metadata.hostListens {
  960. res[proxy] = map[netaddr.IPPort]struct{}{}
  961. for _, addr := range addrs {
  962. if addr.IP().IsUnspecified() {
  963. for _, ip := range hostIps {
  964. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  965. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  966. }
  967. }
  968. } else {
  969. res[proxy][addr] = struct{}{}
  970. }
  971. }
  972. }
  973. return res
  974. }
  975. func (c *Container) ping() map[netaddr.IP]float64 {
  976. netNs := netns.None()
  977. for pid := range c.processes {
  978. if pid == agentPid {
  979. netNs = selfNetNs
  980. break
  981. }
  982. ns, err := proc.GetNetNs(pid)
  983. if err != nil {
  984. if !common.IsNotExist(err) {
  985. klog.Warningln(err)
  986. }
  987. continue
  988. }
  989. netNs = ns
  990. defer netNs.Close()
  991. break
  992. }
  993. if !netNs.IsOpen() {
  994. return nil
  995. }
  996. ips := map[netaddr.IP]struct{}{}
  997. for d := range c.connectsSuccessful {
  998. ips[d.dst.IP()] = struct{}{}
  999. }
  1000. for dst := range c.connectsFailed {
  1001. ips[dst.IP()] = struct{}{}
  1002. }
  1003. if len(ips) == 0 {
  1004. return nil
  1005. }
  1006. targets := make([]netaddr.IP, 0, len(ips))
  1007. for ip := range ips {
  1008. if ip.IsLoopback() {
  1009. continue
  1010. }
  1011. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1012. continue
  1013. }
  1014. targets = append(targets, ip)
  1015. }
  1016. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1017. if err != nil {
  1018. klog.Warningln(err)
  1019. return nil
  1020. }
  1021. return rtt
  1022. }
  1023. func (c *Container) runLogParser(logPath string) {
  1024. if *flags.DisableLogParsing {
  1025. return
  1026. }
  1027. containerId := string(c.id)
  1028. if logPath != "" {
  1029. if c.logParsers[logPath] != nil {
  1030. return
  1031. }
  1032. ch := make(chan logparser.LogEntry)
  1033. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1034. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1035. if err != nil {
  1036. klog.Warningln(err)
  1037. parser.Stop()
  1038. return
  1039. }
  1040. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1041. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1042. return
  1043. }
  1044. switch c.cgroup.ContainerType {
  1045. case cgroup.ContainerTypeSystemdService:
  1046. ch := make(chan logparser.LogEntry)
  1047. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1048. klog.Warningln(err)
  1049. return
  1050. }
  1051. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1052. stop := func() {
  1053. JournaldUnsubscribe(c.cgroup)
  1054. }
  1055. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1056. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1057. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1058. if c.metadata.logPath == "" {
  1059. return
  1060. }
  1061. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1062. parser.Stop()
  1063. delete(c.logParsers, "stdout/stderr")
  1064. }
  1065. ch := make(chan logparser.LogEntry)
  1066. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1067. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1068. if err != nil {
  1069. klog.Warningln(err)
  1070. parser.Stop()
  1071. return
  1072. }
  1073. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1074. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1075. }
  1076. }
  1077. func (c *Container) gc(now time.Time) {
  1078. // c.lock.Lock()
  1079. // defer c.lock.Unlock()
  1080. established := map[AddrPair]struct{}{}
  1081. establishedDst := map[netaddr.IPPort]struct{}{}
  1082. listens := map[netaddr.IPPort]string{}
  1083. seenNamespaces := map[string]bool{}
  1084. fdMap := map[uint64]struct{}{}
  1085. for _, p := range c.processes {
  1086. if seenNamespaces[p.NetNsId()] {
  1087. continue
  1088. }
  1089. sockets, err := proc.GetSockets(p.Pid)
  1090. if err != nil {
  1091. continue
  1092. }
  1093. fds, err := proc.ReadFds(p.Pid)
  1094. if err == nil {
  1095. for _, fd := range fds {
  1096. fdMap[fd.Fd] = struct{}{}
  1097. }
  1098. }
  1099. for _, s := range sockets {
  1100. if s.Listen {
  1101. listens[s.SAddr] = s.Inode
  1102. } else {
  1103. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1104. establishedDst[s.DAddr] = struct{}{}
  1105. }
  1106. }
  1107. seenNamespaces[p.NetNsId()] = true
  1108. }
  1109. c.revalidateListens(now, listens)
  1110. for srcDst, conn := range c.connectionsActive {
  1111. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1112. if _, ok := established[srcDst]; !ok {
  1113. delete(c.connectionsActive, srcDst)
  1114. if conn == c.connectionsByPidFd[pidFd] {
  1115. delete(c.connectionsByPidFd, pidFd)
  1116. }
  1117. continue
  1118. }
  1119. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1120. delete(c.connectionsActive, srcDst)
  1121. if conn == c.connectionsByPidFd[pidFd] {
  1122. delete(c.connectionsByPidFd, pidFd)
  1123. }
  1124. }
  1125. }
  1126. for srcDst, conn := range c.acceptsActive {
  1127. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1128. if _, ok := established[srcDst]; !ok {
  1129. delete(c.acceptsActive, srcDst)
  1130. if conn == c.acceptsByPidFd[pidFd] {
  1131. delete(c.acceptsByPidFd, pidFd)
  1132. }
  1133. continue
  1134. }
  1135. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1136. delete(c.acceptsActive, srcDst)
  1137. if conn == c.acceptsByPidFd[pidFd] {
  1138. delete(c.acceptsByPidFd, pidFd)
  1139. }
  1140. }
  1141. }
  1142. for _, conn := range c.connectionsByPidFd {
  1143. if _, ok := fdMap[conn.Fd]; !ok {
  1144. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1145. }
  1146. }
  1147. for _, conn := range c.acceptsByPidFd {
  1148. if _, ok := fdMap[conn.Fd]; !ok {
  1149. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1150. }
  1151. }
  1152. for dst, at := range c.connectLastAttempt {
  1153. _, active := establishedDst[dst]
  1154. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1155. delete(c.connectLastAttempt, dst)
  1156. delete(c.connectsFailed, dst)
  1157. for d := range c.connectsSuccessful {
  1158. if d.src == dst {
  1159. delete(c.connectsSuccessful, d)
  1160. }
  1161. }
  1162. c.l7Stats.delete(dst)
  1163. }
  1164. }
  1165. for dst, at := range c.acceptLastAttempt {
  1166. _, active := establishedDst[dst]
  1167. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1168. delete(c.acceptLastAttempt, dst)
  1169. for d := range c.acceptsSuccessful {
  1170. if d.src == dst {
  1171. delete(c.acceptsSuccessful, d)
  1172. }
  1173. }
  1174. c.l7Stats.delete(dst)
  1175. }
  1176. }
  1177. }
  1178. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1179. for addr, byPid := range c.listens {
  1180. if _, open := actualListens[addr]; open {
  1181. continue
  1182. }
  1183. klog.Warningln("deleting the outdated listen:", addr)
  1184. for _, details := range byPid {
  1185. if details.ClosedAt.IsZero() {
  1186. details.ClosedAt = now
  1187. }
  1188. }
  1189. }
  1190. missingListens := map[netaddr.IPPort]string{}
  1191. for addr, inode := range actualListens {
  1192. byPids, found := c.listens[addr]
  1193. if !found {
  1194. missingListens[addr] = inode
  1195. continue
  1196. }
  1197. open := false
  1198. for _, details := range byPids {
  1199. if details.ClosedAt.IsZero() {
  1200. open = true
  1201. break
  1202. }
  1203. }
  1204. if !open {
  1205. missingListens[addr] = inode
  1206. }
  1207. }
  1208. if len(missingListens) > 0 {
  1209. inodeToPid := map[string]uint32{}
  1210. for pid := range c.processes {
  1211. fds, err := proc.ReadFds(pid)
  1212. if err != nil {
  1213. klog.Warningln(err)
  1214. continue
  1215. }
  1216. for _, fd := range fds {
  1217. if fd.SocketInode != "" {
  1218. inodeToPid[fd.SocketInode] = pid
  1219. }
  1220. }
  1221. }
  1222. for addr, inode := range missingListens {
  1223. pid, found := inodeToPid[inode]
  1224. if !found {
  1225. continue
  1226. }
  1227. //klog.Warningln("missing listen found:", addr, pid)
  1228. c.onListenOpen(pid, addr, true)
  1229. }
  1230. }
  1231. for addr, pids := range c.listens {
  1232. for pid, details := range pids {
  1233. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1234. delete(c.listens[addr], pid)
  1235. }
  1236. }
  1237. if len(c.listens[addr]) == 0 {
  1238. delete(c.listens, addr)
  1239. }
  1240. }
  1241. }
  1242. func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1243. klog.Infoln("[attach] attachUprobes start")
  1244. if tracer.DisableL7Tracing() {
  1245. return nil
  1246. }
  1247. codeType := c.GetCodeTypeFromCache(pid)
  1248. if codeType.IsUnknownCode() {
  1249. return nil
  1250. }
  1251. var err error
  1252. switch codeType {
  1253. case CodeTypeJava:
  1254. err = c.attachJVMUprobes(tracer, pid)
  1255. case CodeTypeJavaAot:
  1256. err = c.attachJVMUprobes(tracer, pid)
  1257. case CodeTypeGo:
  1258. err = c.attachTlsUprobes(tracer, pid)
  1259. case CodeTypeNetCoreAot:
  1260. err = c.attachNetCoreUprobes(tracer, pid)
  1261. }
  1262. if err != nil {
  1263. c.errorClose(pid)
  1264. return err
  1265. }
  1266. c.l7AttachSuccess()
  1267. return nil
  1268. }
  1269. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1270. p := c.processes[pid]
  1271. if p == nil {
  1272. return CodeTypeUnknown
  1273. }
  1274. if p.codeType.IsWaitCheck() {
  1275. p.codeType = GetExeType(pid, c.getRootfs())
  1276. }
  1277. return p.codeType
  1278. }
  1279. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1280. p := c.processes[pid]
  1281. if p == nil {
  1282. return nil
  1283. }
  1284. if !p.openSslUprobesChecked {
  1285. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1286. if err != nil {
  1287. return err
  1288. }
  1289. p.uprobes = append(p.uprobes, sslProbes...)
  1290. p.openSslUprobesChecked = true
  1291. }
  1292. if !p.goTlsUprobesChecked {
  1293. codeType := c.GetCodeTypeFromCache(pid)
  1294. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1295. if err != nil {
  1296. return err
  1297. }
  1298. p.uprobes = append(p.uprobes, goProbes...)
  1299. p.goTlsUprobesChecked = true
  1300. }
  1301. return nil
  1302. }
  1303. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1304. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1305. return nil
  1306. }
  1307. p := c.processes[pid]
  1308. if p == nil {
  1309. return nil
  1310. }
  1311. codeType := c.GetCodeTypeFromCache(pid)
  1312. if !p.jvmUprobesChecked {
  1313. tracer.InitKProcInfo(pid, &c.AppInfo)
  1314. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, c.getRootfs())
  1315. if err != nil {
  1316. klog.Error(err)
  1317. return err
  1318. }
  1319. p.uprobes = append(p.uprobes, libNioProbes...)
  1320. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, c.getRootfs())
  1321. if err != nil {
  1322. klog.Error(err)
  1323. return err
  1324. }
  1325. p.uprobes = append(p.uprobes, libNetProbes...)
  1326. p.jvmUprobesChecked = true
  1327. } else {
  1328. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1329. }
  1330. return nil
  1331. }
  1332. func (c *Container) errorClose(pid uint32) {
  1333. p := c.processes[pid]
  1334. if p != nil {
  1335. p.DynamicClose()
  1336. }
  1337. }
  1338. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1339. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1340. return nil
  1341. }
  1342. p := c.processes[pid]
  1343. if p == nil {
  1344. return nil
  1345. }
  1346. if !p.jvmUprobesChecked {
  1347. //codeType := c.GetCodeTypeFromCache(pid)
  1348. tracer.InitKProcInfo(pid, &c.AppInfo)
  1349. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1350. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1351. if err != nil {
  1352. klog.Error(err)
  1353. return err
  1354. }
  1355. p.uprobes = append(p.uprobes, readProbes...)
  1356. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1357. if err != nil {
  1358. klog.Error(err)
  1359. return err
  1360. }
  1361. p.uprobes = append(p.uprobes, WriteProbes...)
  1362. p.jvmUprobesChecked = true
  1363. }
  1364. return nil
  1365. }
  1366. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1367. info := proc.GetFdInfo(pid, fd)
  1368. if info == nil {
  1369. return
  1370. }
  1371. switch {
  1372. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1373. !strings.HasPrefix(info.Dest, "/"),
  1374. strings.HasPrefix(info.Dest, "/proc/"),
  1375. strings.HasPrefix(info.Dest, "/dev/"),
  1376. strings.HasPrefix(info.Dest, "/sys/"),
  1377. strings.HasSuffix(info.Dest, "(deleted)"):
  1378. return
  1379. }
  1380. mntId = info.MntId
  1381. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1382. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1383. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1384. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1385. logPath = info.Dest
  1386. }
  1387. return
  1388. }
  1389. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1390. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1391. }
  1392. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1393. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1394. }