container.go 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  31. var (
  32. gcInterval = 10 * time.Second
  33. pingTimeout = 300 * time.Millisecond
  34. )
  35. type ContainerID string
  36. type ContainerNetwork struct {
  37. NetworkID string
  38. }
  39. type ContainerMetadata struct {
  40. name string
  41. labels map[string]string
  42. volumes map[string]string
  43. logPath string
  44. image string
  45. logDecoder logparser.Decoder
  46. hostListens map[string][]netaddr.IPPort
  47. networks map[string]ContainerNetwork
  48. env map[string]string
  49. systemdTriggeredBy string
  50. rootfs string
  51. }
  52. type Delays struct {
  53. cpu time.Duration
  54. disk time.Duration
  55. }
  56. type LogParser struct {
  57. parser *logparser.Parser
  58. stop func()
  59. }
  60. func (p *LogParser) Stop() {
  61. if p.stop != nil {
  62. p.stop()
  63. }
  64. p.parser.Stop()
  65. }
  66. type AddrPair struct {
  67. src netaddr.IPPort
  68. dst netaddr.IPPort
  69. }
  70. type ActiveConnection struct {
  71. Dest netaddr.IPPort
  72. Src netaddr.IPPort
  73. ActualDest netaddr.IPPort
  74. Pid uint32
  75. Fd uint64
  76. Timestamp uint64
  77. Closed time.Time
  78. BytesSent uint64
  79. BytesReceived uint64
  80. http2Parser *l7.Http2Parser
  81. postgresParser *l7.PostgresParser
  82. mysqlParser *l7.MysqlParser
  83. dmParser *l7.DmParser
  84. }
  85. type ActiveAccept struct {
  86. Dest netaddr.IPPort
  87. Src netaddr.IPPort
  88. Pid uint32
  89. Fd uint64
  90. Timestamp uint64
  91. Closed time.Time
  92. BytesSent uint64
  93. BytesReceived uint64
  94. }
  95. type ListenDetails struct {
  96. ClosedAt time.Time
  97. NsIPs []netaddr.IP
  98. }
  99. type PidFd struct {
  100. Pid uint32
  101. Fd uint64
  102. }
  103. type K8sContainer struct {
  104. ns string
  105. podName string
  106. podId string
  107. workload string
  108. containerName string
  109. pid string
  110. }
  111. type ConnectionStats struct {
  112. Count uint64
  113. TotalTime time.Duration
  114. Retransmissions uint64
  115. BytesSent uint64
  116. PerBytesSent uint64
  117. BytesReceived uint64
  118. PerBytesReceived uint64
  119. Src netaddr.IPPort
  120. ConEstTime time.Duration
  121. FirstReadTime uint64
  122. FirstWriteTime uint64
  123. NewReadTime uint64
  124. }
  125. type AcceptStats struct {
  126. BytesSent uint64
  127. BytesReceived uint64
  128. }
  129. type Container struct {
  130. id ContainerID
  131. cgroup *cgroup.Cgroup
  132. metadata *ContainerMetadata
  133. K8sContainer
  134. processes map[uint32]*Process
  135. startedAt time.Time
  136. zombieAt time.Time
  137. restarts int
  138. delays Delays
  139. delaysByPid map[uint32]Delays
  140. delaysLock sync.Mutex
  141. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  142. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  143. connectsFailed map[netaddr.IPPort]int64 // dst -> count
  144. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  145. connectionsActive map[AddrPair]*ActiveConnection
  146. connectionsByPidFd map[PidFd]*ActiveConnection
  147. acceptsSuccessful map[AddrPair]*AcceptStats
  148. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  149. acceptsActive map[AddrPair]*ActiveAccept
  150. acceptsByPidFd map[PidFd]*ActiveAccept
  151. l7Stats L7Stats
  152. dnsStats *L7Metrics
  153. oomKills int
  154. pythonThreadLockWaitTime time.Duration
  155. mounts map[string]proc.MountInfo
  156. logParsers map[string]*LogParser
  157. hostConntrack *Conntrack
  158. nsConntrack *Conntrack
  159. lbConntracks []*Conntrack
  160. registry *Registry
  161. lock sync.RWMutex
  162. done chan struct{}
  163. traceMap map[uint64]*tracing.Trace
  164. //instanceID utils.ID
  165. Symbols []debugelf.Symbol
  166. Uprobes []tracer.Uprobe
  167. UprobesMap map[string]tracer.Uprobe
  168. l7EventReady bool
  169. l7Attach bool
  170. // 白名单详情
  171. WhiteSettingInfo WhiteSettingInfo
  172. // 应用详情
  173. AppInfo AppInfo
  174. }
  175. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  176. netNs, err := proc.GetNetNs(pid)
  177. if err != nil {
  178. return nil, err
  179. }
  180. defer netNs.Close()
  181. c := &Container{
  182. id: id,
  183. cgroup: cg,
  184. metadata: md,
  185. processes: map[uint32]*Process{},
  186. delaysByPid: map[uint32]Delays{},
  187. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  188. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  189. connectsFailed: map[netaddr.IPPort]int64{},
  190. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  191. connectionsActive: map[AddrPair]*ActiveConnection{},
  192. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  193. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  194. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  195. acceptsActive: map[AddrPair]*ActiveAccept{},
  196. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  197. l7Stats: L7Stats{},
  198. dnsStats: &L7Metrics{},
  199. mounts: map[string]proc.MountInfo{},
  200. logParsers: map[string]*LogParser{},
  201. hostConntrack: hostConntrack,
  202. done: make(chan struct{}),
  203. traceMap: make(map[uint64]*tracing.Trace),
  204. registry: registry,
  205. }
  206. for _, n := range md.networks {
  207. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  208. if ct, err := NewConntrack(nsHandle); err != nil {
  209. klog.Warningln(err)
  210. } else {
  211. c.lbConntracks = append(c.lbConntracks, ct)
  212. }
  213. _ = nsHandle.Close()
  214. }
  215. }
  216. c.runLogParser("")
  217. // go func() {
  218. // ticker := time.NewTicker(gcInterval)
  219. // defer ticker.Stop()
  220. // for {
  221. // select {
  222. // case <-c.done:
  223. // return
  224. // case t := <-ticker.C:
  225. // c.gc(t)
  226. // }
  227. // }
  228. // }()
  229. return c, nil
  230. }
  231. func (c *Container) Close() {
  232. for _, p := range c.logParsers {
  233. p.Stop()
  234. }
  235. for _, ct := range c.lbConntracks {
  236. _ = ct.Close()
  237. }
  238. if c.nsConntrack != nil {
  239. _ = c.nsConntrack.Close()
  240. }
  241. close(c.done)
  242. }
  243. func (c *Container) Dead(now time.Time) bool {
  244. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  245. }
  246. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  247. // some fixed metric description is required here to register/unregister the collector correctly
  248. ch <- prometheus.NewDesc("container", "", nil, nil)
  249. }
  250. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  251. c.registry.updateTrafficStatsIfNecessary()
  252. c.lock.RLock()
  253. defer c.lock.RUnlock()
  254. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  255. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  256. }
  257. ch <- counter(metrics.Restarts, float64(c.restarts))
  258. if cpu, err := c.cgroup.CpuStat(); err == nil {
  259. if cpu.LimitCores > 0 {
  260. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  261. }
  262. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  263. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  264. }
  265. if taskstatsClient != nil {
  266. c.updateDelays()
  267. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  268. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  269. }
  270. if s, err := c.cgroup.MemoryStat(); err == nil {
  271. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  272. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  273. if s.Limit > 0 {
  274. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  275. }
  276. }
  277. if c.oomKills > 0 {
  278. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  279. }
  280. if disks, err := node.GetDisks(); err == nil {
  281. ioStat, _ := c.cgroup.IOStat()
  282. for majorMinor, mounts := range c.getMounts() {
  283. dev := disks.GetParentBlockDevice(majorMinor)
  284. if dev == nil {
  285. continue
  286. }
  287. for mountPoint, fsStat := range mounts {
  288. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  289. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  290. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  291. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  292. if io, ok := ioStat[majorMinor]; ok {
  293. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  294. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  295. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  296. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  297. }
  298. }
  299. }
  300. }
  301. for addr, open := range c.getListens() {
  302. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  303. }
  304. for proxy, addrs := range c.getProxiedListens() {
  305. for addr := range addrs {
  306. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  307. }
  308. }
  309. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  310. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  311. strAppName := c.AppInfo.AppName
  312. for d, stats := range c.connectsSuccessful {
  313. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  314. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  315. if stats.Retransmissions > 0 {
  316. ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  317. }
  318. ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  319. ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  320. ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  321. ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  322. ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  323. ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  324. ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  325. klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  326. stats.PerBytesReceived = 0
  327. stats.PerBytesSent = 0
  328. }
  329. // for d, stats := range c.acceptsSuccessful {
  330. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  331. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  332. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  333. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  334. // }
  335. for dst, count := range c.connectsFailed {
  336. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, strAppName, dst.String())
  337. }
  338. connections := map[AddrPair]int{}
  339. for addrPair, conn := range c.connectionsActive {
  340. if !conn.Closed.IsZero() {
  341. continue
  342. }
  343. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  344. }
  345. for d, count := range connections {
  346. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  347. }
  348. for source, p := range c.logParsers {
  349. for _, c := range p.parser.GetCounters() {
  350. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  351. }
  352. }
  353. appTypes := map[string]struct{}{}
  354. seenJvms := map[string]bool{}
  355. seenDotNetApps := map[string]bool{}
  356. pids := maps.Keys(c.processes)
  357. sort.Slice(pids, func(i, j int) bool {
  358. return pids[i] < pids[j]
  359. })
  360. for _, pid := range pids {
  361. process := c.processes[pid]
  362. cmdline := proc.GetCmdline(pid)
  363. if len(cmdline) == 0 {
  364. continue
  365. }
  366. appType := guessApplicationType(cmdline)
  367. if appType != "" {
  368. appTypes[appType] = struct{}{}
  369. }
  370. if process.isGolangApp {
  371. appTypes["golang"] = struct{}{}
  372. }
  373. switch {
  374. case isJvm(cmdline):
  375. jvm, jMetrics := jvmMetrics(pid)
  376. if len(jMetrics) > 0 && !seenJvms[jvm] {
  377. seenJvms[jvm] = true
  378. for _, m := range jMetrics {
  379. ch <- m
  380. }
  381. }
  382. case process.dotNetMonitor != nil:
  383. appTypes["dotnet"] = struct{}{}
  384. appName := process.dotNetMonitor.AppName()
  385. if !seenDotNetApps[appName] {
  386. seenDotNetApps[appName] = true
  387. process.dotNetMonitor.Collect(ch)
  388. }
  389. }
  390. }
  391. for appType := range appTypes {
  392. ch <- gauge(metrics.ApplicationType, 1, appType)
  393. }
  394. if c.pythonThreadLockWaitTime > 0 {
  395. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  396. }
  397. if c.dnsStats.Requests != nil {
  398. c.dnsStats.Requests.Collect(ch)
  399. }
  400. if c.dnsStats.Latency != nil {
  401. c.dnsStats.Latency.Collect(ch)
  402. }
  403. c.l7Stats.collect(ch)
  404. if !*flags.DisablePinger {
  405. for ip, rtt := range c.ping() {
  406. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  407. }
  408. }
  409. c.gc(time.Now())
  410. }
  411. func (c *Container) onProcessStart(pid uint32) *Process {
  412. c.lock.Lock()
  413. defer c.lock.Unlock()
  414. stats, err := TaskstatsPID(pid)
  415. if err != nil {
  416. klog.WithError(err).Errorf("Failed onProcessStart [%d]", pid)
  417. return nil
  418. }
  419. c.zombieAt = time.Time{}
  420. p := NewProcess(pid, stats, c.registry.tracer)
  421. if p == nil {
  422. return nil
  423. }
  424. c.processes[pid] = p
  425. if c.startedAt.IsZero() {
  426. c.startedAt = stats.BeginTime
  427. } else {
  428. min := stats.BeginTime
  429. for _, p := range c.processes {
  430. if p.StartedAt.Before(min) {
  431. min = p.StartedAt
  432. }
  433. }
  434. if min.After(c.startedAt) {
  435. c.restarts++
  436. c.startedAt = min
  437. }
  438. }
  439. return p
  440. }
  441. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  442. c.lock.Lock()
  443. defer c.lock.Unlock()
  444. if p := c.processes[pid]; p != nil {
  445. p.Close()
  446. }
  447. delete(c.processes, pid)
  448. if len(c.processes) == 0 {
  449. c.zombieAt = time.Now()
  450. }
  451. delete(c.delaysByPid, pid)
  452. if oomKill {
  453. c.oomKills++
  454. }
  455. }
  456. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  457. mntId, logPath := resolveFd(pid, fd)
  458. func() {
  459. if mntId == "" {
  460. return
  461. }
  462. c.lock.Lock()
  463. _, ok := c.mounts[mntId]
  464. c.lock.Unlock()
  465. if ok {
  466. return
  467. }
  468. byMountId := proc.GetMountInfo(pid)
  469. if byMountId == nil {
  470. return
  471. }
  472. if mi, ok := byMountId[mntId]; ok {
  473. c.lock.Lock()
  474. c.mounts[mntId] = mi
  475. c.lock.Unlock()
  476. }
  477. }()
  478. if logPath != "" {
  479. c.lock.Lock()
  480. c.runLogParser(logPath)
  481. c.lock.Unlock()
  482. }
  483. }
  484. // set
  485. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  486. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  487. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  488. return
  489. }
  490. if !safe {
  491. c.lock.Lock()
  492. defer c.lock.Unlock()
  493. }
  494. if _, ok := c.listens[addr]; !ok {
  495. c.listens[addr] = map[uint32]*ListenDetails{}
  496. }
  497. details := &ListenDetails{}
  498. c.listens[addr][pid] = details
  499. if addr.IP().IsUnspecified() {
  500. ns, err := proc.GetNetNs(pid)
  501. if err != nil {
  502. if !common.IsNotExist(err) {
  503. klog.Warningln(err)
  504. }
  505. return
  506. }
  507. defer ns.Close()
  508. ips, err := proc.GetNsIps(ns)
  509. if err != nil {
  510. klog.Warningln(err)
  511. return
  512. }
  513. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  514. details.NsIPs = ips
  515. }
  516. }
  517. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  518. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  519. c.lock.Lock()
  520. defer c.lock.Unlock()
  521. if _, byAddr := c.listens[addr]; byAddr {
  522. if _, byPid := c.listens[addr][pid]; byPid {
  523. if details := c.listens[addr][pid]; details != nil {
  524. details.ClosedAt = time.Now()
  525. }
  526. }
  527. }
  528. }
  529. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  530. klog.Infof("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  531. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  532. // return
  533. // }
  534. p := c.processes[pid]
  535. if p == nil {
  536. return
  537. }
  538. if dst.IP().IsLoopback() && !p.isHostNs() {
  539. return
  540. }
  541. // actualDst, err := c.getActualDestination(p, src, dst)
  542. // if err != nil {
  543. // if !common.IsNotExist(err) {
  544. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  545. // }
  546. // return
  547. // }
  548. // switch {
  549. // case actualDst == nil:
  550. // actualDst = &dst
  551. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  552. // return
  553. // }
  554. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  555. // return
  556. // }
  557. c.lock.Lock()
  558. defer c.lock.Unlock()
  559. if !failed {
  560. key := AddrPair{src: dst, dst: src}
  561. stats := c.acceptsSuccessful[key]
  562. if stats == nil {
  563. stats = &AcceptStats{}
  564. c.acceptsSuccessful[key] = stats
  565. }
  566. acceptCon := &ActiveAccept{
  567. Dest: src,
  568. Src: dst,
  569. Pid: pid,
  570. Fd: fd,
  571. Timestamp: timestamp,
  572. }
  573. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  574. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  575. }
  576. c.acceptLastAttempt[dst] = time.Now()
  577. }
  578. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  579. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  580. return
  581. }
  582. p := c.processes[pid]
  583. if p == nil {
  584. return
  585. }
  586. if dst.IP().IsLoopback() && !p.isHostNs() {
  587. return
  588. }
  589. actualDst, err := c.getActualDestination(p, src, dst)
  590. if err != nil {
  591. if !common.IsNotExist(err) {
  592. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  593. }
  594. return
  595. }
  596. switch {
  597. case actualDst == nil:
  598. actualDst = &dst
  599. case actualDst.IP().IsLoopback() && !p.isHostNs():
  600. return
  601. }
  602. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  603. return
  604. }
  605. c.lock.Lock()
  606. defer c.lock.Unlock()
  607. if failed {
  608. c.connectsFailed[dst]++
  609. } else {
  610. key := AddrPair{src: dst, dst: *actualDst}
  611. stats := c.connectsSuccessful[key]
  612. if stats == nil {
  613. stats = &ConnectionStats{}
  614. c.connectsSuccessful[key] = stats
  615. }
  616. stats.Count++
  617. stats.TotalTime += duration
  618. stats.Src = src
  619. stats.ConEstTime = duration
  620. connection := &ActiveConnection{
  621. Dest: dst,
  622. Src: src,
  623. ActualDest: *actualDst,
  624. Pid: pid,
  625. Fd: fd,
  626. Timestamp: timestamp,
  627. }
  628. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  629. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  630. }
  631. c.connectLastAttempt[dst] = time.Now()
  632. }
  633. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  634. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  635. return actualDst, nil
  636. }
  637. for _, lb := range c.lbConntracks {
  638. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  639. return actualDst, nil
  640. }
  641. }
  642. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  643. if actualDst != nil {
  644. return actualDst, nil
  645. }
  646. if !p.isHostNs() {
  647. if c.nsConntrack == nil {
  648. netNs, err := proc.GetNetNs(p.Pid)
  649. if err != nil {
  650. return nil, err
  651. }
  652. defer netNs.Close()
  653. c.nsConntrack, err = NewConntrack(netNs)
  654. if err != nil {
  655. return nil, err
  656. }
  657. }
  658. return c.nsConntrack.GetActualDestination(src, dst), nil
  659. }
  660. return nil, nil
  661. }
  662. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  663. c.lock.Lock()
  664. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  665. c.lock.Unlock()
  666. if conn != nil {
  667. if conn.Closed.IsZero() {
  668. if e.TrafficStats != nil {
  669. c.lock.Lock()
  670. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  671. c.lock.Unlock()
  672. }
  673. conn.Closed = time.Now()
  674. }
  675. }
  676. }
  677. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  678. c.lock.Lock()
  679. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  680. c.lock.Unlock()
  681. if conn != nil {
  682. if conn.Closed.IsZero() {
  683. if e.TrafficStats != nil {
  684. c.lock.Lock()
  685. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  686. c.lock.Unlock()
  687. }
  688. conn.Closed = time.Now()
  689. }
  690. }
  691. }
  692. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  693. if u == nil {
  694. return
  695. }
  696. c.lock.Lock()
  697. defer c.lock.Unlock()
  698. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  699. }
  700. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  701. if ac == nil {
  702. return
  703. }
  704. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  705. stats := c.connectsSuccessful[key]
  706. if stats == nil {
  707. stats = &ConnectionStats{}
  708. c.connectsSuccessful[key] = stats
  709. }
  710. if sent > ac.BytesSent {
  711. stats.BytesSent += sent - ac.BytesSent
  712. stats.PerBytesSent = sent - ac.BytesSent
  713. }
  714. if received > ac.BytesReceived {
  715. stats.BytesReceived += received - ac.BytesReceived
  716. stats.PerBytesReceived = received - ac.BytesReceived
  717. }
  718. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  719. stats.FirstReadTime = firstreadtime
  720. stats.FirstWriteTime = firstwritetime
  721. stats.NewReadTime = newreadtime
  722. }
  723. ac.BytesSent = sent
  724. ac.BytesReceived = received
  725. }
  726. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  727. if ac == nil {
  728. return
  729. }
  730. klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  731. key := AddrPair{src: ac.Src, dst: ac.Dest}
  732. stats := c.acceptsSuccessful[key]
  733. if stats == nil {
  734. stats = &AcceptStats{}
  735. c.acceptsSuccessful[key] = stats
  736. }
  737. if sent > ac.BytesSent {
  738. stats.BytesSent += sent - ac.BytesSent
  739. }
  740. if received > ac.BytesReceived {
  741. stats.BytesReceived += received - ac.BytesReceived
  742. }
  743. ac.BytesSent = sent
  744. ac.BytesReceived = received
  745. }
  746. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string) {
  747. status := r.Status.DNS()
  748. if status == "" {
  749. return nil, "", ""
  750. }
  751. t, fqdn, ips := l7.ParseDns(r.Payload)
  752. if t == "" {
  753. return nil, "", ""
  754. }
  755. if c.dnsStats.Requests == nil {
  756. dnsReq := L7Requests[l7.ProtocolDNS]
  757. c.dnsStats.Requests = prometheus.NewCounterVec(
  758. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  759. []string{"request_type", "domain", "status"},
  760. )
  761. }
  762. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  763. m.Inc()
  764. }
  765. if r.Duration != 0 {
  766. if c.dnsStats.Latency == nil {
  767. dnsLatency := L7Latency[l7.ProtocolDNS]
  768. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  769. }
  770. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  771. }
  772. ip2fqdn := map[netaddr.IP]string{}
  773. if fqdn != "" {
  774. for _, ip := range ips {
  775. ip2fqdn[ip] = fqdn
  776. }
  777. }
  778. return ip2fqdn, t, fqdn
  779. }
  780. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  781. c.lock.Lock()
  782. defer c.lock.Unlock()
  783. if r.Protocol == l7.ProtocolDNS {
  784. //return c.onDNSRequest(r)
  785. }
  786. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  787. if conn == nil {
  788. return nil
  789. }
  790. if timestamp != 0 && conn.Timestamp != timestamp {
  791. return nil
  792. }
  793. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  794. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  795. switch r.Protocol {
  796. case l7.ProtocolHTTP:
  797. stats.observe(r.Status.Http(), "", r.Duration)
  798. method, path := l7.ParseHttp(r.Payload)
  799. trace.HttpRequest(method, path, r.Status, r.Duration)
  800. case l7.ProtocolHTTP2:
  801. if conn.http2Parser == nil {
  802. conn.http2Parser = l7.NewHttp2Parser()
  803. }
  804. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  805. for _, req := range requests {
  806. stats.observe(req.Status.Http(), "", req.Duration)
  807. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  808. }
  809. case l7.ProtocolPostgres:
  810. if r.Method != l7.MethodStatementClose {
  811. stats.observe(r.Status.String(), "", r.Duration)
  812. }
  813. if conn.postgresParser == nil {
  814. conn.postgresParser = l7.NewPostgresParser()
  815. }
  816. query := conn.postgresParser.Parse(r.Payload)
  817. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  818. case l7.ProtocolMysql:
  819. if r.Method != l7.MethodStatementClose {
  820. stats.observe(r.Status.String(), "", r.Duration)
  821. }
  822. if conn.mysqlParser == nil {
  823. conn.mysqlParser = l7.NewMysqlParser()
  824. }
  825. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  826. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  827. case l7.ProtocolMemcached:
  828. stats.observe(r.Status.String(), "", r.Duration)
  829. cmd, items := l7.ParseMemcached(r.Payload)
  830. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  831. case l7.ProtocolRedis:
  832. stats.observe(r.Status.String(), "", r.Duration)
  833. cmd, args := l7.ParseRedis(r.Payload)
  834. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  835. case l7.ProtocolMongo:
  836. stats.observe(r.Status.String(), "", r.Duration)
  837. query := l7.ParseMongo(r.Payload)
  838. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  839. case l7.ProtocolKafka, l7.ProtocolCassandra:
  840. stats.observe(r.Status.String(), "", r.Duration)
  841. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  842. stats.observe(r.Status.String(), r.Method.String(), 0)
  843. case l7.ProtocolDubbo2:
  844. stats.observe(r.Status.String(), "", r.Duration)
  845. }
  846. return nil
  847. }
  848. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  849. c.lock.Lock()
  850. defer c.lock.Unlock()
  851. conn, ok := c.connectionsActive[srcDst]
  852. if !ok {
  853. return false
  854. }
  855. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  856. stats := c.connectsSuccessful[key]
  857. if stats == nil {
  858. stats = &ConnectionStats{}
  859. c.connectsSuccessful[key] = stats
  860. }
  861. stats.Retransmissions++
  862. return true
  863. }
  864. func (c *Container) updateDelays() {
  865. c.delaysLock.Lock()
  866. defer c.delaysLock.Unlock()
  867. for pid := range c.processes {
  868. stats, err := TaskstatsTGID(pid)
  869. if err != nil {
  870. continue
  871. }
  872. d := c.delaysByPid[pid]
  873. c.delays.cpu += stats.CPUDelay - d.cpu
  874. c.delays.disk += stats.BlockIODelay - d.disk
  875. d.cpu = stats.CPUDelay
  876. d.disk = stats.BlockIODelay
  877. c.delaysByPid[pid] = d
  878. }
  879. }
  880. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  881. if len(c.mounts) == 0 {
  882. return nil
  883. }
  884. res := map[string]map[string]*proc.FSStat{}
  885. for _, mi := range c.mounts {
  886. var stat *proc.FSStat
  887. for pid := range c.processes {
  888. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  889. if err == nil {
  890. stat = &s
  891. break
  892. }
  893. }
  894. if stat == nil {
  895. continue
  896. }
  897. if _, ok := res[mi.MajorMinor]; !ok {
  898. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  899. }
  900. res[mi.MajorMinor][mi.MountPoint] = stat
  901. }
  902. return res
  903. }
  904. func (c *Container) getListens() map[netaddr.IPPort]int {
  905. res := map[netaddr.IPPort]int{}
  906. for addr, byPid := range c.listens {
  907. open := 0
  908. isHostNs := false
  909. ips := map[netaddr.IP]bool{}
  910. for pid, details := range byPid {
  911. p := c.processes[pid]
  912. if p == nil {
  913. continue
  914. }
  915. if p.isHostNs() {
  916. isHostNs = true
  917. }
  918. if details.ClosedAt.IsZero() {
  919. open = 1
  920. }
  921. for _, ip := range details.NsIPs {
  922. ips[ip] = true
  923. }
  924. }
  925. if !addr.IP().IsUnspecified() {
  926. ips = map[netaddr.IP]bool{addr.IP(): true}
  927. }
  928. for ip := range ips {
  929. if ip.IsLoopback() && !isHostNs {
  930. continue
  931. }
  932. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  933. }
  934. }
  935. return res
  936. }
  937. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  938. if len(c.metadata.hostListens) == 0 {
  939. return nil
  940. }
  941. hasUnspecified := false
  942. for _, addrs := range c.metadata.hostListens {
  943. for _, addr := range addrs {
  944. if addr.IP().IsUnspecified() {
  945. hasUnspecified = true
  946. break
  947. }
  948. }
  949. }
  950. var hostIps []netaddr.IP
  951. if hasUnspecified {
  952. if ns, err := proc.GetHostNetNs(); err != nil {
  953. klog.Warningln(err)
  954. } else {
  955. ips, err := proc.GetNsIps(ns)
  956. _ = ns.Close()
  957. if err != nil {
  958. klog.Warningln(err)
  959. } else {
  960. hostIps = ips
  961. }
  962. }
  963. }
  964. res := map[string]map[netaddr.IPPort]struct{}{}
  965. for proxy, addrs := range c.metadata.hostListens {
  966. res[proxy] = map[netaddr.IPPort]struct{}{}
  967. for _, addr := range addrs {
  968. if addr.IP().IsUnspecified() {
  969. for _, ip := range hostIps {
  970. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  971. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  972. }
  973. }
  974. } else {
  975. res[proxy][addr] = struct{}{}
  976. }
  977. }
  978. }
  979. return res
  980. }
  981. func (c *Container) ping() map[netaddr.IP]float64 {
  982. netNs := netns.None()
  983. for pid := range c.processes {
  984. if pid == agentPid {
  985. netNs = selfNetNs
  986. break
  987. }
  988. ns, err := proc.GetNetNs(pid)
  989. if err != nil {
  990. if !common.IsNotExist(err) {
  991. klog.Warningln(err)
  992. }
  993. continue
  994. }
  995. netNs = ns
  996. defer netNs.Close()
  997. break
  998. }
  999. if !netNs.IsOpen() {
  1000. return nil
  1001. }
  1002. ips := map[netaddr.IP]struct{}{}
  1003. for d := range c.connectsSuccessful {
  1004. ips[d.dst.IP()] = struct{}{}
  1005. }
  1006. for dst := range c.connectsFailed {
  1007. ips[dst.IP()] = struct{}{}
  1008. }
  1009. if len(ips) == 0 {
  1010. return nil
  1011. }
  1012. targets := make([]netaddr.IP, 0, len(ips))
  1013. for ip := range ips {
  1014. if ip.IsLoopback() {
  1015. continue
  1016. }
  1017. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1018. continue
  1019. }
  1020. targets = append(targets, ip)
  1021. }
  1022. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1023. if err != nil {
  1024. klog.Warningln(err)
  1025. return nil
  1026. }
  1027. return rtt
  1028. }
  1029. func (c *Container) runLogParser(logPath string) {
  1030. if *flags.DisableLogParsing {
  1031. return
  1032. }
  1033. containerId := string(c.id)
  1034. if logPath != "" {
  1035. if c.logParsers[logPath] != nil {
  1036. return
  1037. }
  1038. ch := make(chan logparser.LogEntry)
  1039. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1040. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1041. if err != nil {
  1042. klog.Warningln(err)
  1043. parser.Stop()
  1044. return
  1045. }
  1046. klog.Infoln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1047. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1048. return
  1049. }
  1050. switch c.cgroup.ContainerType {
  1051. case cgroup.ContainerTypeSystemdService:
  1052. ch := make(chan logparser.LogEntry)
  1053. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1054. klog.Warningln(err)
  1055. return
  1056. }
  1057. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1058. stop := func() {
  1059. JournaldUnsubscribe(c.cgroup)
  1060. }
  1061. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1062. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1063. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1064. if c.metadata.logPath == "" {
  1065. return
  1066. }
  1067. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1068. parser.Stop()
  1069. delete(c.logParsers, "stdout/stderr")
  1070. }
  1071. ch := make(chan logparser.LogEntry)
  1072. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1073. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1074. if err != nil {
  1075. klog.Warningln(err)
  1076. parser.Stop()
  1077. return
  1078. }
  1079. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1080. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1081. }
  1082. }
  1083. func (c *Container) gc(now time.Time) {
  1084. // c.lock.Lock()
  1085. // defer c.lock.Unlock()
  1086. established := map[AddrPair]struct{}{}
  1087. establishedDst := map[netaddr.IPPort]struct{}{}
  1088. listens := map[netaddr.IPPort]string{}
  1089. seenNamespaces := map[string]bool{}
  1090. fdMap := map[uint64]struct{}{}
  1091. for _, p := range c.processes {
  1092. if seenNamespaces[p.NetNsId()] {
  1093. continue
  1094. }
  1095. sockets, err := proc.GetSockets(p.Pid)
  1096. if err != nil {
  1097. continue
  1098. }
  1099. fds, err := proc.ReadFds(p.Pid)
  1100. if err == nil {
  1101. for _, fd := range fds {
  1102. fdMap[fd.Fd] = struct{}{}
  1103. }
  1104. }
  1105. for _, s := range sockets {
  1106. if s.Listen {
  1107. listens[s.SAddr] = s.Inode
  1108. } else {
  1109. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1110. establishedDst[s.DAddr] = struct{}{}
  1111. }
  1112. }
  1113. seenNamespaces[p.NetNsId()] = true
  1114. }
  1115. c.revalidateListens(now, listens)
  1116. for srcDst, conn := range c.connectionsActive {
  1117. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1118. if _, ok := established[srcDst]; !ok {
  1119. delete(c.connectionsActive, srcDst)
  1120. if conn == c.connectionsByPidFd[pidFd] {
  1121. delete(c.connectionsByPidFd, pidFd)
  1122. }
  1123. continue
  1124. }
  1125. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1126. delete(c.connectionsActive, srcDst)
  1127. if conn == c.connectionsByPidFd[pidFd] {
  1128. delete(c.connectionsByPidFd, pidFd)
  1129. }
  1130. }
  1131. }
  1132. for srcDst, conn := range c.acceptsActive {
  1133. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1134. if _, ok := established[srcDst]; !ok {
  1135. delete(c.acceptsActive, srcDst)
  1136. if conn == c.acceptsByPidFd[pidFd] {
  1137. delete(c.acceptsByPidFd, pidFd)
  1138. }
  1139. continue
  1140. }
  1141. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1142. delete(c.acceptsActive, srcDst)
  1143. if conn == c.acceptsByPidFd[pidFd] {
  1144. delete(c.acceptsByPidFd, pidFd)
  1145. }
  1146. }
  1147. }
  1148. for _, conn := range c.connectionsByPidFd {
  1149. if _, ok := fdMap[conn.Fd]; !ok {
  1150. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1151. }
  1152. }
  1153. for _, conn := range c.acceptsByPidFd {
  1154. if _, ok := fdMap[conn.Fd]; !ok {
  1155. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1156. }
  1157. }
  1158. for dst, at := range c.connectLastAttempt {
  1159. _, active := establishedDst[dst]
  1160. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1161. delete(c.connectLastAttempt, dst)
  1162. delete(c.connectsFailed, dst)
  1163. for d := range c.connectsSuccessful {
  1164. if d.src == dst {
  1165. delete(c.connectsSuccessful, d)
  1166. }
  1167. }
  1168. c.l7Stats.delete(dst)
  1169. }
  1170. }
  1171. for dst, at := range c.acceptLastAttempt {
  1172. _, active := establishedDst[dst]
  1173. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1174. delete(c.acceptLastAttempt, dst)
  1175. for d := range c.acceptsSuccessful {
  1176. if d.src == dst {
  1177. delete(c.acceptsSuccessful, d)
  1178. }
  1179. }
  1180. c.l7Stats.delete(dst)
  1181. }
  1182. }
  1183. }
  1184. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1185. for addr, byPid := range c.listens {
  1186. if _, open := actualListens[addr]; open {
  1187. continue
  1188. }
  1189. klog.Warningln("deleting the outdated listen:", addr)
  1190. for _, details := range byPid {
  1191. if details.ClosedAt.IsZero() {
  1192. details.ClosedAt = now
  1193. }
  1194. }
  1195. }
  1196. missingListens := map[netaddr.IPPort]string{}
  1197. for addr, inode := range actualListens {
  1198. byPids, found := c.listens[addr]
  1199. if !found {
  1200. missingListens[addr] = inode
  1201. continue
  1202. }
  1203. open := false
  1204. for _, details := range byPids {
  1205. if details.ClosedAt.IsZero() {
  1206. open = true
  1207. break
  1208. }
  1209. }
  1210. if !open {
  1211. missingListens[addr] = inode
  1212. }
  1213. }
  1214. if len(missingListens) > 0 {
  1215. inodeToPid := map[string]uint32{}
  1216. for pid := range c.processes {
  1217. fds, err := proc.ReadFds(pid)
  1218. if err != nil {
  1219. klog.Warningln(err)
  1220. continue
  1221. }
  1222. for _, fd := range fds {
  1223. if fd.SocketInode != "" {
  1224. inodeToPid[fd.SocketInode] = pid
  1225. }
  1226. }
  1227. }
  1228. for addr, inode := range missingListens {
  1229. pid, found := inodeToPid[inode]
  1230. if !found {
  1231. continue
  1232. }
  1233. //klog.Warningln("missing listen found:", addr, pid)
  1234. c.onListenOpen(pid, addr, true)
  1235. }
  1236. }
  1237. for addr, pids := range c.listens {
  1238. for pid, details := range pids {
  1239. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1240. delete(c.listens[addr], pid)
  1241. }
  1242. }
  1243. if len(c.listens[addr]) == 0 {
  1244. delete(c.listens, addr)
  1245. }
  1246. }
  1247. }
  1248. func (c *Container) attachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1249. klog.Infoln("[attach] attachUprobes start")
  1250. if tracer.DisableL7Tracing() {
  1251. return nil
  1252. }
  1253. codeType := c.GetCodeTypeFromCache(pid)
  1254. if codeType.IsUnknownCode() {
  1255. return nil
  1256. }
  1257. var err error
  1258. switch codeType {
  1259. case CodeTypeJava:
  1260. err = c.attachJVMUprobes(tracer, pid)
  1261. case CodeTypeJavaAot:
  1262. err = c.attachJavaAotUprobes(tracer, pid)
  1263. case CodeTypeGo:
  1264. err = c.attachTlsUprobes(tracer, pid)
  1265. case CodeTypeNetCoreAot:
  1266. err = c.attachNetCoreUprobes(tracer, pid)
  1267. }
  1268. if err != nil {
  1269. c.DetachUprobes(pid, ERROR_DETACH)
  1270. return err
  1271. }
  1272. c.l7AttachSuccess()
  1273. return nil
  1274. }
  1275. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1276. p := c.processes[pid]
  1277. if p == nil {
  1278. return CodeTypeUnknown
  1279. }
  1280. if p.codeType.IsWaitCheck() {
  1281. p.codeType = GetExeType(pid, c.getRootfs())
  1282. }
  1283. return p.codeType
  1284. }
  1285. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1286. p := c.processes[pid]
  1287. if p == nil {
  1288. return nil
  1289. }
  1290. if !p.openSslUprobesChecked {
  1291. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1292. if err != nil {
  1293. return err
  1294. }
  1295. p.uprobes = append(p.uprobes, sslProbes...)
  1296. p.openSslUprobesChecked = true
  1297. }
  1298. if !p.goTlsUprobesChecked {
  1299. codeType := c.GetCodeTypeFromCache(pid)
  1300. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1301. if err != nil {
  1302. return err
  1303. }
  1304. p.uprobes = append(p.uprobes, goProbes...)
  1305. p.goTlsUprobesChecked = true
  1306. }
  1307. return nil
  1308. }
  1309. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1310. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1311. return nil
  1312. }
  1313. p := c.processes[pid]
  1314. if p == nil {
  1315. return nil
  1316. }
  1317. codeType := c.GetCodeTypeFromCache(pid)
  1318. if !p.jvmAttachOnce {
  1319. p.jvmAttachOnce = true
  1320. rootfs := c.getRootfs()
  1321. tracer.InitKProcInfo(pid, &c.AppInfo)
  1322. // TODO java Aot
  1323. if codeType.IsJvmCode() {
  1324. // check version
  1325. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1326. if err != nil {
  1327. klog.WithError(err).Errorf("[attach] Failed get so path")
  1328. return err
  1329. }
  1330. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1331. if err != nil {
  1332. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1333. return err
  1334. }
  1335. c.AppInfo.Version = v
  1336. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1337. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1338. if major != 1 || minor != 8 {
  1339. return fmt.Errorf("[attach] Unsupported Java version")
  1340. }
  1341. }
  1342. libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1343. if err != nil {
  1344. klog.Error(err)
  1345. return err
  1346. }
  1347. p.uprobes = append(p.uprobes, libNioProbes...)
  1348. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1349. if err != nil {
  1350. klog.Error(err)
  1351. return err
  1352. }
  1353. p.uprobes = append(p.uprobes, libNetProbes...)
  1354. //p.jvmUprobesChecked = true
  1355. } else {
  1356. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1357. }
  1358. return nil
  1359. }
  1360. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1361. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1362. return nil
  1363. }
  1364. p := c.processes[pid]
  1365. if p == nil {
  1366. return nil
  1367. }
  1368. codeType := c.GetCodeTypeFromCache(pid)
  1369. if !p.jvmUprobesChecked {
  1370. p.jvmUprobesChecked = true
  1371. rootfs := c.getRootfs()
  1372. tracer.InitKProcInfo(pid, &c.AppInfo)
  1373. // // TODO java Aot
  1374. // if codeType.IsJavaAotCode() {
  1375. // // check version
  1376. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1377. // if err != nil {
  1378. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1379. // return err
  1380. // }
  1381. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1382. // if err != nil {
  1383. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1384. // return err
  1385. // }
  1386. // c.AppInfo.Version = v
  1387. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1388. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1389. // if major != 1 || minor != 8 {
  1390. // return fmt.Errorf("[attach] Unsupported Java version")
  1391. // }
  1392. // }
  1393. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1394. if err != nil {
  1395. klog.Error(err)
  1396. return err
  1397. }
  1398. p.uprobes = append(p.uprobes, libNioProbes...)
  1399. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1400. if err != nil {
  1401. klog.Error(err)
  1402. return err
  1403. }
  1404. p.uprobes = append(p.uprobes, libNetProbes...)
  1405. //p.jvmUprobesChecked = true
  1406. } else {
  1407. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1408. }
  1409. return nil
  1410. }
  1411. func (c *Container) errorClose(pid uint32, closeType int) {
  1412. p := c.processes[pid]
  1413. if p != nil {
  1414. p.DynamicClose(closeType)
  1415. }
  1416. }
  1417. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1418. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1419. return nil
  1420. }
  1421. p := c.processes[pid]
  1422. if p == nil {
  1423. return nil
  1424. }
  1425. if !p.jvmAttachOnce {
  1426. p.jvmAttachOnce = true
  1427. //codeType := c.GetCodeTypeFromCache(pid)
  1428. tracer.InitKProcInfo(pid, &c.AppInfo)
  1429. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1430. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1431. if err != nil {
  1432. klog.Error(err)
  1433. return err
  1434. }
  1435. p.uprobes = append(p.uprobes, readProbes...)
  1436. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1437. if err != nil {
  1438. klog.Error(err)
  1439. return err
  1440. }
  1441. p.uprobes = append(p.uprobes, WriteProbes...)
  1442. }
  1443. return nil
  1444. }
  1445. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1446. info := proc.GetFdInfo(pid, fd)
  1447. if info == nil {
  1448. return
  1449. }
  1450. switch {
  1451. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1452. !strings.HasPrefix(info.Dest, "/"),
  1453. strings.HasPrefix(info.Dest, "/proc/"),
  1454. strings.HasPrefix(info.Dest, "/dev/"),
  1455. strings.HasPrefix(info.Dest, "/sys/"),
  1456. strings.HasSuffix(info.Dest, "(deleted)"):
  1457. return
  1458. }
  1459. mntId = info.MntId
  1460. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1461. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1462. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1463. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1464. logPath = info.Dest
  1465. }
  1466. return
  1467. }
  1468. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1469. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1470. }
  1471. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1472. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1473. }