container.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "os"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. . "github.com/coroot/coroot-node-agent/utils/modelse"
  10. "github.com/coroot/coroot-node-agent/cgroup"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/ebpftracer"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  14. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/logs"
  17. "github.com/coroot/coroot-node-agent/node"
  18. "github.com/coroot/coroot-node-agent/pinger"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/logparser"
  22. "github.com/prometheus/client_golang/prometheus"
  23. klog "github.com/sirupsen/logrus"
  24. "github.com/vishvananda/netns"
  25. "golang.org/x/exp/maps"
  26. "inet.af/netaddr"
  27. )
  // Timing settings used by the container bookkeeping in this file.
  var (
  	// gcInterval is the grace period Dead applies after the last process of a
  	// container has exited before the container is reported as dead.
  	gcInterval time.Duration = 10 * time.Second

  	// pingTimeout is not referenced in the visible part of the file;
  	// presumably it bounds a single probe issued by ping() — TODO confirm.
  	pingTimeout time.Duration = 300 * time.Millisecond
  )
  // ContainerID is the string identifier of a tracked container. It is stored
  // in Container.id and used as the container id when building traces.
  type ContainerID string
  // ContainerNetwork describes one network a container is attached to.
  type ContainerNetwork struct {
  	// NetworkID is passed to FindNetworkLoadBalancerNs by NewContainer to
  	// locate the load-balancer network namespace of this network.
  	NetworkID string
  }
  36. type ContainerMetadata struct {
  37. name string
  38. labels map[string]string
  39. volumes map[string]string
  40. logPath string
  41. image string
  42. logDecoder logparser.Decoder
  43. hostListens map[string][]netaddr.IPPort
  44. networks map[string]ContainerNetwork
  45. env map[string]string
  46. systemdTriggeredBy string
  47. }
  // Delays holds accumulated scheduling delays. Collect exports both values,
  // converted to seconds, as the CPUDelay and DiskDelay counters.
  type Delays struct {
  	cpu, disk time.Duration
  }
  52. type LogParser struct {
  53. parser *logparser.Parser
  54. stop func()
  55. }
  56. func (p *LogParser) Stop() {
  57. if p.stop != nil {
  58. p.stop()
  59. }
  60. p.parser.Stop()
  61. }
  62. type AddrPair struct {
  63. src netaddr.IPPort
  64. dst netaddr.IPPort
  65. }
  66. type ActiveConnection struct {
  67. Dest netaddr.IPPort
  68. Src netaddr.IPPort
  69. ActualDest netaddr.IPPort
  70. Pid uint32
  71. Fd uint64
  72. Timestamp uint64
  73. Closed time.Time
  74. BytesSent uint64
  75. BytesReceived uint64
  76. http2Parser *l7.Http2Parser
  77. postgresParser *l7.PostgresParser
  78. mysqlParser *l7.MysqlParser
  79. dmParser *l7.DmParser
  80. }
  81. type ActiveAccept struct {
  82. Dest netaddr.IPPort
  83. Src netaddr.IPPort
  84. Pid uint32
  85. Fd uint64
  86. Timestamp uint64
  87. Closed time.Time
  88. BytesSent uint64
  89. BytesReceived uint64
  90. }
  91. type ListenDetails struct {
  92. ClosedAt time.Time
  93. NsIPs []netaddr.IP
  94. }
  // PidFd identifies a file descriptor of a process and is used as the key of
  // connectionsByPidFd and acceptsByPidFd.
  type PidFd struct {
  	Pid uint32
  	Fd  uint64
  }
  // K8sContainer groups Kubernetes-related attributes and is embedded in
  // Container. None of the fields is read in the visible part of the file, so
  // their exact semantics should be confirmed where they are populated.
  type K8sContainer struct {
  	ns, podName, podId   string
  	workload             string
  	containerName, pid   string
  }
  107. type ConnectionStats struct {
  108. Count uint64
  109. TotalTime time.Duration
  110. Retransmissions uint64
  111. BytesSent uint64
  112. PerBytesSent uint64
  113. BytesReceived uint64
  114. PerBytesReceived uint64
  115. Src netaddr.IPPort
  116. ConEstTime time.Duration
  117. FirstReadTime uint64
  118. FirstWriteTime uint64
  119. NewReadTime uint64
  120. }
  // AcceptStats aggregates the traffic of accepted connections for one
  // address pair; see Container.acceptsSuccessful.
  type AcceptStats struct {
  	BytesSent, BytesReceived uint64
  }
  125. type Container struct {
  126. id ContainerID
  127. cgroup *cgroup.Cgroup
  128. metadata *ContainerMetadata
  129. K8sContainer
  130. processes map[uint32]*Process
  131. startedAt time.Time
  132. zombieAt time.Time
  133. restarts int
  134. delays Delays
  135. delaysByPid map[uint32]Delays
  136. delaysLock sync.Mutex
  137. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  138. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  139. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  140. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  141. connectionsActive map[AddrPair]*ActiveConnection
  142. connectionsByPidFd map[PidFd]*ActiveConnection
  143. acceptsSuccessful map[AddrPair]*AcceptStats
  144. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  145. acceptsActive map[AddrPair]*ActiveAccept
  146. acceptsByPidFd map[PidFd]*ActiveAccept
  147. l7Stats L7Stats
  148. dnsStats *L7Metrics
  149. oomKills int
  150. pythonThreadLockWaitTime time.Duration
  151. mounts map[string]proc.MountInfo
  152. logParsers map[string]*LogParser
  153. hostConntrack *Conntrack
  154. nsConntrack *Conntrack
  155. lbConntracks []*Conntrack
  156. registry *Registry
  157. lock sync.RWMutex
  158. done chan struct{}
  159. traceMap map[uint64]*tracing.Trace
  160. //instanceID utils.ID
  161. Symbols []debugelf.Symbol
  162. Uprobes []tracer.Uprobe
  163. UprobesMap map[string]tracer.Uprobe
  164. l7EventReady bool
  165. l7Attach bool
  166. // 白名单详情
  167. WhiteSettingInfo WhiteSettingInfo
  168. // 应用详情
  169. AppInfo AppInfo
  170. }
  171. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  172. netNs, err := proc.GetNetNs(pid)
  173. if err != nil {
  174. return nil, err
  175. }
  176. defer netNs.Close()
  177. c := &Container{
  178. id: id,
  179. cgroup: cg,
  180. metadata: md,
  181. processes: map[uint32]*Process{},
  182. delaysByPid: map[uint32]Delays{},
  183. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  184. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  185. connectsFailed: map[netaddr.IPPort]int64{},
  186. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  187. connectionsActive: map[AddrPair]*ActiveConnection{},
  188. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  189. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  190. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  191. acceptsActive: map[AddrPair]*ActiveAccept{},
  192. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  193. l7Stats: L7Stats{},
  194. dnsStats: &L7Metrics{},
  195. mounts: map[string]proc.MountInfo{},
  196. logParsers: map[string]*LogParser{},
  197. hostConntrack: hostConntrack,
  198. done: make(chan struct{}),
  199. traceMap: make(map[uint64]*tracing.Trace),
  200. registry: registry,
  201. }
  202. for _, n := range md.networks {
  203. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  204. if ct, err := NewConntrack(nsHandle); err != nil {
  205. klog.Warningln(err)
  206. } else {
  207. c.lbConntracks = append(c.lbConntracks, ct)
  208. }
  209. _ = nsHandle.Close()
  210. }
  211. }
  212. c.runLogParser("")
  213. // go func() {
  214. // ticker := time.NewTicker(gcInterval)
  215. // defer ticker.Stop()
  216. // for {
  217. // select {
  218. // case <-c.done:
  219. // return
  220. // case t := <-ticker.C:
  221. // c.gc(t)
  222. // }
  223. // }
  224. // }()
  225. return c, nil
  226. }
  227. func (c *Container) Close() {
  228. for _, p := range c.logParsers {
  229. p.Stop()
  230. }
  231. for _, ct := range c.lbConntracks {
  232. _ = ct.Close()
  233. }
  234. if c.nsConntrack != nil {
  235. _ = c.nsConntrack.Close()
  236. }
  237. close(c.done)
  238. }
  239. func (c *Container) Dead(now time.Time) bool {
  240. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  241. }
  242. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  243. // some fixed metric description is required here to register/unregister the collector correctly
  244. ch <- prometheus.NewDesc("container", "", nil, nil)
  245. }
  246. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  247. c.registry.updateTrafficStatsIfNecessary()
  248. c.lock.RLock()
  249. defer c.lock.RUnlock()
  250. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  251. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  252. }
  253. ch <- counter(metrics.Restarts, float64(c.restarts))
  254. if cpu, err := c.cgroup.CpuStat(); err == nil {
  255. if cpu.LimitCores > 0 {
  256. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  257. }
  258. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  259. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  260. }
  261. if taskstatsClient != nil {
  262. c.updateDelays()
  263. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  264. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  265. }
  266. if s, err := c.cgroup.MemoryStat(); err == nil {
  267. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  268. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  269. if s.Limit > 0 {
  270. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  271. }
  272. }
  273. if c.oomKills > 0 {
  274. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  275. }
  276. if disks, err := node.GetDisks(); err == nil {
  277. ioStat, _ := c.cgroup.IOStat()
  278. for majorMinor, mounts := range c.getMounts() {
  279. dev := disks.GetParentBlockDevice(majorMinor)
  280. if dev == nil {
  281. continue
  282. }
  283. for mountPoint, fsStat := range mounts {
  284. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  285. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  286. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  287. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  288. if io, ok := ioStat[majorMinor]; ok {
  289. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  290. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  291. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  292. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  293. }
  294. }
  295. }
  296. }
  297. for addr, open := range c.getListens() {
  298. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  299. }
  300. for proxy, addrs := range c.getProxiedListens() {
  301. for addr := range addrs {
  302. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  303. }
  304. }
  305. for d, stats := range c.connectsSuccessful {
  306. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
  307. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
  308. if stats.Retransmissions > 0 {
  309. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
  310. }
  311. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
  312. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
  313. ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), d.dst.String(), d.dst.String(),stats.Src.String())
  314. ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), d.dst.String(), d.dst.String(),stats.Src.String())
  315. ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
  316. ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), d.dst.String(), d.dst.String(),stats.Src.String())
  317. ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), d.dst.String(), d.dst.String(),stats.Src.String())
  318. klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  319. stats.PerBytesReceived = 0
  320. stats.PerBytesSent = 0
  321. }
  322. // for d, stats := range c.acceptsSuccessful {
  323. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  324. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  325. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  326. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  327. // }
  328. for dst, count := range c.connectsFailed {
  329. ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
  330. }
  331. connections := map[AddrPair]int{}
  332. for addrPair, conn := range c.connectionsActive {
  333. if !conn.Closed.IsZero() {
  334. continue
  335. }
  336. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  337. }
  338. for d, count := range connections {
  339. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  340. }
  341. for source, p := range c.logParsers {
  342. for _, c := range p.parser.GetCounters() {
  343. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  344. }
  345. }
  346. appTypes := map[string]struct{}{}
  347. seenJvms := map[string]bool{}
  348. seenDotNetApps := map[string]bool{}
  349. pids := maps.Keys(c.processes)
  350. sort.Slice(pids, func(i, j int) bool {
  351. return pids[i] < pids[j]
  352. })
  353. for _, pid := range pids {
  354. process := c.processes[pid]
  355. cmdline := proc.GetCmdline(pid)
  356. if len(cmdline) == 0 {
  357. continue
  358. }
  359. appType := guessApplicationType(cmdline)
  360. if appType != "" {
  361. appTypes[appType] = struct{}{}
  362. }
  363. if process.isGolangApp {
  364. appTypes["golang"] = struct{}{}
  365. }
  366. switch {
  367. case isJvm(cmdline):
  368. jvm, jMetrics := jvmMetrics(pid)
  369. if len(jMetrics) > 0 && !seenJvms[jvm] {
  370. seenJvms[jvm] = true
  371. for _, m := range jMetrics {
  372. ch <- m
  373. }
  374. }
  375. case process.dotNetMonitor != nil:
  376. appTypes["dotnet"] = struct{}{}
  377. appName := process.dotNetMonitor.AppName()
  378. if !seenDotNetApps[appName] {
  379. seenDotNetApps[appName] = true
  380. process.dotNetMonitor.Collect(ch)
  381. }
  382. }
  383. }
  384. for appType := range appTypes {
  385. ch <- gauge(metrics.ApplicationType, 1, appType)
  386. }
  387. if c.pythonThreadLockWaitTime > 0 {
  388. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  389. }
  390. if c.dnsStats.Requests != nil {
  391. c.dnsStats.Requests.Collect(ch)
  392. }
  393. if c.dnsStats.Latency != nil {
  394. c.dnsStats.Latency.Collect(ch)
  395. }
  396. c.l7Stats.collect(ch)
  397. if !*flags.DisablePinger {
  398. for ip, rtt := range c.ping() {
  399. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  400. }
  401. }
  402. c.gc(time.Now())
  403. }
  404. func (c *Container) onProcessStart(pid uint32) *Process {
  405. c.lock.Lock()
  406. defer c.lock.Unlock()
  407. stats, err := TaskstatsPID(pid)
  408. if err != nil {
  409. return nil
  410. }
  411. c.zombieAt = time.Time{}
  412. p := NewProcess(pid, stats, c.registry.tracer)
  413. if p == nil {
  414. return nil
  415. }
  416. c.processes[pid] = p
  417. if c.startedAt.IsZero() {
  418. c.startedAt = stats.BeginTime
  419. } else {
  420. min := stats.BeginTime
  421. for _, p := range c.processes {
  422. if p.StartedAt.Before(min) {
  423. min = p.StartedAt
  424. }
  425. }
  426. if min.After(c.startedAt) {
  427. c.restarts++
  428. c.startedAt = min
  429. }
  430. }
  431. return p
  432. }
  433. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  434. c.lock.Lock()
  435. defer c.lock.Unlock()
  436. if p := c.processes[pid]; p != nil {
  437. p.Close()
  438. }
  439. delete(c.processes, pid)
  440. if len(c.processes) == 0 {
  441. c.zombieAt = time.Now()
  442. }
  443. delete(c.delaysByPid, pid)
  444. if oomKill {
  445. c.oomKills++
  446. }
  447. }
  448. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  449. mntId, logPath := resolveFd(pid, fd)
  450. func() {
  451. if mntId == "" {
  452. return
  453. }
  454. c.lock.Lock()
  455. _, ok := c.mounts[mntId]
  456. c.lock.Unlock()
  457. if ok {
  458. return
  459. }
  460. byMountId := proc.GetMountInfo(pid)
  461. if byMountId == nil {
  462. return
  463. }
  464. if mi, ok := byMountId[mntId]; ok {
  465. c.lock.Lock()
  466. c.mounts[mntId] = mi
  467. c.lock.Unlock()
  468. }
  469. }()
  470. if logPath != "" {
  471. c.lock.Lock()
  472. c.runLogParser(logPath)
  473. c.lock.Unlock()
  474. }
  475. }
  476. // set
  477. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  478. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  479. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  480. return
  481. }
  482. if !safe {
  483. c.lock.Lock()
  484. defer c.lock.Unlock()
  485. }
  486. if _, ok := c.listens[addr]; !ok {
  487. c.listens[addr] = map[uint32]*ListenDetails{}
  488. }
  489. details := &ListenDetails{}
  490. c.listens[addr][pid] = details
  491. if addr.IP().IsUnspecified() {
  492. ns, err := proc.GetNetNs(pid)
  493. if err != nil {
  494. if !common.IsNotExist(err) {
  495. klog.Warningln(err)
  496. }
  497. return
  498. }
  499. defer ns.Close()
  500. ips, err := proc.GetNsIps(ns)
  501. if err != nil {
  502. klog.Warningln(err)
  503. return
  504. }
  505. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  506. details.NsIPs = ips
  507. }
  508. }
  509. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  510. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  511. c.lock.Lock()
  512. defer c.lock.Unlock()
  513. if _, byAddr := c.listens[addr]; byAddr {
  514. if _, byPid := c.listens[addr][pid]; byPid {
  515. if details := c.listens[addr][pid]; details != nil {
  516. details.ClosedAt = time.Now()
  517. }
  518. }
  519. }
  520. }
  521. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  522. klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  523. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  524. // return
  525. // }
  526. p := c.processes[pid]
  527. if p == nil {
  528. return
  529. }
  530. if dst.IP().IsLoopback() && !p.isHostNs() {
  531. return
  532. }
  533. // actualDst, err := c.getActualDestination(p, src, dst)
  534. // if err != nil {
  535. // if !common.IsNotExist(err) {
  536. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  537. // }
  538. // return
  539. // }
  540. // switch {
  541. // case actualDst == nil:
  542. // actualDst = &dst
  543. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  544. // return
  545. // }
  546. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  547. // return
  548. // }
  549. c.lock.Lock()
  550. defer c.lock.Unlock()
  551. if !failed {
  552. key := AddrPair{src: dst, dst: src}
  553. stats := c.acceptsSuccessful[key]
  554. if stats == nil {
  555. stats = &AcceptStats{}
  556. c.acceptsSuccessful[key] = stats
  557. }
  558. acceptCon := &ActiveAccept{
  559. Dest: src,
  560. Src: dst,
  561. Pid: pid,
  562. Fd: fd,
  563. Timestamp: timestamp,
  564. }
  565. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  566. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  567. }
  568. c.acceptLastAttempt[dst] = time.Now()
  569. }
  570. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  571. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  572. return
  573. }
  574. p := c.processes[pid]
  575. if p == nil {
  576. return
  577. }
  578. if dst.IP().IsLoopback() && !p.isHostNs() {
  579. return
  580. }
  581. actualDst, err := c.getActualDestination(p, src, dst)
  582. if err != nil {
  583. if !common.IsNotExist(err) {
  584. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  585. }
  586. return
  587. }
  588. switch {
  589. case actualDst == nil:
  590. actualDst = &dst
  591. case actualDst.IP().IsLoopback() && !p.isHostNs():
  592. return
  593. }
  594. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  595. return
  596. }
  597. c.lock.Lock()
  598. defer c.lock.Unlock()
  599. if failed {
  600. c.connectsFailed[dst]++
  601. } else {
  602. key := AddrPair{src: dst, dst: *actualDst}
  603. stats := c.connectsSuccessful[key]
  604. if stats == nil {
  605. stats = &ConnectionStats{}
  606. c.connectsSuccessful[key] = stats
  607. }
  608. stats.Count++
  609. stats.TotalTime += duration
  610. stats.Src = src
  611. stats.ConEstTime = duration
  612. connection := &ActiveConnection{
  613. Dest: dst,
  614. Src: src,
  615. ActualDest: *actualDst,
  616. Pid: pid,
  617. Fd: fd,
  618. Timestamp: timestamp,
  619. }
  620. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  621. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  622. }
  623. c.connectLastAttempt[dst] = time.Now()
  624. }
  625. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  626. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  627. return actualDst, nil
  628. }
  629. for _, lb := range c.lbConntracks {
  630. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  631. return actualDst, nil
  632. }
  633. }
  634. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  635. if actualDst != nil {
  636. return actualDst, nil
  637. }
  638. if !p.isHostNs() {
  639. if c.nsConntrack == nil {
  640. netNs, err := proc.GetNetNs(p.Pid)
  641. if err != nil {
  642. return nil, err
  643. }
  644. defer netNs.Close()
  645. c.nsConntrack, err = NewConntrack(netNs)
  646. if err != nil {
  647. return nil, err
  648. }
  649. }
  650. return c.nsConntrack.GetActualDestination(src, dst), nil
  651. }
  652. return nil, nil
  653. }
  654. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  655. c.lock.Lock()
  656. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  657. c.lock.Unlock()
  658. if conn != nil {
  659. if conn.Closed.IsZero() {
  660. if e.TrafficStats != nil {
  661. c.lock.Lock()
  662. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  663. c.lock.Unlock()
  664. }
  665. conn.Closed = time.Now()
  666. }
  667. }
  668. }
  669. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  670. c.lock.Lock()
  671. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  672. c.lock.Unlock()
  673. if conn != nil {
  674. if conn.Closed.IsZero() {
  675. if e.TrafficStats != nil {
  676. c.lock.Lock()
  677. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  678. c.lock.Unlock()
  679. }
  680. conn.Closed = time.Now()
  681. }
  682. }
  683. }
  684. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  685. if u == nil {
  686. return
  687. }
  688. c.lock.Lock()
  689. defer c.lock.Unlock()
  690. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  691. }
  692. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  693. if ac == nil {
  694. return
  695. }
  696. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  697. stats := c.connectsSuccessful[key]
  698. if stats == nil {
  699. stats = &ConnectionStats{}
  700. c.connectsSuccessful[key] = stats
  701. }
  702. if sent > ac.BytesSent {
  703. stats.BytesSent += sent - ac.BytesSent
  704. stats.PerBytesSent = sent - ac.BytesSent
  705. }
  706. if received > ac.BytesReceived {
  707. stats.BytesReceived += received - ac.BytesReceived
  708. stats.PerBytesReceived = received - ac.BytesReceived
  709. }
  710. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  711. stats.FirstReadTime = firstreadtime
  712. stats.FirstWriteTime = firstwritetime
  713. stats.NewReadTime = newreadtime
  714. }
  715. ac.BytesSent = sent
  716. ac.BytesReceived = received
  717. }
  718. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  719. if ac == nil {
  720. return
  721. }
  722. klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  723. key := AddrPair{src: ac.Src, dst: ac.Dest}
  724. stats := c.acceptsSuccessful[key]
  725. if stats == nil {
  726. stats = &AcceptStats{}
  727. c.acceptsSuccessful[key] = stats
  728. }
  729. if sent > ac.BytesSent {
  730. stats.BytesSent += sent - ac.BytesSent
  731. }
  732. if received > ac.BytesReceived {
  733. stats.BytesReceived += received - ac.BytesReceived
  734. }
  735. ac.BytesSent = sent
  736. ac.BytesReceived = received
  737. }
  738. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  739. status := r.Status.DNS()
  740. if status == "" {
  741. return nil, "", ""
  742. }
  743. t, fqdn, ips := l7.ParseDns(r.Payload)
  744. if t == "" {
  745. return nil, "", ""
  746. }
  747. if c.dnsStats.Requests == nil {
  748. dnsReq := L7Requests[l7.ProtocolDNS]
  749. c.dnsStats.Requests = prometheus.NewCounterVec(
  750. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  751. []string{"request_type", "domain", "status"},
  752. )
  753. }
  754. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  755. m.Inc()
  756. }
  757. if r.Duration != 0 {
  758. if c.dnsStats.Latency == nil {
  759. dnsLatency := L7Latency[l7.ProtocolDNS]
  760. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  761. }
  762. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  763. }
  764. ip2fqdn := map[netaddr.IP]string{}
  765. if fqdn != "" {
  766. for _, ip := range ips {
  767. ip2fqdn[ip] = fqdn
  768. }
  769. }
  770. return ip2fqdn, t, fqdn
  771. }
  772. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  773. c.lock.Lock()
  774. defer c.lock.Unlock()
  775. if r.Protocol == l7.ProtocolDNS {
  776. //return c.onDNSRequest(r)
  777. }
  778. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  779. if conn == nil {
  780. return nil
  781. }
  782. if timestamp != 0 && conn.Timestamp != timestamp {
  783. return nil
  784. }
  785. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  786. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  787. switch r.Protocol {
  788. case l7.ProtocolHTTP:
  789. stats.observe(r.Status.Http(), "", r.Duration)
  790. method, path := l7.ParseHttp(r.Payload)
  791. trace.HttpRequest(method, path, r.Status, r.Duration)
  792. case l7.ProtocolHTTP2:
  793. if conn.http2Parser == nil {
  794. conn.http2Parser = l7.NewHttp2Parser()
  795. }
  796. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  797. for _, req := range requests {
  798. stats.observe(req.Status.Http(), "", req.Duration)
  799. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  800. }
  801. case l7.ProtocolPostgres:
  802. if r.Method != l7.MethodStatementClose {
  803. stats.observe(r.Status.String(), "", r.Duration)
  804. }
  805. if conn.postgresParser == nil {
  806. conn.postgresParser = l7.NewPostgresParser()
  807. }
  808. query := conn.postgresParser.Parse(r.Payload)
  809. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  810. case l7.ProtocolMysql:
  811. if r.Method != l7.MethodStatementClose {
  812. stats.observe(r.Status.String(), "", r.Duration)
  813. }
  814. if conn.mysqlParser == nil {
  815. conn.mysqlParser = l7.NewMysqlParser()
  816. }
  817. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  818. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  819. case l7.ProtocolMemcached:
  820. stats.observe(r.Status.String(), "", r.Duration)
  821. cmd, items := l7.ParseMemcached(r.Payload)
  822. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  823. case l7.ProtocolRedis:
  824. stats.observe(r.Status.String(), "", r.Duration)
  825. cmd, args := l7.ParseRedis(r.Payload)
  826. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  827. case l7.ProtocolMongo:
  828. stats.observe(r.Status.String(), "", r.Duration)
  829. query := l7.ParseMongo(r.Payload)
  830. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  831. case l7.ProtocolKafka, l7.ProtocolCassandra:
  832. stats.observe(r.Status.String(), "", r.Duration)
  833. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  834. stats.observe(r.Status.String(), r.Method.String(), 0)
  835. case l7.ProtocolDubbo2:
  836. stats.observe(r.Status.String(), "", r.Duration)
  837. }
  838. return nil
  839. }
  840. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  841. c.lock.Lock()
  842. defer c.lock.Unlock()
  843. conn, ok := c.connectionsActive[srcDst]
  844. if !ok {
  845. return false
  846. }
  847. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  848. stats := c.connectsSuccessful[key]
  849. if stats == nil {
  850. stats = &ConnectionStats{}
  851. c.connectsSuccessful[key] = stats
  852. }
  853. stats.Retransmissions++
  854. return true
  855. }
  856. func (c *Container) updateDelays() {
  857. c.delaysLock.Lock()
  858. defer c.delaysLock.Unlock()
  859. for pid := range c.processes {
  860. stats, err := TaskstatsTGID(pid)
  861. if err != nil {
  862. continue
  863. }
  864. d := c.delaysByPid[pid]
  865. c.delays.cpu += stats.CPUDelay - d.cpu
  866. c.delays.disk += stats.BlockIODelay - d.disk
  867. d.cpu = stats.CPUDelay
  868. d.disk = stats.BlockIODelay
  869. c.delaysByPid[pid] = d
  870. }
  871. }
  872. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  873. if len(c.mounts) == 0 {
  874. return nil
  875. }
  876. res := map[string]map[string]*proc.FSStat{}
  877. for _, mi := range c.mounts {
  878. var stat *proc.FSStat
  879. for pid := range c.processes {
  880. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  881. if err == nil {
  882. stat = &s
  883. break
  884. }
  885. }
  886. if stat == nil {
  887. continue
  888. }
  889. if _, ok := res[mi.MajorMinor]; !ok {
  890. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  891. }
  892. res[mi.MajorMinor][mi.MountPoint] = stat
  893. }
  894. return res
  895. }
  896. func (c *Container) getListens() map[netaddr.IPPort]int {
  897. res := map[netaddr.IPPort]int{}
  898. for addr, byPid := range c.listens {
  899. open := 0
  900. isHostNs := false
  901. ips := map[netaddr.IP]bool{}
  902. for pid, details := range byPid {
  903. p := c.processes[pid]
  904. if p == nil {
  905. continue
  906. }
  907. if p.isHostNs() {
  908. isHostNs = true
  909. }
  910. if details.ClosedAt.IsZero() {
  911. open = 1
  912. }
  913. for _, ip := range details.NsIPs {
  914. ips[ip] = true
  915. }
  916. }
  917. if !addr.IP().IsUnspecified() {
  918. ips = map[netaddr.IP]bool{addr.IP(): true}
  919. }
  920. for ip := range ips {
  921. if ip.IsLoopback() && !isHostNs {
  922. continue
  923. }
  924. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  925. }
  926. }
  927. return res
  928. }
  929. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  930. if len(c.metadata.hostListens) == 0 {
  931. return nil
  932. }
  933. hasUnspecified := false
  934. for _, addrs := range c.metadata.hostListens {
  935. for _, addr := range addrs {
  936. if addr.IP().IsUnspecified() {
  937. hasUnspecified = true
  938. break
  939. }
  940. }
  941. }
  942. var hostIps []netaddr.IP
  943. if hasUnspecified {
  944. if ns, err := proc.GetHostNetNs(); err != nil {
  945. klog.Warningln(err)
  946. } else {
  947. ips, err := proc.GetNsIps(ns)
  948. _ = ns.Close()
  949. if err != nil {
  950. klog.Warningln(err)
  951. } else {
  952. hostIps = ips
  953. }
  954. }
  955. }
  956. res := map[string]map[netaddr.IPPort]struct{}{}
  957. for proxy, addrs := range c.metadata.hostListens {
  958. res[proxy] = map[netaddr.IPPort]struct{}{}
  959. for _, addr := range addrs {
  960. if addr.IP().IsUnspecified() {
  961. for _, ip := range hostIps {
  962. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  963. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  964. }
  965. }
  966. } else {
  967. res[proxy][addr] = struct{}{}
  968. }
  969. }
  970. }
  971. return res
  972. }
  973. func (c *Container) ping() map[netaddr.IP]float64 {
  974. netNs := netns.None()
  975. for pid := range c.processes {
  976. if pid == agentPid {
  977. netNs = selfNetNs
  978. break
  979. }
  980. ns, err := proc.GetNetNs(pid)
  981. if err != nil {
  982. if !common.IsNotExist(err) {
  983. klog.Warningln(err)
  984. }
  985. continue
  986. }
  987. netNs = ns
  988. defer netNs.Close()
  989. break
  990. }
  991. if !netNs.IsOpen() {
  992. return nil
  993. }
  994. ips := map[netaddr.IP]struct{}{}
  995. for d := range c.connectsSuccessful {
  996. ips[d.dst.IP()] = struct{}{}
  997. }
  998. for dst := range c.connectsFailed {
  999. ips[dst.IP()] = struct{}{}
  1000. }
  1001. if len(ips) == 0 {
  1002. return nil
  1003. }
  1004. targets := make([]netaddr.IP, 0, len(ips))
  1005. for ip := range ips {
  1006. if ip.IsLoopback() {
  1007. continue
  1008. }
  1009. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1010. continue
  1011. }
  1012. targets = append(targets, ip)
  1013. }
  1014. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1015. if err != nil {
  1016. klog.Warningln(err)
  1017. return nil
  1018. }
  1019. return rtt
  1020. }
  1021. func (c *Container) runLogParser(logPath string) {
  1022. if *flags.DisableLogParsing {
  1023. return
  1024. }
  1025. containerId := string(c.id)
  1026. if logPath != "" {
  1027. if c.logParsers[logPath] != nil {
  1028. return
  1029. }
  1030. ch := make(chan logparser.LogEntry)
  1031. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1032. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1033. if err != nil {
  1034. klog.Warningln(err)
  1035. parser.Stop()
  1036. return
  1037. }
  1038. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1039. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1040. return
  1041. }
  1042. switch c.cgroup.ContainerType {
  1043. case cgroup.ContainerTypeSystemdService:
  1044. ch := make(chan logparser.LogEntry)
  1045. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1046. klog.Warningln(err)
  1047. return
  1048. }
  1049. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1050. stop := func() {
  1051. JournaldUnsubscribe(c.cgroup)
  1052. }
  1053. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1054. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1055. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1056. if c.metadata.logPath == "" {
  1057. return
  1058. }
  1059. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1060. parser.Stop()
  1061. delete(c.logParsers, "stdout/stderr")
  1062. }
  1063. ch := make(chan logparser.LogEntry)
  1064. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1065. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1066. if err != nil {
  1067. klog.Warningln(err)
  1068. parser.Stop()
  1069. return
  1070. }
  1071. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1072. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1073. }
  1074. }
  1075. func (c *Container) gc(now time.Time) {
  1076. // c.lock.Lock()
  1077. // defer c.lock.Unlock()
  1078. established := map[AddrPair]struct{}{}
  1079. establishedDst := map[netaddr.IPPort]struct{}{}
  1080. listens := map[netaddr.IPPort]string{}
  1081. seenNamespaces := map[string]bool{}
  1082. fdMap := map[uint64]struct{}{}
  1083. for _, p := range c.processes {
  1084. if seenNamespaces[p.NetNsId()] {
  1085. continue
  1086. }
  1087. sockets, err := proc.GetSockets(p.Pid)
  1088. if err != nil {
  1089. continue
  1090. }
  1091. fds, err := proc.ReadFds(p.Pid)
  1092. if err == nil {
  1093. for _, fd := range fds {
  1094. fdMap[fd.Fd] = struct{}{}
  1095. }
  1096. }
  1097. for _, s := range sockets {
  1098. if s.Listen {
  1099. listens[s.SAddr] = s.Inode
  1100. } else {
  1101. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1102. establishedDst[s.DAddr] = struct{}{}
  1103. }
  1104. }
  1105. seenNamespaces[p.NetNsId()] = true
  1106. }
  1107. c.revalidateListens(now, listens)
  1108. for srcDst, conn := range c.connectionsActive {
  1109. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1110. if _, ok := established[srcDst]; !ok {
  1111. delete(c.connectionsActive, srcDst)
  1112. if conn == c.connectionsByPidFd[pidFd] {
  1113. delete(c.connectionsByPidFd, pidFd)
  1114. }
  1115. continue
  1116. }
  1117. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1118. delete(c.connectionsActive, srcDst)
  1119. if conn == c.connectionsByPidFd[pidFd] {
  1120. delete(c.connectionsByPidFd, pidFd)
  1121. }
  1122. }
  1123. }
  1124. for srcDst, conn := range c.acceptsActive {
  1125. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1126. if _, ok := established[srcDst]; !ok {
  1127. delete(c.acceptsActive, srcDst)
  1128. if conn == c.acceptsByPidFd[pidFd] {
  1129. delete(c.acceptsByPidFd, pidFd)
  1130. }
  1131. continue
  1132. }
  1133. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1134. delete(c.acceptsActive, srcDst)
  1135. if conn == c.acceptsByPidFd[pidFd] {
  1136. delete(c.acceptsByPidFd, pidFd)
  1137. }
  1138. }
  1139. }
  1140. for _, conn := range c.connectionsByPidFd {
  1141. if _, ok := fdMap[conn.Fd]; !ok {
  1142. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1143. }
  1144. }
  1145. for _, conn := range c.acceptsByPidFd {
  1146. if _, ok := fdMap[conn.Fd]; !ok {
  1147. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1148. }
  1149. }
  1150. for dst, at := range c.connectLastAttempt {
  1151. _, active := establishedDst[dst]
  1152. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1153. delete(c.connectLastAttempt, dst)
  1154. delete(c.connectsFailed, dst)
  1155. for d := range c.connectsSuccessful {
  1156. if d.src == dst {
  1157. delete(c.connectsSuccessful, d)
  1158. }
  1159. }
  1160. c.l7Stats.delete(dst)
  1161. }
  1162. }
  1163. for dst, at := range c.acceptLastAttempt {
  1164. _, active := establishedDst[dst]
  1165. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1166. delete(c.acceptLastAttempt, dst)
  1167. for d := range c.acceptsSuccessful {
  1168. if d.src == dst {
  1169. delete(c.acceptsSuccessful, d)
  1170. }
  1171. }
  1172. c.l7Stats.delete(dst)
  1173. }
  1174. }
  1175. }
  1176. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1177. for addr, byPid := range c.listens {
  1178. if _, open := actualListens[addr]; open {
  1179. continue
  1180. }
  1181. klog.Warningln("deleting the outdated listen:", addr)
  1182. for _, details := range byPid {
  1183. if details.ClosedAt.IsZero() {
  1184. details.ClosedAt = now
  1185. }
  1186. }
  1187. }
  1188. missingListens := map[netaddr.IPPort]string{}
  1189. for addr, inode := range actualListens {
  1190. byPids, found := c.listens[addr]
  1191. if !found {
  1192. missingListens[addr] = inode
  1193. continue
  1194. }
  1195. open := false
  1196. for _, details := range byPids {
  1197. if details.ClosedAt.IsZero() {
  1198. open = true
  1199. break
  1200. }
  1201. }
  1202. if !open {
  1203. missingListens[addr] = inode
  1204. }
  1205. }
  1206. if len(missingListens) > 0 {
  1207. inodeToPid := map[string]uint32{}
  1208. for pid := range c.processes {
  1209. fds, err := proc.ReadFds(pid)
  1210. if err != nil {
  1211. klog.Warningln(err)
  1212. continue
  1213. }
  1214. for _, fd := range fds {
  1215. if fd.SocketInode != "" {
  1216. inodeToPid[fd.SocketInode] = pid
  1217. }
  1218. }
  1219. }
  1220. for addr, inode := range missingListens {
  1221. pid, found := inodeToPid[inode]
  1222. if !found {
  1223. continue
  1224. }
  1225. //klog.Warningln("missing listen found:", addr, pid)
  1226. c.onListenOpen(pid, addr, true)
  1227. }
  1228. }
  1229. for addr, pids := range c.listens {
  1230. for pid, details := range pids {
  1231. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1232. delete(c.listens[addr], pid)
  1233. }
  1234. }
  1235. if len(c.listens[addr]) == 0 {
  1236. delete(c.listens, addr)
  1237. }
  1238. }
  1239. }
  1240. func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1241. klog.Infoln("[attach] attachUprobes start")
  1242. if tracer.DisableL7Tracing() {
  1243. return nil
  1244. }
  1245. codeType := c.GetCodeTypeFromCache(pid)
  1246. if codeType.IsUnknownCode() {
  1247. return nil
  1248. }
  1249. var err error
  1250. switch codeType {
  1251. case CodeTypeJava:
  1252. err = c.attachJVMUprobes(tracer, pid)
  1253. case CodeTypeJavaAot:
  1254. err = c.attachJVMUprobes(tracer, pid)
  1255. case CodeTypeGo:
  1256. err = c.attachTlsUprobes(tracer, pid)
  1257. case CodeTypeNetCoreAot:
  1258. err = c.attachNetCoreUprobes(tracer, pid)
  1259. }
  1260. if err != nil {
  1261. c.errorClose(pid)
  1262. return err
  1263. }
  1264. c.l7AttachSuccess()
  1265. return nil
  1266. }
  1267. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1268. p := c.processes[pid]
  1269. if p == nil {
  1270. return CodeTypeUnknown
  1271. }
  1272. if p.codeType.IsWaitCheck() {
  1273. p.codeType = GetExeType(pid)
  1274. }
  1275. return p.codeType
  1276. }
  1277. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1278. p := c.processes[pid]
  1279. if p == nil {
  1280. return nil
  1281. }
  1282. if !p.openSslUprobesChecked {
  1283. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1284. if err != nil {
  1285. return err
  1286. }
  1287. p.uprobes = append(p.uprobes, sslProbes...)
  1288. p.openSslUprobesChecked = true
  1289. }
  1290. if !p.goTlsUprobesChecked {
  1291. codeType := c.GetCodeTypeFromCache(pid)
  1292. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1293. if err != nil {
  1294. return err
  1295. }
  1296. p.uprobes = append(p.uprobes, goProbes...)
  1297. p.goTlsUprobesChecked = true
  1298. }
  1299. return nil
  1300. }
  1301. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1302. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1303. return nil
  1304. }
  1305. p := c.processes[pid]
  1306. if p == nil {
  1307. return nil
  1308. }
  1309. codeType := c.GetCodeTypeFromCache(pid)
  1310. if !p.jvmUprobesChecked {
  1311. tracer.InitKProcInfo(pid, &c.AppInfo)
  1312. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType)
  1313. if err != nil {
  1314. klog.Error(err)
  1315. return err
  1316. }
  1317. p.uprobes = append(p.uprobes, libNioProbes...)
  1318. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid)
  1319. if err != nil {
  1320. klog.Error(err)
  1321. return err
  1322. }
  1323. p.uprobes = append(p.uprobes, libNetProbes...)
  1324. p.jvmUprobesChecked = true
  1325. } else {
  1326. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1327. }
  1328. return nil
  1329. }
  1330. func (c *Container) errorClose(pid uint32) {
  1331. p := c.processes[pid]
  1332. if p != nil {
  1333. p.DynamicClose()
  1334. }
  1335. }
  1336. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1337. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1338. return nil
  1339. }
  1340. p := c.processes[pid]
  1341. if p == nil {
  1342. return nil
  1343. }
  1344. if !p.jvmUprobesChecked {
  1345. //codeType := c.GetCodeTypeFromCache(pid)
  1346. tracer.InitKProcInfo(pid, &c.AppInfo)
  1347. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1348. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1349. if err != nil {
  1350. klog.Error(err)
  1351. return err
  1352. }
  1353. p.uprobes = append(p.uprobes, readProbes...)
  1354. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1355. if err != nil {
  1356. klog.Error(err)
  1357. return err
  1358. }
  1359. p.uprobes = append(p.uprobes, WriteProbes...)
  1360. p.jvmUprobesChecked = true
  1361. }
  1362. return nil
  1363. }
  1364. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1365. info := proc.GetFdInfo(pid, fd)
  1366. if info == nil {
  1367. return
  1368. }
  1369. switch {
  1370. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1371. !strings.HasPrefix(info.Dest, "/"),
  1372. strings.HasPrefix(info.Dest, "/proc/"),
  1373. strings.HasPrefix(info.Dest, "/dev/"),
  1374. strings.HasPrefix(info.Dest, "/sys/"),
  1375. strings.HasSuffix(info.Dest, "(deleted)"):
  1376. return
  1377. }
  1378. mntId = info.MntId
  1379. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1380. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1381. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1382. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1383. logPath = info.Dest
  1384. }
  1385. return
  1386. }
  1387. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1388. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1389. }
  1390. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1391. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1392. }