container.go 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  31. var (
  32. gcInterval = 10 * time.Second
  33. AllAppInfoInterval = 1 * time.Minute
  34. pingTimeout = 300 * time.Millisecond
  35. )
  36. type ContainerID string
  37. type ContainerNetwork struct {
  38. NetworkID string
  39. }
  40. type ContainerMetadata struct {
  41. name string
  42. labels map[string]string
  43. volumes map[string]string
  44. logPath string
  45. image string
  46. logDecoder logparser.Decoder
  47. hostListens map[string][]netaddr.IPPort
  48. networks map[string]ContainerNetwork
  49. env map[string]string
  50. systemdTriggeredBy string
  51. rootfs string
  52. }
  53. type Delays struct {
  54. cpu time.Duration
  55. disk time.Duration
  56. }
  57. type LogParser struct {
  58. parser *logparser.Parser
  59. stop func()
  60. }
  61. func (p *LogParser) Stop() {
  62. if p.stop != nil {
  63. p.stop()
  64. }
  65. p.parser.Stop()
  66. }
  67. type AddrPair struct {
  68. src netaddr.IPPort
  69. dst netaddr.IPPort
  70. }
  71. type ActiveConnection struct {
  72. Dest netaddr.IPPort
  73. Src netaddr.IPPort
  74. ActualDest netaddr.IPPort
  75. Pid uint32
  76. Fd uint64
  77. Timestamp uint64
  78. Closed time.Time
  79. BytesSent uint64
  80. BytesReceived uint64
  81. http2Parser *l7.Http2Parser
  82. postgresParser *l7.PostgresParser
  83. mysqlParser *l7.MysqlParser
  84. dmParser *l7.DmParser
  85. }
  86. type ActiveAccept struct {
  87. Dest netaddr.IPPort
  88. Src netaddr.IPPort
  89. Pid uint32
  90. Fd uint64
  91. Timestamp uint64
  92. Closed time.Time
  93. BytesSent uint64
  94. BytesReceived uint64
  95. }
  96. type ListenDetails struct {
  97. ClosedAt time.Time
  98. NsIPs []netaddr.IP
  99. }
  100. type PidFd struct {
  101. Pid uint32
  102. Fd uint64
  103. }
  104. type K8sContainer struct {
  105. ns string
  106. podName string
  107. podId string
  108. workload string
  109. containerName string
  110. pid string
  111. }
  112. type ConnectionStats struct {
  113. Count uint64
  114. TotalTime time.Duration
  115. Retransmissions uint64
  116. BytesSent uint64
  117. PerBytesSent uint64
  118. BytesReceived uint64
  119. PerBytesReceived uint64
  120. Src netaddr.IPPort
  121. ConEstTime time.Duration
  122. FirstReadTime uint64
  123. FirstWriteTime uint64
  124. NewReadTime uint64
  125. }
  126. type AcceptStats struct {
  127. BytesSent uint64
  128. BytesReceived uint64
  129. }
  130. type Container struct {
  131. id ContainerID
  132. cgroup *cgroup.Cgroup
  133. metadata *ContainerMetadata
  134. K8sContainer
  135. processes map[uint32]*Process
  136. startedAt time.Time
  137. zombieAt time.Time
  138. restarts int
  139. delays Delays
  140. delaysByPid map[uint32]Delays
  141. delaysLock sync.Mutex
  142. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  143. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  144. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  145. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  146. connectionsActive map[AddrPair]*ActiveConnection
  147. connectionsByPidFd map[PidFd]*ActiveConnection
  148. acceptsSuccessful map[AddrPair]*AcceptStats
  149. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  150. acceptsActive map[AddrPair]*ActiveAccept
  151. acceptsByPidFd map[PidFd]*ActiveAccept
  152. l7Stats L7Stats
  153. dnsStats *L7Metrics
  154. oomKills int
  155. pythonThreadLockWaitTime time.Duration
  156. mounts map[string]proc.MountInfo
  157. logParsers map[string]*LogParser
  158. hostConntrack *Conntrack
  159. nsConntrack *Conntrack
  160. lbConntracks []*Conntrack
  161. registry *Registry
  162. lock sync.RWMutex
  163. done chan struct{}
  164. traceMap map[uint64]*tracing.Trace
  165. //instanceID utils.ID
  166. Symbols []debugelf.Symbol
  167. Uprobes []tracer.Uprobe
  168. UprobesMap map[string]tracer.Uprobe
  169. l7EventReady bool
  170. l7Attach bool
  171. // 白名单详情
  172. WhiteSettingInfo WhiteSettingInfo
  173. // 应用详情
  174. AppInfo AppInfo
  175. }
  176. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  177. netNs, err := proc.GetNetNs(pid)
  178. if err != nil {
  179. return nil, err
  180. }
  181. defer netNs.Close()
  182. c := &Container{
  183. id: id,
  184. cgroup: cg,
  185. metadata: md,
  186. processes: map[uint32]*Process{},
  187. delaysByPid: map[uint32]Delays{},
  188. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  189. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  190. connectsFailed: map[netaddr.IPPort]int64{},
  191. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  192. connectionsActive: map[AddrPair]*ActiveConnection{},
  193. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  194. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  195. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  196. acceptsActive: map[AddrPair]*ActiveAccept{},
  197. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  198. l7Stats: L7Stats{},
  199. dnsStats: &L7Metrics{},
  200. mounts: map[string]proc.MountInfo{},
  201. logParsers: map[string]*LogParser{},
  202. hostConntrack: hostConntrack,
  203. done: make(chan struct{}),
  204. traceMap: make(map[uint64]*tracing.Trace),
  205. registry: registry,
  206. }
  207. for _, n := range md.networks {
  208. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  209. if ct, err := NewConntrack(nsHandle); err != nil {
  210. klog.Warningln(err)
  211. } else {
  212. c.lbConntracks = append(c.lbConntracks, ct)
  213. }
  214. _ = nsHandle.Close()
  215. }
  216. }
  217. c.runLogParser("")
  218. // go func() {
  219. // ticker := time.NewTicker(gcInterval)
  220. // defer ticker.Stop()
  221. // for {
  222. // select {
  223. // case <-c.done:
  224. // return
  225. // case t := <-ticker.C:
  226. // c.gc(t)
  227. // }
  228. // }
  229. // }()
  230. return c, nil
  231. }
  232. func (c *Container) Close() {
  233. for _, p := range c.logParsers {
  234. p.Stop()
  235. }
  236. for _, ct := range c.lbConntracks {
  237. _ = ct.Close()
  238. }
  239. if c.nsConntrack != nil {
  240. _ = c.nsConntrack.Close()
  241. }
  242. close(c.done)
  243. }
  244. func (c *Container) Dead(now time.Time) bool {
  245. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  246. }
  247. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  248. // some fixed metric description is required here to register/unregister the collector correctly
  249. ch <- prometheus.NewDesc("container", "", nil, nil)
  250. }
  251. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  252. c.registry.updateTrafficStatsIfNecessary()
  253. c.lock.RLock()
  254. defer c.lock.RUnlock()
  255. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  256. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  257. }
  258. ch <- counter(metrics.Restarts, float64(c.restarts))
  259. if cpu, err := c.cgroup.CpuStat(); err == nil {
  260. if cpu.LimitCores > 0 {
  261. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  262. }
  263. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  264. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  265. }
  266. if taskstatsClient != nil {
  267. c.updateDelays()
  268. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  269. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  270. }
  271. if s, err := c.cgroup.MemoryStat(); err == nil {
  272. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  273. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  274. if s.Limit > 0 {
  275. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  276. }
  277. }
  278. if c.oomKills > 0 {
  279. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  280. }
  281. if disks, err := node.GetDisks(); err == nil {
  282. ioStat, _ := c.cgroup.IOStat()
  283. for majorMinor, mounts := range c.getMounts() {
  284. dev := disks.GetParentBlockDevice(majorMinor)
  285. if dev == nil {
  286. continue
  287. }
  288. for mountPoint, fsStat := range mounts {
  289. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  290. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  291. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  292. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  293. if io, ok := ioStat[majorMinor]; ok {
  294. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  295. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  296. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  297. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  298. }
  299. }
  300. }
  301. }
  302. for addr, open := range c.getListens() {
  303. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  304. }
  305. for proxy, addrs := range c.getProxiedListens() {
  306. for addr := range addrs {
  307. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  308. }
  309. }
  310. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  311. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  312. strAppName := c.AppInfo.AppName
  313. apps := c.registry.GetAllAppInfoFromServer()
  314. for d, stats := range c.connectsSuccessful {
  315. targetAppId := apps[d.dst.String()]
  316. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  317. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  318. if stats.Retransmissions > 0 {
  319. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  320. }
  321. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  322. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  323. ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  324. ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  325. ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  326. ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  327. ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  328. klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  329. stats.PerBytesReceived = 0
  330. stats.PerBytesSent = 0
  331. }
  332. // for d, stats := range c.acceptsSuccessful {
  333. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  334. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  335. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  336. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  337. // }
  338. for dst, count := range c.connectsFailed {
  339. targetAppId := apps[dst.String()]
  340. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId,targetAppId, strAppName, dst.String())
  341. }
  342. connections := map[AddrPair]int{}
  343. for addrPair, conn := range c.connectionsActive {
  344. if !conn.Closed.IsZero() {
  345. continue
  346. }
  347. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  348. }
  349. for d, count := range connections {
  350. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  351. }
  352. for source, p := range c.logParsers {
  353. for _, c := range p.parser.GetCounters() {
  354. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  355. }
  356. }
  357. appTypes := map[string]struct{}{}
  358. seenJvms := map[string]bool{}
  359. seenDotNetApps := map[string]bool{}
  360. pids := maps.Keys(c.processes)
  361. sort.Slice(pids, func(i, j int) bool {
  362. return pids[i] < pids[j]
  363. })
  364. for _, pid := range pids {
  365. process := c.processes[pid]
  366. cmdline := proc.GetCmdline(pid)
  367. if len(cmdline) == 0 {
  368. continue
  369. }
  370. appType := guessApplicationType(cmdline)
  371. if appType != "" {
  372. appTypes[appType] = struct{}{}
  373. }
  374. if process.isGolangApp {
  375. appTypes["golang"] = struct{}{}
  376. }
  377. switch {
  378. case isJvm(cmdline):
  379. jvm, jMetrics := jvmMetrics(pid)
  380. if len(jMetrics) > 0 && !seenJvms[jvm] {
  381. seenJvms[jvm] = true
  382. for _, m := range jMetrics {
  383. ch <- m
  384. }
  385. }
  386. case process.dotNetMonitor != nil:
  387. appTypes["dotnet"] = struct{}{}
  388. appName := process.dotNetMonitor.AppName()
  389. if !seenDotNetApps[appName] {
  390. seenDotNetApps[appName] = true
  391. process.dotNetMonitor.Collect(ch)
  392. }
  393. }
  394. }
  395. for appType := range appTypes {
  396. ch <- gauge(metrics.ApplicationType, 1, appType)
  397. }
  398. if c.pythonThreadLockWaitTime > 0 {
  399. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  400. }
  401. if c.dnsStats.Requests != nil {
  402. c.dnsStats.Requests.Collect(ch)
  403. }
  404. if c.dnsStats.Latency != nil {
  405. c.dnsStats.Latency.Collect(ch)
  406. }
  407. c.l7Stats.collect(ch)
  408. if !*flags.DisablePinger {
  409. for ip, rtt := range c.ping() {
  410. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  411. }
  412. }
  413. c.gc(time.Now())
  414. }
  415. func (c *Container) onProcessStart(pid uint32) *Process {
  416. c.lock.Lock()
  417. defer c.lock.Unlock()
  418. stats, err := TaskstatsPID(pid)
  419. if err != nil {
  420. klog.WithError(err).Errorf("Failed onProcessStart [%d]", pid)
  421. return nil
  422. }
  423. c.zombieAt = time.Time{}
  424. p := NewProcess(pid, stats, c.registry.tracer)
  425. if p == nil {
  426. return nil
  427. }
  428. c.processes[pid] = p
  429. if c.startedAt.IsZero() {
  430. c.startedAt = stats.BeginTime
  431. } else {
  432. min := stats.BeginTime
  433. for _, p := range c.processes {
  434. if p.StartedAt.Before(min) {
  435. min = p.StartedAt
  436. }
  437. }
  438. if min.After(c.startedAt) {
  439. c.restarts++
  440. c.startedAt = min
  441. }
  442. }
  443. return p
  444. }
  445. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  446. c.lock.Lock()
  447. defer c.lock.Unlock()
  448. if p := c.processes[pid]; p != nil {
  449. p.Close()
  450. }
  451. delete(c.processes, pid)
  452. if len(c.processes) == 0 {
  453. c.zombieAt = time.Now()
  454. }
  455. delete(c.delaysByPid, pid)
  456. if oomKill {
  457. c.oomKills++
  458. }
  459. }
  460. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  461. mntId, logPath := resolveFd(pid, fd)
  462. func() {
  463. if mntId == "" {
  464. return
  465. }
  466. c.lock.Lock()
  467. _, ok := c.mounts[mntId]
  468. c.lock.Unlock()
  469. if ok {
  470. return
  471. }
  472. byMountId := proc.GetMountInfo(pid)
  473. if byMountId == nil {
  474. return
  475. }
  476. if mi, ok := byMountId[mntId]; ok {
  477. c.lock.Lock()
  478. c.mounts[mntId] = mi
  479. c.lock.Unlock()
  480. }
  481. }()
  482. if logPath != "" {
  483. c.lock.Lock()
  484. c.runLogParser(logPath)
  485. c.lock.Unlock()
  486. }
  487. }
  488. // set
  489. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  490. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  491. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  492. return
  493. }
  494. if !safe {
  495. c.lock.Lock()
  496. defer c.lock.Unlock()
  497. }
  498. if _, ok := c.listens[addr]; !ok {
  499. c.listens[addr] = map[uint32]*ListenDetails{}
  500. }
  501. details := &ListenDetails{}
  502. c.listens[addr][pid] = details
  503. if addr.IP().IsUnspecified() {
  504. ns, err := proc.GetNetNs(pid)
  505. if err != nil {
  506. if !common.IsNotExist(err) {
  507. klog.Warningln(err)
  508. }
  509. return
  510. }
  511. defer ns.Close()
  512. ips, err := proc.GetNsIps(ns)
  513. if err != nil {
  514. klog.Warningln(err)
  515. return
  516. }
  517. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  518. details.NsIPs = ips
  519. }
  520. }
  521. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  522. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  523. c.lock.Lock()
  524. defer c.lock.Unlock()
  525. if _, byAddr := c.listens[addr]; byAddr {
  526. if _, byPid := c.listens[addr][pid]; byPid {
  527. if details := c.listens[addr][pid]; details != nil {
  528. details.ClosedAt = time.Now()
  529. }
  530. }
  531. }
  532. }
  533. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  534. klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  535. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  536. // return
  537. // }
  538. p := c.processes[pid]
  539. if p == nil {
  540. return
  541. }
  542. if dst.IP().IsLoopback() && !p.isHostNs() {
  543. return
  544. }
  545. // actualDst, err := c.getActualDestination(p, src, dst)
  546. // if err != nil {
  547. // if !common.IsNotExist(err) {
  548. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  549. // }
  550. // return
  551. // }
  552. // switch {
  553. // case actualDst == nil:
  554. // actualDst = &dst
  555. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  556. // return
  557. // }
  558. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  559. // return
  560. // }
  561. c.lock.Lock()
  562. defer c.lock.Unlock()
  563. if !failed {
  564. key := AddrPair{src: dst, dst: src}
  565. stats := c.acceptsSuccessful[key]
  566. if stats == nil {
  567. stats = &AcceptStats{}
  568. c.acceptsSuccessful[key] = stats
  569. }
  570. acceptCon := &ActiveAccept{
  571. Dest: src,
  572. Src: dst,
  573. Pid: pid,
  574. Fd: fd,
  575. Timestamp: timestamp,
  576. }
  577. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  578. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  579. }
  580. c.acceptLastAttempt[dst] = time.Now()
  581. }
  582. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  583. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  584. return
  585. }
  586. p := c.processes[pid]
  587. if p == nil {
  588. return
  589. }
  590. if dst.IP().IsLoopback() && !p.isHostNs() {
  591. return
  592. }
  593. actualDst, err := c.getActualDestination(p, src, dst)
  594. if err != nil {
  595. if !common.IsNotExist(err) {
  596. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  597. }
  598. return
  599. }
  600. switch {
  601. case actualDst == nil:
  602. actualDst = &dst
  603. case actualDst.IP().IsLoopback() && !p.isHostNs():
  604. return
  605. }
  606. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  607. return
  608. }
  609. c.lock.Lock()
  610. defer c.lock.Unlock()
  611. if failed {
  612. c.connectsFailed[dst]++
  613. } else {
  614. key := AddrPair{src: dst, dst: *actualDst}
  615. stats := c.connectsSuccessful[key]
  616. if stats == nil {
  617. stats = &ConnectionStats{}
  618. c.connectsSuccessful[key] = stats
  619. }
  620. stats.Count++
  621. stats.TotalTime += duration
  622. stats.Src = src
  623. stats.ConEstTime = duration
  624. connection := &ActiveConnection{
  625. Dest: dst,
  626. Src: src,
  627. ActualDest: *actualDst,
  628. Pid: pid,
  629. Fd: fd,
  630. Timestamp: timestamp,
  631. }
  632. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  633. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  634. }
  635. c.connectLastAttempt[dst] = time.Now()
  636. }
  637. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  638. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  639. return actualDst, nil
  640. }
  641. for _, lb := range c.lbConntracks {
  642. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  643. return actualDst, nil
  644. }
  645. }
  646. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  647. if actualDst != nil {
  648. return actualDst, nil
  649. }
  650. if !p.isHostNs() {
  651. if c.nsConntrack == nil {
  652. netNs, err := proc.GetNetNs(p.Pid)
  653. if err != nil {
  654. return nil, err
  655. }
  656. defer netNs.Close()
  657. c.nsConntrack, err = NewConntrack(netNs)
  658. if err != nil {
  659. return nil, err
  660. }
  661. }
  662. return c.nsConntrack.GetActualDestination(src, dst), nil
  663. }
  664. return nil, nil
  665. }
  666. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  667. c.lock.Lock()
  668. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  669. c.lock.Unlock()
  670. if conn != nil {
  671. if conn.Closed.IsZero() {
  672. if e.TrafficStats != nil {
  673. c.lock.Lock()
  674. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  675. c.lock.Unlock()
  676. }
  677. conn.Closed = time.Now()
  678. }
  679. }
  680. }
  681. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  682. c.lock.Lock()
  683. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  684. c.lock.Unlock()
  685. if conn != nil {
  686. if conn.Closed.IsZero() {
  687. if e.TrafficStats != nil {
  688. c.lock.Lock()
  689. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  690. c.lock.Unlock()
  691. }
  692. conn.Closed = time.Now()
  693. }
  694. }
  695. }
  696. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  697. if u == nil {
  698. return
  699. }
  700. c.lock.Lock()
  701. defer c.lock.Unlock()
  702. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  703. }
  704. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  705. if ac == nil {
  706. return
  707. }
  708. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  709. stats := c.connectsSuccessful[key]
  710. if stats == nil {
  711. stats = &ConnectionStats{}
  712. c.connectsSuccessful[key] = stats
  713. }
  714. if sent > ac.BytesSent {
  715. stats.BytesSent += sent - ac.BytesSent
  716. stats.PerBytesSent = sent - ac.BytesSent
  717. }
  718. if received > ac.BytesReceived {
  719. stats.BytesReceived += received - ac.BytesReceived
  720. stats.PerBytesReceived = received - ac.BytesReceived
  721. }
  722. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  723. stats.FirstReadTime = firstreadtime
  724. stats.FirstWriteTime = firstwritetime
  725. stats.NewReadTime = newreadtime
  726. }
  727. ac.BytesSent = sent
  728. ac.BytesReceived = received
  729. }
  730. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  731. if ac == nil {
  732. return
  733. }
  734. klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  735. key := AddrPair{src: ac.Src, dst: ac.Dest}
  736. stats := c.acceptsSuccessful[key]
  737. if stats == nil {
  738. stats = &AcceptStats{}
  739. c.acceptsSuccessful[key] = stats
  740. }
  741. if sent > ac.BytesSent {
  742. stats.BytesSent += sent - ac.BytesSent
  743. }
  744. if received > ac.BytesReceived {
  745. stats.BytesReceived += received - ac.BytesReceived
  746. }
  747. ac.BytesSent = sent
  748. ac.BytesReceived = received
  749. }
  750. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  751. status := r.Status.DNS()
  752. if status == "" {
  753. return nil, "", ""
  754. }
  755. t, fqdn, ips := l7.ParseDns(r.Payload)
  756. if t == "" {
  757. return nil, "", ""
  758. }
  759. if c.dnsStats.Requests == nil {
  760. dnsReq := L7Requests[l7.ProtocolDNS]
  761. c.dnsStats.Requests = prometheus.NewCounterVec(
  762. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  763. []string{"request_type", "domain", "status"},
  764. )
  765. }
  766. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  767. m.Inc()
  768. }
  769. if r.Duration != 0 {
  770. if c.dnsStats.Latency == nil {
  771. dnsLatency := L7Latency[l7.ProtocolDNS]
  772. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  773. }
  774. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  775. }
  776. ip2fqdn := map[netaddr.IP]string{}
  777. if fqdn != "" {
  778. for _, ip := range ips {
  779. ip2fqdn[ip] = fqdn
  780. }
  781. }
  782. return ip2fqdn, t, fqdn
  783. }
  784. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  785. c.lock.Lock()
  786. defer c.lock.Unlock()
  787. if r.Protocol == l7.ProtocolDNS {
  788. //return c.onDNSRequest(r)
  789. }
  790. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  791. if conn == nil {
  792. return nil
  793. }
  794. if timestamp != 0 && conn.Timestamp != timestamp {
  795. return nil
  796. }
  797. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  798. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  799. switch r.Protocol {
  800. case l7.ProtocolHTTP:
  801. stats.observe(r.Status.Http(), "", r.Duration)
  802. method, path := l7.ParseHttp(r.Payload)
  803. trace.HttpRequest(method, path, r.Status, r.Duration)
  804. case l7.ProtocolHTTP2:
  805. if conn.http2Parser == nil {
  806. conn.http2Parser = l7.NewHttp2Parser()
  807. }
  808. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  809. for _, req := range requests {
  810. stats.observe(req.Status.Http(), "", req.Duration)
  811. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  812. }
  813. case l7.ProtocolPostgres:
  814. if r.Method != l7.MethodStatementClose {
  815. stats.observe(r.Status.String(), "", r.Duration)
  816. }
  817. if conn.postgresParser == nil {
  818. conn.postgresParser = l7.NewPostgresParser()
  819. }
  820. query := conn.postgresParser.Parse(r.Payload)
  821. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  822. case l7.ProtocolMysql:
  823. if r.Method != l7.MethodStatementClose {
  824. stats.observe(r.Status.String(), "", r.Duration)
  825. }
  826. if conn.mysqlParser == nil {
  827. conn.mysqlParser = l7.NewMysqlParser()
  828. }
  829. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  830. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  831. case l7.ProtocolMemcached:
  832. stats.observe(r.Status.String(), "", r.Duration)
  833. cmd, items := l7.ParseMemcached(r.Payload)
  834. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  835. case l7.ProtocolRedis:
  836. stats.observe(r.Status.String(), "", r.Duration)
  837. cmd, args := l7.ParseRedis(r.Payload)
  838. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  839. case l7.ProtocolMongo:
  840. stats.observe(r.Status.String(), "", r.Duration)
  841. query := l7.ParseMongo(r.Payload)
  842. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  843. case l7.ProtocolKafka, l7.ProtocolCassandra:
  844. stats.observe(r.Status.String(), "", r.Duration)
  845. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  846. stats.observe(r.Status.String(), r.Method.String(), 0)
  847. case l7.ProtocolDubbo2:
  848. stats.observe(r.Status.String(), "", r.Duration)
  849. }
  850. return nil
  851. }
  852. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  853. c.lock.Lock()
  854. defer c.lock.Unlock()
  855. conn, ok := c.connectionsActive[srcDst]
  856. if !ok {
  857. return false
  858. }
  859. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  860. stats := c.connectsSuccessful[key]
  861. if stats == nil {
  862. stats = &ConnectionStats{}
  863. c.connectsSuccessful[key] = stats
  864. }
  865. stats.Retransmissions++
  866. return true
  867. }
  868. func (c *Container) updateDelays() {
  869. c.delaysLock.Lock()
  870. defer c.delaysLock.Unlock()
  871. for pid := range c.processes {
  872. stats, err := TaskstatsTGID(pid)
  873. if err != nil {
  874. continue
  875. }
  876. d := c.delaysByPid[pid]
  877. c.delays.cpu += stats.CPUDelay - d.cpu
  878. c.delays.disk += stats.BlockIODelay - d.disk
  879. d.cpu = stats.CPUDelay
  880. d.disk = stats.BlockIODelay
  881. c.delaysByPid[pid] = d
  882. }
  883. }
  884. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  885. if len(c.mounts) == 0 {
  886. return nil
  887. }
  888. res := map[string]map[string]*proc.FSStat{}
  889. for _, mi := range c.mounts {
  890. var stat *proc.FSStat
  891. for pid := range c.processes {
  892. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  893. if err == nil {
  894. stat = &s
  895. break
  896. }
  897. }
  898. if stat == nil {
  899. continue
  900. }
  901. if _, ok := res[mi.MajorMinor]; !ok {
  902. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  903. }
  904. res[mi.MajorMinor][mi.MountPoint] = stat
  905. }
  906. return res
  907. }
  908. func (c *Container) getListens() map[netaddr.IPPort]int {
  909. res := map[netaddr.IPPort]int{}
  910. for addr, byPid := range c.listens {
  911. open := 0
  912. isHostNs := false
  913. ips := map[netaddr.IP]bool{}
  914. for pid, details := range byPid {
  915. p := c.processes[pid]
  916. if p == nil {
  917. continue
  918. }
  919. if p.isHostNs() {
  920. isHostNs = true
  921. }
  922. if details.ClosedAt.IsZero() {
  923. open = 1
  924. }
  925. for _, ip := range details.NsIPs {
  926. ips[ip] = true
  927. }
  928. }
  929. if !addr.IP().IsUnspecified() {
  930. ips = map[netaddr.IP]bool{addr.IP(): true}
  931. }
  932. for ip := range ips {
  933. if ip.IsLoopback() && !isHostNs {
  934. continue
  935. }
  936. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  937. }
  938. }
  939. return res
  940. }
  941. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  942. if len(c.metadata.hostListens) == 0 {
  943. return nil
  944. }
  945. hasUnspecified := false
  946. for _, addrs := range c.metadata.hostListens {
  947. for _, addr := range addrs {
  948. if addr.IP().IsUnspecified() {
  949. hasUnspecified = true
  950. break
  951. }
  952. }
  953. }
  954. var hostIps []netaddr.IP
  955. if hasUnspecified {
  956. if ns, err := proc.GetHostNetNs(); err != nil {
  957. klog.Warningln(err)
  958. } else {
  959. ips, err := proc.GetNsIps(ns)
  960. _ = ns.Close()
  961. if err != nil {
  962. klog.Warningln(err)
  963. } else {
  964. hostIps = ips
  965. }
  966. }
  967. }
  968. res := map[string]map[netaddr.IPPort]struct{}{}
  969. for proxy, addrs := range c.metadata.hostListens {
  970. res[proxy] = map[netaddr.IPPort]struct{}{}
  971. for _, addr := range addrs {
  972. if addr.IP().IsUnspecified() {
  973. for _, ip := range hostIps {
  974. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  975. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  976. }
  977. }
  978. } else {
  979. res[proxy][addr] = struct{}{}
  980. }
  981. }
  982. }
  983. return res
  984. }
  985. func (c *Container) ping() map[netaddr.IP]float64 {
  986. netNs := netns.None()
  987. for pid := range c.processes {
  988. if pid == agentPid {
  989. netNs = selfNetNs
  990. break
  991. }
  992. ns, err := proc.GetNetNs(pid)
  993. if err != nil {
  994. if !common.IsNotExist(err) {
  995. klog.Warningln(err)
  996. }
  997. continue
  998. }
  999. netNs = ns
  1000. defer netNs.Close()
  1001. break
  1002. }
  1003. if !netNs.IsOpen() {
  1004. return nil
  1005. }
  1006. ips := map[netaddr.IP]struct{}{}
  1007. for d := range c.connectsSuccessful {
  1008. ips[d.dst.IP()] = struct{}{}
  1009. }
  1010. for dst := range c.connectsFailed {
  1011. ips[dst.IP()] = struct{}{}
  1012. }
  1013. if len(ips) == 0 {
  1014. return nil
  1015. }
  1016. targets := make([]netaddr.IP, 0, len(ips))
  1017. for ip := range ips {
  1018. if ip.IsLoopback() {
  1019. continue
  1020. }
  1021. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1022. continue
  1023. }
  1024. targets = append(targets, ip)
  1025. }
  1026. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1027. if err != nil {
  1028. klog.Warningln(err)
  1029. return nil
  1030. }
  1031. return rtt
  1032. }
  1033. func (c *Container) runLogParser(logPath string) {
  1034. if *flags.DisableLogParsing {
  1035. return
  1036. }
  1037. containerId := string(c.id)
  1038. if logPath != "" {
  1039. if c.logParsers[logPath] != nil {
  1040. return
  1041. }
  1042. ch := make(chan logparser.LogEntry)
  1043. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1044. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1045. if err != nil {
  1046. klog.Warningln(err)
  1047. parser.Stop()
  1048. return
  1049. }
  1050. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1051. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1052. return
  1053. }
  1054. switch c.cgroup.ContainerType {
  1055. case cgroup.ContainerTypeSystemdService:
  1056. ch := make(chan logparser.LogEntry)
  1057. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1058. klog.Warningln(err)
  1059. return
  1060. }
  1061. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1062. stop := func() {
  1063. JournaldUnsubscribe(c.cgroup)
  1064. }
  1065. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1066. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1067. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1068. if c.metadata.logPath == "" {
  1069. return
  1070. }
  1071. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1072. parser.Stop()
  1073. delete(c.logParsers, "stdout/stderr")
  1074. }
  1075. ch := make(chan logparser.LogEntry)
  1076. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1077. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1078. if err != nil {
  1079. klog.Warningln(err)
  1080. parser.Stop()
  1081. return
  1082. }
  1083. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1084. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1085. }
  1086. }
  1087. func (c *Container) gc(now time.Time) {
  1088. // c.lock.Lock()
  1089. // defer c.lock.Unlock()
  1090. established := map[AddrPair]struct{}{}
  1091. establishedDst := map[netaddr.IPPort]struct{}{}
  1092. listens := map[netaddr.IPPort]string{}
  1093. seenNamespaces := map[string]bool{}
  1094. fdMap := map[uint64]struct{}{}
  1095. for _, p := range c.processes {
  1096. if seenNamespaces[p.NetNsId()] {
  1097. continue
  1098. }
  1099. sockets, err := proc.GetSockets(p.Pid)
  1100. if err != nil {
  1101. continue
  1102. }
  1103. fds, err := proc.ReadFds(p.Pid)
  1104. if err == nil {
  1105. for _, fd := range fds {
  1106. fdMap[fd.Fd] = struct{}{}
  1107. }
  1108. }
  1109. for _, s := range sockets {
  1110. if s.Listen {
  1111. listens[s.SAddr] = s.Inode
  1112. } else {
  1113. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1114. establishedDst[s.DAddr] = struct{}{}
  1115. }
  1116. }
  1117. seenNamespaces[p.NetNsId()] = true
  1118. }
  1119. c.revalidateListens(now, listens)
  1120. for srcDst, conn := range c.connectionsActive {
  1121. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1122. if _, ok := established[srcDst]; !ok {
  1123. delete(c.connectionsActive, srcDst)
  1124. if conn == c.connectionsByPidFd[pidFd] {
  1125. delete(c.connectionsByPidFd, pidFd)
  1126. }
  1127. continue
  1128. }
  1129. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1130. delete(c.connectionsActive, srcDst)
  1131. if conn == c.connectionsByPidFd[pidFd] {
  1132. delete(c.connectionsByPidFd, pidFd)
  1133. }
  1134. }
  1135. }
  1136. for srcDst, conn := range c.acceptsActive {
  1137. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1138. if _, ok := established[srcDst]; !ok {
  1139. delete(c.acceptsActive, srcDst)
  1140. if conn == c.acceptsByPidFd[pidFd] {
  1141. delete(c.acceptsByPidFd, pidFd)
  1142. }
  1143. continue
  1144. }
  1145. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1146. delete(c.acceptsActive, srcDst)
  1147. if conn == c.acceptsByPidFd[pidFd] {
  1148. delete(c.acceptsByPidFd, pidFd)
  1149. }
  1150. }
  1151. }
  1152. for _, conn := range c.connectionsByPidFd {
  1153. if _, ok := fdMap[conn.Fd]; !ok {
  1154. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1155. }
  1156. }
  1157. for _, conn := range c.acceptsByPidFd {
  1158. if _, ok := fdMap[conn.Fd]; !ok {
  1159. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1160. }
  1161. }
  1162. for dst, at := range c.connectLastAttempt {
  1163. _, active := establishedDst[dst]
  1164. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1165. delete(c.connectLastAttempt, dst)
  1166. delete(c.connectsFailed, dst)
  1167. for d := range c.connectsSuccessful {
  1168. if d.src == dst {
  1169. delete(c.connectsSuccessful, d)
  1170. }
  1171. }
  1172. c.l7Stats.delete(dst)
  1173. }
  1174. }
  1175. for dst, at := range c.acceptLastAttempt {
  1176. _, active := establishedDst[dst]
  1177. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1178. delete(c.acceptLastAttempt, dst)
  1179. for d := range c.acceptsSuccessful {
  1180. if d.src == dst {
  1181. delete(c.acceptsSuccessful, d)
  1182. }
  1183. }
  1184. c.l7Stats.delete(dst)
  1185. }
  1186. }
  1187. }
  1188. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1189. for addr, byPid := range c.listens {
  1190. if _, open := actualListens[addr]; open {
  1191. continue
  1192. }
  1193. klog.Warningln("deleting the outdated listen:", addr)
  1194. for _, details := range byPid {
  1195. if details.ClosedAt.IsZero() {
  1196. details.ClosedAt = now
  1197. }
  1198. }
  1199. }
  1200. missingListens := map[netaddr.IPPort]string{}
  1201. for addr, inode := range actualListens {
  1202. byPids, found := c.listens[addr]
  1203. if !found {
  1204. missingListens[addr] = inode
  1205. continue
  1206. }
  1207. open := false
  1208. for _, details := range byPids {
  1209. if details.ClosedAt.IsZero() {
  1210. open = true
  1211. break
  1212. }
  1213. }
  1214. if !open {
  1215. missingListens[addr] = inode
  1216. }
  1217. }
  1218. if len(missingListens) > 0 {
  1219. inodeToPid := map[string]uint32{}
  1220. for pid := range c.processes {
  1221. fds, err := proc.ReadFds(pid)
  1222. if err != nil {
  1223. klog.Warningln(err)
  1224. continue
  1225. }
  1226. for _, fd := range fds {
  1227. if fd.SocketInode != "" {
  1228. inodeToPid[fd.SocketInode] = pid
  1229. }
  1230. }
  1231. }
  1232. for addr, inode := range missingListens {
  1233. pid, found := inodeToPid[inode]
  1234. if !found {
  1235. continue
  1236. }
  1237. //klog.Warningln("missing listen found:", addr, pid)
  1238. c.onListenOpen(pid, addr, true)
  1239. }
  1240. }
  1241. for addr, pids := range c.listens {
  1242. for pid, details := range pids {
  1243. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1244. delete(c.listens[addr], pid)
  1245. }
  1246. }
  1247. if len(c.listens[addr]) == 0 {
  1248. delete(c.listens, addr)
  1249. }
  1250. }
  1251. }
  1252. func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1253. klog.Infoln("[attach] attachUprobes start")
  1254. if tracer.DisableL7Tracing() {
  1255. return nil
  1256. }
  1257. codeType := c.GetCodeTypeFromCache(pid)
  1258. if codeType.IsUnknownCode() {
  1259. return nil
  1260. }
  1261. var err error
  1262. switch codeType {
  1263. case CodeTypeJava:
  1264. err = c.attachJVMUprobes(tracer, pid)
  1265. case CodeTypeJavaAot:
  1266. err = c.attachJavaAotUprobes(tracer, pid)
  1267. case CodeTypeGo:
  1268. err = c.attachTlsUprobes(tracer, pid)
  1269. case CodeTypeNetCoreAot:
  1270. err = c.attachNetCoreUprobes(tracer, pid)
  1271. }
  1272. if err != nil {
  1273. c.DetachUprobes(pid, APP_UPROBE_ERROR)
  1274. return err
  1275. }
  1276. c.l7AttachSuccess()
  1277. return nil
  1278. }
  1279. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1280. p := c.processes[pid]
  1281. if p == nil {
  1282. return CodeTypeUnknown
  1283. }
  1284. if p.codeType.IsWaitCheck() {
  1285. p.codeType = GetExeType(pid, c.getRootfs())
  1286. }
  1287. return p.codeType
  1288. }
  1289. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1290. p := c.processes[pid]
  1291. if p == nil {
  1292. return nil
  1293. }
  1294. if !p.openSslUprobesChecked {
  1295. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1296. if err != nil {
  1297. return err
  1298. }
  1299. p.uprobes = append(p.uprobes, sslProbes...)
  1300. p.openSslUprobesChecked = true
  1301. }
  1302. if !p.goTlsUprobesChecked {
  1303. codeType := c.GetCodeTypeFromCache(pid)
  1304. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1305. if err != nil {
  1306. return err
  1307. }
  1308. p.uprobes = append(p.uprobes, goProbes...)
  1309. p.goTlsUprobesChecked = true
  1310. }
  1311. return nil
  1312. }
  1313. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1314. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1315. return nil
  1316. }
  1317. p := c.processes[pid]
  1318. if p == nil {
  1319. return nil
  1320. }
  1321. codeType := c.GetCodeTypeFromCache(pid)
  1322. if !p.jvmAttachOnce {
  1323. p.jvmAttachOnce = true
  1324. rootfs := c.getRootfs()
  1325. tracer.InitKProcInfo(pid, &c.AppInfo)
  1326. // TODO java Aot
  1327. if codeType.IsJvmCode() {
  1328. // check version
  1329. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1330. if err != nil {
  1331. klog.WithError(err).Errorf("[attach] Failed get so path")
  1332. return err
  1333. }
  1334. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1335. if err != nil {
  1336. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1337. return err
  1338. }
  1339. c.AppInfo.Version = v
  1340. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1341. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1342. if major != 1 || minor != 8 {
  1343. return fmt.Errorf("[attach] Unsupported Java version")
  1344. }
  1345. }
  1346. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1347. if err != nil {
  1348. klog.Error(err)
  1349. return err
  1350. }
  1351. p.uprobes = append(p.uprobes, libNioProbes...)
  1352. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1353. if err != nil {
  1354. klog.Error(err)
  1355. return err
  1356. }
  1357. p.uprobes = append(p.uprobes, libNetProbes...)
  1358. //p.jvmUprobesChecked = true
  1359. } else {
  1360. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1361. }
  1362. return nil
  1363. }
  1364. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1365. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1366. return nil
  1367. }
  1368. p := c.processes[pid]
  1369. if p == nil {
  1370. return nil
  1371. }
  1372. codeType := c.GetCodeTypeFromCache(pid)
  1373. if !p.jvmAttachOnce {
  1374. p.jvmAttachOnce = true
  1375. rootfs := c.getRootfs()
  1376. tracer.InitKProcInfo(pid, &c.AppInfo)
  1377. // // TODO java Aot
  1378. // if codeType.IsJavaAotCode() {
  1379. // // check version
  1380. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1381. // if err != nil {
  1382. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1383. // return err
  1384. // }
  1385. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1386. // if err != nil {
  1387. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1388. // return err
  1389. // }
  1390. // c.AppInfo.Version = v
  1391. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1392. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1393. // if major != 1 || minor != 8 {
  1394. // return fmt.Errorf("[attach] Unsupported Java version")
  1395. // }
  1396. // }
  1397. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1398. if err != nil {
  1399. klog.Error(err)
  1400. return err
  1401. }
  1402. p.uprobes = append(p.uprobes, libNioProbes...)
  1403. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1404. if err != nil {
  1405. klog.Error(err)
  1406. return err
  1407. }
  1408. p.uprobes = append(p.uprobes, libNetProbes...)
  1409. //p.jvmUprobesChecked = true
  1410. } else {
  1411. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1412. }
  1413. return nil
  1414. }
  1415. func (c *Container) errorClose(pid uint32, closeType int) {
  1416. p := c.processes[pid]
  1417. if p != nil {
  1418. p.DynamicClose(closeType)
  1419. }
  1420. }
  1421. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1422. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1423. return nil
  1424. }
  1425. p := c.processes[pid]
  1426. if p == nil {
  1427. return nil
  1428. }
  1429. if !p.jvmAttachOnce {
  1430. p.jvmAttachOnce = true
  1431. //codeType := c.GetCodeTypeFromCache(pid)
  1432. tracer.InitKProcInfo(pid, &c.AppInfo)
  1433. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1434. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1435. if err != nil {
  1436. klog.Error(err)
  1437. return err
  1438. }
  1439. p.uprobes = append(p.uprobes, readProbes...)
  1440. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1441. if err != nil {
  1442. klog.Error(err)
  1443. return err
  1444. }
  1445. p.uprobes = append(p.uprobes, WriteProbes...)
  1446. }
  1447. return nil
  1448. }
  1449. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1450. info := proc.GetFdInfo(pid, fd)
  1451. if info == nil {
  1452. return
  1453. }
  1454. switch {
  1455. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1456. !strings.HasPrefix(info.Dest, "/"),
  1457. strings.HasPrefix(info.Dest, "/proc/"),
  1458. strings.HasPrefix(info.Dest, "/dev/"),
  1459. strings.HasPrefix(info.Dest, "/sys/"),
  1460. strings.HasSuffix(info.Dest, "(deleted)"):
  1461. return
  1462. }
  1463. mntId = info.MntId
  1464. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1465. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1466. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1467. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1468. logPath = info.Dest
  1469. }
  1470. return
  1471. }
  1472. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1473. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1474. }
  1475. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1476. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1477. }