container.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/utils"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  // gcInterval is how long a container may stay without any processes
// (see Container.zombieAt) before Dead reports it as dead.
var gcInterval = 10 * time.Second

// pingTimeout is presumably the per-probe timeout used by Container.ping
// (it is not referenced in this part of the file — confirm).
var pingTimeout = 300 * time.Millisecond
  35. type ContainerID string
  36. type ContainerNetwork struct {
  37. NetworkID string
  38. }
  39. type ContainerMetadata struct {
  40. name string
  41. labels map[string]string
  42. volumes map[string]string
  43. logPath string
  44. image string
  45. logDecoder logparser.Decoder
  46. hostListens map[string][]netaddr.IPPort
  47. networks map[string]ContainerNetwork
  48. env map[string]string
  49. systemdTriggeredBy string
  50. rootfs string
  51. }
  52. type Delays struct {
  53. cpu time.Duration
  54. disk time.Duration
  55. }
  56. type LogParser struct {
  57. parser *logparser.Parser
  58. stop func()
  59. }
  60. func (p *LogParser) Stop() {
  61. if p.stop != nil {
  62. p.stop()
  63. }
  64. p.parser.Stop()
  65. }
  66. type AddrPair struct {
  67. src netaddr.IPPort
  68. dst netaddr.IPPort
  69. }
  70. type ActiveConnection struct {
  71. Dest netaddr.IPPort
  72. Src netaddr.IPPort
  73. ActualDest netaddr.IPPort
  74. Pid uint32
  75. Fd uint64
  76. Timestamp uint64
  77. Closed time.Time
  78. BytesSent uint64
  79. BytesReceived uint64
  80. http2Parser *l7.Http2Parser
  81. postgresParser *l7.PostgresParser
  82. mysqlParser *l7.MysqlParser
  83. dmParser *l7.DmParser
  84. }
  85. type ActiveAccept struct {
  86. Dest netaddr.IPPort
  87. Src netaddr.IPPort
  88. Pid uint32
  89. Fd uint64
  90. Timestamp uint64
  91. Closed time.Time
  92. BytesSent uint64
  93. BytesReceived uint64
  94. }
  95. type ListenDetails struct {
  96. ClosedAt time.Time
  97. NsIPs []netaddr.IP
  98. }
  99. type PidFd struct {
  100. Pid uint32
  101. Fd uint64
  102. }
  103. type K8sContainer struct {
  104. ns string
  105. podName string
  106. podId string
  107. workload string
  108. containerName string
  109. pid string
  110. }
  111. type ConnectionStats struct {
  112. Count uint64
  113. TotalTime time.Duration
  114. Retransmissions uint64
  115. BytesSent uint64
  116. PerBytesSent uint64
  117. BytesReceived uint64
  118. PerBytesReceived uint64
  119. Src netaddr.IPPort
  120. ConEstTime time.Duration
  121. FirstReadTime uint64
  122. FirstWriteTime uint64
  123. NewReadTime uint64
  124. }
  125. type AcceptStats struct {
  126. BytesSent uint64
  127. BytesReceived uint64
  128. }
  129. type Container struct {
  130. id ContainerID
  131. cgroup *cgroup.Cgroup
  132. metadata *ContainerMetadata
  133. K8sContainer
  134. processes map[uint32]*Process
  135. startedAt time.Time
  136. zombieAt time.Time
  137. restarts int
  138. delays Delays
  139. delaysByPid map[uint32]Delays
  140. delaysLock sync.Mutex
  141. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  142. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  143. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  144. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  145. connectionsActive map[AddrPair]*ActiveConnection
  146. connectionsByPidFd map[PidFd]*ActiveConnection
  147. acceptsSuccessful map[AddrPair]*AcceptStats
  148. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  149. acceptsActive map[AddrPair]*ActiveAccept
  150. acceptsByPidFd map[PidFd]*ActiveAccept
  151. l7Stats L7Stats
  152. dnsStats *L7Metrics
  153. oomKills int
  154. pythonThreadLockWaitTime time.Duration
  155. mounts map[string]proc.MountInfo
  156. logParsers map[string]*LogParser
  157. hostConntrack *Conntrack
  158. nsConntrack *Conntrack
  159. lbConntracks []*Conntrack
  160. registry *Registry
  161. lock sync.RWMutex
  162. done chan struct{}
  163. traceMap map[uint64]*tracing.Trace
  164. //instanceID utils.ID
  165. Symbols []debugelf.Symbol
  166. Uprobes []tracer.Uprobe
  167. UprobesMap map[string]tracer.Uprobe
  168. l7EventReady bool
  169. l7Attach bool
  170. // 白名单详情
  171. WhiteSettingInfo WhiteSettingInfo
  172. // 应用详情
  173. AppInfo AppInfo
  174. }
  175. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  176. netNs, err := proc.GetNetNs(pid)
  177. if err != nil {
  178. return nil, err
  179. }
  180. defer netNs.Close()
  181. c := &Container{
  182. id: id,
  183. cgroup: cg,
  184. metadata: md,
  185. processes: map[uint32]*Process{},
  186. delaysByPid: map[uint32]Delays{},
  187. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  188. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  189. connectsFailed: map[netaddr.IPPort]int64{},
  190. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  191. connectionsActive: map[AddrPair]*ActiveConnection{},
  192. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  193. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  194. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  195. acceptsActive: map[AddrPair]*ActiveAccept{},
  196. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  197. l7Stats: L7Stats{},
  198. dnsStats: &L7Metrics{},
  199. mounts: map[string]proc.MountInfo{},
  200. logParsers: map[string]*LogParser{},
  201. hostConntrack: hostConntrack,
  202. done: make(chan struct{}),
  203. traceMap: make(map[uint64]*tracing.Trace),
  204. registry: registry,
  205. }
  206. for _, n := range md.networks {
  207. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  208. if ct, err := NewConntrack(nsHandle); err != nil {
  209. klog.Warningln(err)
  210. } else {
  211. c.lbConntracks = append(c.lbConntracks, ct)
  212. }
  213. _ = nsHandle.Close()
  214. }
  215. }
  216. c.runLogParser("")
  217. // go func() {
  218. // ticker := time.NewTicker(gcInterval)
  219. // defer ticker.Stop()
  220. // for {
  221. // select {
  222. // case <-c.done:
  223. // return
  224. // case t := <-ticker.C:
  225. // c.gc(t)
  226. // }
  227. // }
  228. // }()
  229. return c, nil
  230. }
  231. func (c *Container) Close() {
  232. for _, p := range c.logParsers {
  233. p.Stop()
  234. }
  235. for _, ct := range c.lbConntracks {
  236. _ = ct.Close()
  237. }
  238. if c.nsConntrack != nil {
  239. _ = c.nsConntrack.Close()
  240. }
  241. close(c.done)
  242. }
  243. func (c *Container) Dead(now time.Time) bool {
  244. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  245. }
  246. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  247. // some fixed metric description is required here to register/unregister the collector correctly
  248. ch <- prometheus.NewDesc("container", "", nil, nil)
  249. }
  250. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  251. c.registry.updateTrafficStatsIfNecessary()
  252. c.lock.RLock()
  253. defer c.lock.RUnlock()
  254. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  255. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  256. }
  257. ch <- counter(metrics.Restarts, float64(c.restarts))
  258. if cpu, err := c.cgroup.CpuStat(); err == nil {
  259. if cpu.LimitCores > 0 {
  260. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  261. }
  262. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  263. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  264. }
  265. if taskstatsClient != nil {
  266. c.updateDelays()
  267. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  268. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  269. }
  270. if s, err := c.cgroup.MemoryStat(); err == nil {
  271. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  272. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  273. if s.Limit > 0 {
  274. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  275. }
  276. }
  277. if c.oomKills > 0 {
  278. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  279. }
  280. if disks, err := node.GetDisks(); err == nil {
  281. ioStat, _ := c.cgroup.IOStat()
  282. for majorMinor, mounts := range c.getMounts() {
  283. dev := disks.GetParentBlockDevice(majorMinor)
  284. if dev == nil {
  285. continue
  286. }
  287. for mountPoint, fsStat := range mounts {
  288. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  289. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  290. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  291. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  292. if io, ok := ioStat[majorMinor]; ok {
  293. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  294. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  295. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  296. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  297. }
  298. }
  299. }
  300. }
  301. for addr, open := range c.getListens() {
  302. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  303. }
  304. for proxy, addrs := range c.getProxiedListens() {
  305. for addr := range addrs {
  306. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  307. }
  308. }
  309. type AppInfo struct {
  310. AppName string `json:"app_name"`
  311. AppIdHash INT_HASH_ID `json:"app_id_hash"`
  312. InstanceIdHash INT_HASH_ID `json:"instance_id_hash"`
  313. AgentId int64 `json:"agent_id"`
  314. Sn string `json:"sn"`
  315. Sport int `json:"sport"`
  316. ServiceName string `json:"service_name"`
  317. CodeType CodeType `json:"code_type"`
  318. EBPFProcInfo *EbpfProcInfo `json:"ebpf_proc_info"`
  319. RegisterAt int64 `json:"register_at"`
  320. UpdateAt int64 `json:"update_at"`
  321. Status APP_TYPE `json:"status"`
  322. }
  323. type INT_HASH_ID struct {
  324. IntVal int64
  325. HashtVal HashByte
  326. }
  327. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  328. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  329. strAppName := c.AppInfo.AppName
  330. for d, stats := range c.connectsSuccessful {
  331. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  332. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  333. if stats.Retransmissions > 0 {
  334. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  335. }
  336. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  337. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  338. ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  339. ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  340. ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  341. ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  342. ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  343. klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  344. stats.PerBytesReceived = 0
  345. stats.PerBytesSent = 0
  346. }
  347. // for d, stats := range c.acceptsSuccessful {
  348. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  349. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  350. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  351. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  352. // }
  353. for dst, count := range c.connectsFailed {
  354. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, strAppName, dst.String())
  355. }
  356. connections := map[AddrPair]int{}
  357. for addrPair, conn := range c.connectionsActive {
  358. if !conn.Closed.IsZero() {
  359. continue
  360. }
  361. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  362. }
  363. for d, count := range connections {
  364. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  365. }
  366. for source, p := range c.logParsers {
  367. for _, c := range p.parser.GetCounters() {
  368. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  369. }
  370. }
  371. appTypes := map[string]struct{}{}
  372. seenJvms := map[string]bool{}
  373. seenDotNetApps := map[string]bool{}
  374. pids := maps.Keys(c.processes)
  375. sort.Slice(pids, func(i, j int) bool {
  376. return pids[i] < pids[j]
  377. })
  378. for _, pid := range pids {
  379. process := c.processes[pid]
  380. cmdline := proc.GetCmdline(pid)
  381. if len(cmdline) == 0 {
  382. continue
  383. }
  384. appType := guessApplicationType(cmdline)
  385. if appType != "" {
  386. appTypes[appType] = struct{}{}
  387. }
  388. if process.isGolangApp {
  389. appTypes["golang"] = struct{}{}
  390. }
  391. switch {
  392. case isJvm(cmdline):
  393. jvm, jMetrics := jvmMetrics(pid)
  394. if len(jMetrics) > 0 && !seenJvms[jvm] {
  395. seenJvms[jvm] = true
  396. for _, m := range jMetrics {
  397. ch <- m
  398. }
  399. }
  400. case process.dotNetMonitor != nil:
  401. appTypes["dotnet"] = struct{}{}
  402. appName := process.dotNetMonitor.AppName()
  403. if !seenDotNetApps[appName] {
  404. seenDotNetApps[appName] = true
  405. process.dotNetMonitor.Collect(ch)
  406. }
  407. }
  408. }
  409. for appType := range appTypes {
  410. ch <- gauge(metrics.ApplicationType, 1, appType)
  411. }
  412. if c.pythonThreadLockWaitTime > 0 {
  413. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  414. }
  415. if c.dnsStats.Requests != nil {
  416. c.dnsStats.Requests.Collect(ch)
  417. }
  418. if c.dnsStats.Latency != nil {
  419. c.dnsStats.Latency.Collect(ch)
  420. }
  421. c.l7Stats.collect(ch)
  422. if !*flags.DisablePinger {
  423. for ip, rtt := range c.ping() {
  424. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  425. }
  426. }
  427. c.gc(time.Now())
  428. }
  429. func (c *Container) onProcessStart(pid uint32) *Process {
  430. c.lock.Lock()
  431. defer c.lock.Unlock()
  432. stats, err := TaskstatsPID(pid)
  433. if err != nil {
  434. klog.WithError(err).Errorf("Failed onProcessStart [%d]", pid)
  435. return nil
  436. }
  437. c.zombieAt = time.Time{}
  438. p := NewProcess(pid, stats, c.registry.tracer)
  439. if p == nil {
  440. return nil
  441. }
  442. c.processes[pid] = p
  443. if c.startedAt.IsZero() {
  444. c.startedAt = stats.BeginTime
  445. } else {
  446. min := stats.BeginTime
  447. for _, p := range c.processes {
  448. if p.StartedAt.Before(min) {
  449. min = p.StartedAt
  450. }
  451. }
  452. if min.After(c.startedAt) {
  453. c.restarts++
  454. c.startedAt = min
  455. }
  456. }
  457. return p
  458. }
  459. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  460. c.lock.Lock()
  461. defer c.lock.Unlock()
  462. if p := c.processes[pid]; p != nil {
  463. p.Close()
  464. }
  465. delete(c.processes, pid)
  466. if len(c.processes) == 0 {
  467. c.zombieAt = time.Now()
  468. }
  469. delete(c.delaysByPid, pid)
  470. if oomKill {
  471. c.oomKills++
  472. }
  473. }
  474. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  475. mntId, logPath := resolveFd(pid, fd)
  476. func() {
  477. if mntId == "" {
  478. return
  479. }
  480. c.lock.Lock()
  481. _, ok := c.mounts[mntId]
  482. c.lock.Unlock()
  483. if ok {
  484. return
  485. }
  486. byMountId := proc.GetMountInfo(pid)
  487. if byMountId == nil {
  488. return
  489. }
  490. if mi, ok := byMountId[mntId]; ok {
  491. c.lock.Lock()
  492. c.mounts[mntId] = mi
  493. c.lock.Unlock()
  494. }
  495. }()
  496. if logPath != "" {
  497. c.lock.Lock()
  498. c.runLogParser(logPath)
  499. c.lock.Unlock()
  500. }
  501. }
  502. // set
  503. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  504. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  505. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  506. return
  507. }
  508. if !safe {
  509. c.lock.Lock()
  510. defer c.lock.Unlock()
  511. }
  512. if _, ok := c.listens[addr]; !ok {
  513. c.listens[addr] = map[uint32]*ListenDetails{}
  514. }
  515. details := &ListenDetails{}
  516. c.listens[addr][pid] = details
  517. if addr.IP().IsUnspecified() {
  518. ns, err := proc.GetNetNs(pid)
  519. if err != nil {
  520. if !common.IsNotExist(err) {
  521. klog.Warningln(err)
  522. }
  523. return
  524. }
  525. defer ns.Close()
  526. ips, err := proc.GetNsIps(ns)
  527. if err != nil {
  528. klog.Warningln(err)
  529. return
  530. }
  531. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  532. details.NsIPs = ips
  533. }
  534. }
  535. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  536. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  537. c.lock.Lock()
  538. defer c.lock.Unlock()
  539. if _, byAddr := c.listens[addr]; byAddr {
  540. if _, byPid := c.listens[addr][pid]; byPid {
  541. if details := c.listens[addr][pid]; details != nil {
  542. details.ClosedAt = time.Now()
  543. }
  544. }
  545. }
  546. }
  547. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  548. klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  549. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  550. // return
  551. // }
  552. p := c.processes[pid]
  553. if p == nil {
  554. return
  555. }
  556. if dst.IP().IsLoopback() && !p.isHostNs() {
  557. return
  558. }
  559. // actualDst, err := c.getActualDestination(p, src, dst)
  560. // if err != nil {
  561. // if !common.IsNotExist(err) {
  562. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  563. // }
  564. // return
  565. // }
  566. // switch {
  567. // case actualDst == nil:
  568. // actualDst = &dst
  569. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  570. // return
  571. // }
  572. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  573. // return
  574. // }
  575. c.lock.Lock()
  576. defer c.lock.Unlock()
  577. if !failed {
  578. key := AddrPair{src: dst, dst: src}
  579. stats := c.acceptsSuccessful[key]
  580. if stats == nil {
  581. stats = &AcceptStats{}
  582. c.acceptsSuccessful[key] = stats
  583. }
  584. acceptCon := &ActiveAccept{
  585. Dest: src,
  586. Src: dst,
  587. Pid: pid,
  588. Fd: fd,
  589. Timestamp: timestamp,
  590. }
  591. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  592. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  593. }
  594. c.acceptLastAttempt[dst] = time.Now()
  595. }
  596. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  597. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  598. return
  599. }
  600. p := c.processes[pid]
  601. if p == nil {
  602. return
  603. }
  604. if dst.IP().IsLoopback() && !p.isHostNs() {
  605. return
  606. }
  607. actualDst, err := c.getActualDestination(p, src, dst)
  608. if err != nil {
  609. if !common.IsNotExist(err) {
  610. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  611. }
  612. return
  613. }
  614. switch {
  615. case actualDst == nil:
  616. actualDst = &dst
  617. case actualDst.IP().IsLoopback() && !p.isHostNs():
  618. return
  619. }
  620. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  621. return
  622. }
  623. c.lock.Lock()
  624. defer c.lock.Unlock()
  625. if failed {
  626. c.connectsFailed[dst]++
  627. } else {
  628. key := AddrPair{src: dst, dst: *actualDst}
  629. stats := c.connectsSuccessful[key]
  630. if stats == nil {
  631. stats = &ConnectionStats{}
  632. c.connectsSuccessful[key] = stats
  633. }
  634. stats.Count++
  635. stats.TotalTime += duration
  636. stats.Src = src
  637. stats.ConEstTime = duration
  638. connection := &ActiveConnection{
  639. Dest: dst,
  640. Src: src,
  641. ActualDest: *actualDst,
  642. Pid: pid,
  643. Fd: fd,
  644. Timestamp: timestamp,
  645. }
  646. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  647. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  648. }
  649. c.connectLastAttempt[dst] = time.Now()
  650. }
  651. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  652. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  653. return actualDst, nil
  654. }
  655. for _, lb := range c.lbConntracks {
  656. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  657. return actualDst, nil
  658. }
  659. }
  660. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  661. if actualDst != nil {
  662. return actualDst, nil
  663. }
  664. if !p.isHostNs() {
  665. if c.nsConntrack == nil {
  666. netNs, err := proc.GetNetNs(p.Pid)
  667. if err != nil {
  668. return nil, err
  669. }
  670. defer netNs.Close()
  671. c.nsConntrack, err = NewConntrack(netNs)
  672. if err != nil {
  673. return nil, err
  674. }
  675. }
  676. return c.nsConntrack.GetActualDestination(src, dst), nil
  677. }
  678. return nil, nil
  679. }
  680. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  681. c.lock.Lock()
  682. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  683. c.lock.Unlock()
  684. if conn != nil {
  685. if conn.Closed.IsZero() {
  686. if e.TrafficStats != nil {
  687. c.lock.Lock()
  688. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  689. c.lock.Unlock()
  690. }
  691. conn.Closed = time.Now()
  692. }
  693. }
  694. }
  695. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  696. c.lock.Lock()
  697. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  698. c.lock.Unlock()
  699. if conn != nil {
  700. if conn.Closed.IsZero() {
  701. if e.TrafficStats != nil {
  702. c.lock.Lock()
  703. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  704. c.lock.Unlock()
  705. }
  706. conn.Closed = time.Now()
  707. }
  708. }
  709. }
  710. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  711. if u == nil {
  712. return
  713. }
  714. c.lock.Lock()
  715. defer c.lock.Unlock()
  716. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  717. }
  718. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  719. if ac == nil {
  720. return
  721. }
  722. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  723. stats := c.connectsSuccessful[key]
  724. if stats == nil {
  725. stats = &ConnectionStats{}
  726. c.connectsSuccessful[key] = stats
  727. }
  728. if sent > ac.BytesSent {
  729. stats.BytesSent += sent - ac.BytesSent
  730. stats.PerBytesSent = sent - ac.BytesSent
  731. }
  732. if received > ac.BytesReceived {
  733. stats.BytesReceived += received - ac.BytesReceived
  734. stats.PerBytesReceived = received - ac.BytesReceived
  735. }
  736. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  737. stats.FirstReadTime = firstreadtime
  738. stats.FirstWriteTime = firstwritetime
  739. stats.NewReadTime = newreadtime
  740. }
  741. ac.BytesSent = sent
  742. ac.BytesReceived = received
  743. }
  744. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  745. if ac == nil {
  746. return
  747. }
  748. klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  749. key := AddrPair{src: ac.Src, dst: ac.Dest}
  750. stats := c.acceptsSuccessful[key]
  751. if stats == nil {
  752. stats = &AcceptStats{}
  753. c.acceptsSuccessful[key] = stats
  754. }
  755. if sent > ac.BytesSent {
  756. stats.BytesSent += sent - ac.BytesSent
  757. }
  758. if received > ac.BytesReceived {
  759. stats.BytesReceived += received - ac.BytesReceived
  760. }
  761. ac.BytesSent = sent
  762. ac.BytesReceived = received
  763. }
  764. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  765. status := r.Status.DNS()
  766. if status == "" {
  767. return nil, "", ""
  768. }
  769. t, fqdn, ips := l7.ParseDns(r.Payload)
  770. if t == "" {
  771. return nil, "", ""
  772. }
  773. if c.dnsStats.Requests == nil {
  774. dnsReq := L7Requests[l7.ProtocolDNS]
  775. c.dnsStats.Requests = prometheus.NewCounterVec(
  776. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  777. []string{"request_type", "domain", "status"},
  778. )
  779. }
  780. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  781. m.Inc()
  782. }
  783. if r.Duration != 0 {
  784. if c.dnsStats.Latency == nil {
  785. dnsLatency := L7Latency[l7.ProtocolDNS]
  786. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  787. }
  788. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  789. }
  790. ip2fqdn := map[netaddr.IP]string{}
  791. if fqdn != "" {
  792. for _, ip := range ips {
  793. ip2fqdn[ip] = fqdn
  794. }
  795. }
  796. return ip2fqdn, t, fqdn
  797. }
  798. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  799. c.lock.Lock()
  800. defer c.lock.Unlock()
  801. if r.Protocol == l7.ProtocolDNS {
  802. //return c.onDNSRequest(r)
  803. }
  804. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  805. if conn == nil {
  806. return nil
  807. }
  808. if timestamp != 0 && conn.Timestamp != timestamp {
  809. return nil
  810. }
  811. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  812. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  813. switch r.Protocol {
  814. case l7.ProtocolHTTP:
  815. stats.observe(r.Status.Http(), "", r.Duration)
  816. method, path := l7.ParseHttp(r.Payload)
  817. trace.HttpRequest(method, path, r.Status, r.Duration)
  818. case l7.ProtocolHTTP2:
  819. if conn.http2Parser == nil {
  820. conn.http2Parser = l7.NewHttp2Parser()
  821. }
  822. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  823. for _, req := range requests {
  824. stats.observe(req.Status.Http(), "", req.Duration)
  825. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  826. }
  827. case l7.ProtocolPostgres:
  828. if r.Method != l7.MethodStatementClose {
  829. stats.observe(r.Status.String(), "", r.Duration)
  830. }
  831. if conn.postgresParser == nil {
  832. conn.postgresParser = l7.NewPostgresParser()
  833. }
  834. query := conn.postgresParser.Parse(r.Payload)
  835. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  836. case l7.ProtocolMysql:
  837. if r.Method != l7.MethodStatementClose {
  838. stats.observe(r.Status.String(), "", r.Duration)
  839. }
  840. if conn.mysqlParser == nil {
  841. conn.mysqlParser = l7.NewMysqlParser()
  842. }
  843. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  844. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  845. case l7.ProtocolMemcached:
  846. stats.observe(r.Status.String(), "", r.Duration)
  847. cmd, items := l7.ParseMemcached(r.Payload)
  848. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  849. case l7.ProtocolRedis:
  850. stats.observe(r.Status.String(), "", r.Duration)
  851. cmd, args := l7.ParseRedis(r.Payload)
  852. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  853. case l7.ProtocolMongo:
  854. stats.observe(r.Status.String(), "", r.Duration)
  855. query := l7.ParseMongo(r.Payload)
  856. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  857. case l7.ProtocolKafka, l7.ProtocolCassandra:
  858. stats.observe(r.Status.String(), "", r.Duration)
  859. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  860. stats.observe(r.Status.String(), r.Method.String(), 0)
  861. case l7.ProtocolDubbo2:
  862. stats.observe(r.Status.String(), "", r.Duration)
  863. }
  864. return nil
  865. }
  866. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  867. c.lock.Lock()
  868. defer c.lock.Unlock()
  869. conn, ok := c.connectionsActive[srcDst]
  870. if !ok {
  871. return false
  872. }
  873. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  874. stats := c.connectsSuccessful[key]
  875. if stats == nil {
  876. stats = &ConnectionStats{}
  877. c.connectsSuccessful[key] = stats
  878. }
  879. stats.Retransmissions++
  880. return true
  881. }
  882. func (c *Container) updateDelays() {
  883. c.delaysLock.Lock()
  884. defer c.delaysLock.Unlock()
  885. for pid := range c.processes {
  886. stats, err := TaskstatsTGID(pid)
  887. if err != nil {
  888. continue
  889. }
  890. d := c.delaysByPid[pid]
  891. c.delays.cpu += stats.CPUDelay - d.cpu
  892. c.delays.disk += stats.BlockIODelay - d.disk
  893. d.cpu = stats.CPUDelay
  894. d.disk = stats.BlockIODelay
  895. c.delaysByPid[pid] = d
  896. }
  897. }
  898. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  899. if len(c.mounts) == 0 {
  900. return nil
  901. }
  902. res := map[string]map[string]*proc.FSStat{}
  903. for _, mi := range c.mounts {
  904. var stat *proc.FSStat
  905. for pid := range c.processes {
  906. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  907. if err == nil {
  908. stat = &s
  909. break
  910. }
  911. }
  912. if stat == nil {
  913. continue
  914. }
  915. if _, ok := res[mi.MajorMinor]; !ok {
  916. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  917. }
  918. res[mi.MajorMinor][mi.MountPoint] = stat
  919. }
  920. return res
  921. }
  922. func (c *Container) getListens() map[netaddr.IPPort]int {
  923. res := map[netaddr.IPPort]int{}
  924. for addr, byPid := range c.listens {
  925. open := 0
  926. isHostNs := false
  927. ips := map[netaddr.IP]bool{}
  928. for pid, details := range byPid {
  929. p := c.processes[pid]
  930. if p == nil {
  931. continue
  932. }
  933. if p.isHostNs() {
  934. isHostNs = true
  935. }
  936. if details.ClosedAt.IsZero() {
  937. open = 1
  938. }
  939. for _, ip := range details.NsIPs {
  940. ips[ip] = true
  941. }
  942. }
  943. if !addr.IP().IsUnspecified() {
  944. ips = map[netaddr.IP]bool{addr.IP(): true}
  945. }
  946. for ip := range ips {
  947. if ip.IsLoopback() && !isHostNs {
  948. continue
  949. }
  950. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  951. }
  952. }
  953. return res
  954. }
  955. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  956. if len(c.metadata.hostListens) == 0 {
  957. return nil
  958. }
  959. hasUnspecified := false
  960. for _, addrs := range c.metadata.hostListens {
  961. for _, addr := range addrs {
  962. if addr.IP().IsUnspecified() {
  963. hasUnspecified = true
  964. break
  965. }
  966. }
  967. }
  968. var hostIps []netaddr.IP
  969. if hasUnspecified {
  970. if ns, err := proc.GetHostNetNs(); err != nil {
  971. klog.Warningln(err)
  972. } else {
  973. ips, err := proc.GetNsIps(ns)
  974. _ = ns.Close()
  975. if err != nil {
  976. klog.Warningln(err)
  977. } else {
  978. hostIps = ips
  979. }
  980. }
  981. }
  982. res := map[string]map[netaddr.IPPort]struct{}{}
  983. for proxy, addrs := range c.metadata.hostListens {
  984. res[proxy] = map[netaddr.IPPort]struct{}{}
  985. for _, addr := range addrs {
  986. if addr.IP().IsUnspecified() {
  987. for _, ip := range hostIps {
  988. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  989. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  990. }
  991. }
  992. } else {
  993. res[proxy][addr] = struct{}{}
  994. }
  995. }
  996. }
  997. return res
  998. }
  999. func (c *Container) ping() map[netaddr.IP]float64 {
  1000. netNs := netns.None()
  1001. for pid := range c.processes {
  1002. if pid == agentPid {
  1003. netNs = selfNetNs
  1004. break
  1005. }
  1006. ns, err := proc.GetNetNs(pid)
  1007. if err != nil {
  1008. if !common.IsNotExist(err) {
  1009. klog.Warningln(err)
  1010. }
  1011. continue
  1012. }
  1013. netNs = ns
  1014. defer netNs.Close()
  1015. break
  1016. }
  1017. if !netNs.IsOpen() {
  1018. return nil
  1019. }
  1020. ips := map[netaddr.IP]struct{}{}
  1021. for d := range c.connectsSuccessful {
  1022. ips[d.dst.IP()] = struct{}{}
  1023. }
  1024. for dst := range c.connectsFailed {
  1025. ips[dst.IP()] = struct{}{}
  1026. }
  1027. if len(ips) == 0 {
  1028. return nil
  1029. }
  1030. targets := make([]netaddr.IP, 0, len(ips))
  1031. for ip := range ips {
  1032. if ip.IsLoopback() {
  1033. continue
  1034. }
  1035. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1036. continue
  1037. }
  1038. targets = append(targets, ip)
  1039. }
  1040. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1041. if err != nil {
  1042. klog.Warningln(err)
  1043. return nil
  1044. }
  1045. return rtt
  1046. }
  1047. func (c *Container) runLogParser(logPath string) {
  1048. if *flags.DisableLogParsing {
  1049. return
  1050. }
  1051. containerId := string(c.id)
  1052. if logPath != "" {
  1053. if c.logParsers[logPath] != nil {
  1054. return
  1055. }
  1056. ch := make(chan logparser.LogEntry)
  1057. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1058. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1059. if err != nil {
  1060. klog.Warningln(err)
  1061. parser.Stop()
  1062. return
  1063. }
  1064. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1065. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1066. return
  1067. }
  1068. switch c.cgroup.ContainerType {
  1069. case cgroup.ContainerTypeSystemdService:
  1070. ch := make(chan logparser.LogEntry)
  1071. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1072. klog.Warningln(err)
  1073. return
  1074. }
  1075. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1076. stop := func() {
  1077. JournaldUnsubscribe(c.cgroup)
  1078. }
  1079. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1080. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1081. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1082. if c.metadata.logPath == "" {
  1083. return
  1084. }
  1085. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1086. parser.Stop()
  1087. delete(c.logParsers, "stdout/stderr")
  1088. }
  1089. ch := make(chan logparser.LogEntry)
  1090. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1091. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1092. if err != nil {
  1093. klog.Warningln(err)
  1094. parser.Stop()
  1095. return
  1096. }
  1097. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1098. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1099. }
  1100. }
  1101. func (c *Container) gc(now time.Time) {
  1102. // c.lock.Lock()
  1103. // defer c.lock.Unlock()
  1104. established := map[AddrPair]struct{}{}
  1105. establishedDst := map[netaddr.IPPort]struct{}{}
  1106. listens := map[netaddr.IPPort]string{}
  1107. seenNamespaces := map[string]bool{}
  1108. fdMap := map[uint64]struct{}{}
  1109. for _, p := range c.processes {
  1110. if seenNamespaces[p.NetNsId()] {
  1111. continue
  1112. }
  1113. sockets, err := proc.GetSockets(p.Pid)
  1114. if err != nil {
  1115. continue
  1116. }
  1117. fds, err := proc.ReadFds(p.Pid)
  1118. if err == nil {
  1119. for _, fd := range fds {
  1120. fdMap[fd.Fd] = struct{}{}
  1121. }
  1122. }
  1123. for _, s := range sockets {
  1124. if s.Listen {
  1125. listens[s.SAddr] = s.Inode
  1126. } else {
  1127. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1128. establishedDst[s.DAddr] = struct{}{}
  1129. }
  1130. }
  1131. seenNamespaces[p.NetNsId()] = true
  1132. }
  1133. c.revalidateListens(now, listens)
  1134. for srcDst, conn := range c.connectionsActive {
  1135. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1136. if _, ok := established[srcDst]; !ok {
  1137. delete(c.connectionsActive, srcDst)
  1138. if conn == c.connectionsByPidFd[pidFd] {
  1139. delete(c.connectionsByPidFd, pidFd)
  1140. }
  1141. continue
  1142. }
  1143. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1144. delete(c.connectionsActive, srcDst)
  1145. if conn == c.connectionsByPidFd[pidFd] {
  1146. delete(c.connectionsByPidFd, pidFd)
  1147. }
  1148. }
  1149. }
  1150. for srcDst, conn := range c.acceptsActive {
  1151. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1152. if _, ok := established[srcDst]; !ok {
  1153. delete(c.acceptsActive, srcDst)
  1154. if conn == c.acceptsByPidFd[pidFd] {
  1155. delete(c.acceptsByPidFd, pidFd)
  1156. }
  1157. continue
  1158. }
  1159. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1160. delete(c.acceptsActive, srcDst)
  1161. if conn == c.acceptsByPidFd[pidFd] {
  1162. delete(c.acceptsByPidFd, pidFd)
  1163. }
  1164. }
  1165. }
  1166. for _, conn := range c.connectionsByPidFd {
  1167. if _, ok := fdMap[conn.Fd]; !ok {
  1168. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1169. }
  1170. }
  1171. for _, conn := range c.acceptsByPidFd {
  1172. if _, ok := fdMap[conn.Fd]; !ok {
  1173. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1174. }
  1175. }
  1176. for dst, at := range c.connectLastAttempt {
  1177. _, active := establishedDst[dst]
  1178. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1179. delete(c.connectLastAttempt, dst)
  1180. delete(c.connectsFailed, dst)
  1181. for d := range c.connectsSuccessful {
  1182. if d.src == dst {
  1183. delete(c.connectsSuccessful, d)
  1184. }
  1185. }
  1186. c.l7Stats.delete(dst)
  1187. }
  1188. }
  1189. for dst, at := range c.acceptLastAttempt {
  1190. _, active := establishedDst[dst]
  1191. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1192. delete(c.acceptLastAttempt, dst)
  1193. for d := range c.acceptsSuccessful {
  1194. if d.src == dst {
  1195. delete(c.acceptsSuccessful, d)
  1196. }
  1197. }
  1198. c.l7Stats.delete(dst)
  1199. }
  1200. }
  1201. }
  1202. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1203. for addr, byPid := range c.listens {
  1204. if _, open := actualListens[addr]; open {
  1205. continue
  1206. }
  1207. klog.Warningln("deleting the outdated listen:", addr)
  1208. for _, details := range byPid {
  1209. if details.ClosedAt.IsZero() {
  1210. details.ClosedAt = now
  1211. }
  1212. }
  1213. }
  1214. missingListens := map[netaddr.IPPort]string{}
  1215. for addr, inode := range actualListens {
  1216. byPids, found := c.listens[addr]
  1217. if !found {
  1218. missingListens[addr] = inode
  1219. continue
  1220. }
  1221. open := false
  1222. for _, details := range byPids {
  1223. if details.ClosedAt.IsZero() {
  1224. open = true
  1225. break
  1226. }
  1227. }
  1228. if !open {
  1229. missingListens[addr] = inode
  1230. }
  1231. }
  1232. if len(missingListens) > 0 {
  1233. inodeToPid := map[string]uint32{}
  1234. for pid := range c.processes {
  1235. fds, err := proc.ReadFds(pid)
  1236. if err != nil {
  1237. klog.Warningln(err)
  1238. continue
  1239. }
  1240. for _, fd := range fds {
  1241. if fd.SocketInode != "" {
  1242. inodeToPid[fd.SocketInode] = pid
  1243. }
  1244. }
  1245. }
  1246. for addr, inode := range missingListens {
  1247. pid, found := inodeToPid[inode]
  1248. if !found {
  1249. continue
  1250. }
  1251. //klog.Warningln("missing listen found:", addr, pid)
  1252. c.onListenOpen(pid, addr, true)
  1253. }
  1254. }
  1255. for addr, pids := range c.listens {
  1256. for pid, details := range pids {
  1257. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1258. delete(c.listens[addr], pid)
  1259. }
  1260. }
  1261. if len(c.listens[addr]) == 0 {
  1262. delete(c.listens, addr)
  1263. }
  1264. }
  1265. }
  1266. func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1267. klog.Infoln("[attach] attachUprobes start")
  1268. if tracer.DisableL7Tracing() {
  1269. return nil
  1270. }
  1271. codeType := c.GetCodeTypeFromCache(pid)
  1272. if codeType.IsUnknownCode() {
  1273. return nil
  1274. }
  1275. var err error
  1276. switch codeType {
  1277. case CodeTypeJava:
  1278. err = c.attachJVMUprobes(tracer, pid)
  1279. case CodeTypeJavaAot:
  1280. err = c.attachJVMUprobes(tracer, pid)
  1281. case CodeTypeGo:
  1282. err = c.attachTlsUprobes(tracer, pid)
  1283. case CodeTypeNetCoreAot:
  1284. err = c.attachNetCoreUprobes(tracer, pid)
  1285. }
  1286. if err != nil {
  1287. // TODO 条件卸载
  1288. c.errorClose(pid, 0)
  1289. return err
  1290. }
  1291. c.l7AttachSuccess()
  1292. return nil
  1293. }
  1294. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1295. p := c.processes[pid]
  1296. if p == nil {
  1297. return CodeTypeUnknown
  1298. }
  1299. if p.codeType.IsWaitCheck() {
  1300. p.codeType = GetExeType(pid, c.getRootfs())
  1301. }
  1302. return p.codeType
  1303. }
  1304. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1305. p := c.processes[pid]
  1306. if p == nil {
  1307. return nil
  1308. }
  1309. if !p.openSslUprobesChecked {
  1310. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1311. if err != nil {
  1312. return err
  1313. }
  1314. p.uprobes = append(p.uprobes, sslProbes...)
  1315. p.openSslUprobesChecked = true
  1316. }
  1317. if !p.goTlsUprobesChecked {
  1318. codeType := c.GetCodeTypeFromCache(pid)
  1319. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1320. if err != nil {
  1321. return err
  1322. }
  1323. p.uprobes = append(p.uprobes, goProbes...)
  1324. p.goTlsUprobesChecked = true
  1325. }
  1326. return nil
  1327. }
  1328. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1329. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1330. return nil
  1331. }
  1332. p := c.processes[pid]
  1333. if p == nil {
  1334. return nil
  1335. }
  1336. codeType := c.GetCodeTypeFromCache(pid)
  1337. if !p.jvmUprobesChecked {
  1338. p.jvmUprobesChecked = true
  1339. rootfs := c.getRootfs()
  1340. tracer.InitKProcInfo(pid, &c.AppInfo)
  1341. // TODO java Aot
  1342. if codeType.IsJvmCode() {
  1343. // check version
  1344. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1345. if err != nil {
  1346. klog.WithError(err).Errorf("[attach] Failed get so path")
  1347. return err
  1348. }
  1349. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1350. if err != nil {
  1351. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1352. return err
  1353. }
  1354. c.AppInfo.Version = v
  1355. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1356. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1357. if major != 1 || minor != 8 {
  1358. return fmt.Errorf("[attach] Unsupported Java version")
  1359. }
  1360. }
  1361. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1362. if err != nil {
  1363. klog.Error(err)
  1364. return err
  1365. }
  1366. p.uprobes = append(p.uprobes, libNioProbes...)
  1367. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1368. if err != nil {
  1369. klog.Error(err)
  1370. return err
  1371. }
  1372. p.uprobes = append(p.uprobes, libNetProbes...)
  1373. //p.jvmUprobesChecked = true
  1374. } else {
  1375. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1376. }
  1377. return nil
  1378. }
  1379. func (c *Container) errorClose(pid uint32, closeType int) {
  1380. p := c.processes[pid]
  1381. if p != nil {
  1382. p.DynamicClose(closeType)
  1383. }
  1384. }
  1385. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1386. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1387. return nil
  1388. }
  1389. p := c.processes[pid]
  1390. if p == nil {
  1391. return nil
  1392. }
  1393. if !p.jvmUprobesChecked {
  1394. //codeType := c.GetCodeTypeFromCache(pid)
  1395. tracer.InitKProcInfo(pid, &c.AppInfo)
  1396. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1397. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1398. if err != nil {
  1399. klog.Error(err)
  1400. return err
  1401. }
  1402. p.uprobes = append(p.uprobes, readProbes...)
  1403. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1404. if err != nil {
  1405. klog.Error(err)
  1406. return err
  1407. }
  1408. p.uprobes = append(p.uprobes, WriteProbes...)
  1409. p.jvmUprobesChecked = true
  1410. }
  1411. return nil
  1412. }
  1413. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1414. info := proc.GetFdInfo(pid, fd)
  1415. if info == nil {
  1416. return
  1417. }
  1418. switch {
  1419. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1420. !strings.HasPrefix(info.Dest, "/"),
  1421. strings.HasPrefix(info.Dest, "/proc/"),
  1422. strings.HasPrefix(info.Dest, "/dev/"),
  1423. strings.HasPrefix(info.Dest, "/sys/"),
  1424. strings.HasSuffix(info.Dest, "(deleted)"):
  1425. return
  1426. }
  1427. mntId = info.MntId
  1428. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1429. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1430. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1431. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1432. logPath = info.Dest
  1433. }
  1434. return
  1435. }
  1436. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1437. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1438. }
  1439. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1440. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1441. }