container.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  // Timing parameters for the containers package.
  var (
  	// gcInterval is the grace period Dead applies after a container's last
  	// process exited before the container is reported as dead.
  	gcInterval = time.Minute

  	// The three intervals below are not referenced in the visible part of
  	// this file; presumably they pace app-info refresh and app/host
  	// registration -- TODO confirm against their users.
  	AllAppInfoInterval   = time.Minute
  	RegisterAppInterval  = time.Minute
  	RegisterHostInterval = 10 * time.Minute

  	// pingTimeout is presumably the per-target deadline used by the pinger
  	// -- TODO confirm; it is not used in the visible part of this file.
  	pingTimeout = 300 * time.Millisecond
  )
  type (
  	// ContainerID is the string identifier of a tracked container.
  	ContainerID string

  	// ContainerNetwork describes one network a container is attached to.
  	// NewContainer passes NetworkID to FindNetworkLoadBalancerNs.
  	ContainerNetwork struct {
  		NetworkID string
  	}
  )
  42. type ContainerMetadata struct {
  43. name string
  44. labels map[string]string
  45. volumes map[string]string
  46. logPath string
  47. image string
  48. logDecoder logparser.Decoder
  49. hostListens map[string][]netaddr.IPPort
  50. networks map[string]ContainerNetwork
  51. env map[string]string
  52. systemdTriggeredBy string
  53. rootfs string
  54. }
  // Delays accumulates the time spent waiting for CPU and for disk.
  // Collect exports both values as counters expressed in seconds.
  type Delays struct {
  	cpu, disk time.Duration
  }
  59. type LogParser struct {
  60. parser *logparser.Parser
  61. stop func()
  62. }
  63. func (p *LogParser) Stop() {
  64. if p.stop != nil {
  65. p.stop()
  66. }
  67. p.parser.Stop()
  68. }
  69. type AddrPair struct {
  70. src netaddr.IPPort
  71. dst netaddr.IPPort
  72. }
  73. type ActiveConnection struct {
  74. Dest netaddr.IPPort
  75. Src netaddr.IPPort
  76. ActualDest netaddr.IPPort
  77. Pid uint32
  78. Fd uint64
  79. Timestamp uint64
  80. Closed time.Time
  81. Retransmissions uint64
  82. BytesSent uint64
  83. PerBytesSent uint64
  84. BytesReceived uint64
  85. PerBytesReceived uint64
  86. ConEstTime time.Duration
  87. FirstReadTime uint64
  88. FirstWriteTime uint64
  89. NewReadTime uint64
  90. http2Parser *l7.Http2Parser
  91. postgresParser *l7.PostgresParser
  92. mysqlParser *l7.MysqlParser
  93. dmParser *l7.DmParser
  94. }
  95. type ActiveAccept struct {
  96. Dest netaddr.IPPort
  97. Src netaddr.IPPort
  98. Pid uint32
  99. Fd uint64
  100. Timestamp uint64
  101. Closed time.Time
  102. BytesSent uint64
  103. BytesReceived uint64
  104. }
  105. type ListenDetails struct {
  106. ClosedAt time.Time
  107. NsIPs []netaddr.IP
  108. }
  // PidFd identifies a file descriptor within a process. It is used as the
  // key of connectionsByPidFd and acceptsByPidFd.
  type PidFd struct {
  	Pid uint32 // process id
  	Fd  uint64 // file descriptor number
  }
  // K8sContainer carries the Kubernetes identity of a container. It is
  // embedded in Container.
  type K8sContainer struct {
  	ns, podName, podId, workload, containerName string
  	// pid is kept as a string here, unlike the uint32 pids used elsewhere.
  	pid string
  }
  121. type ConnectionStats struct {
  122. Count uint64
  123. TotalTime time.Duration
  124. Retransmissions uint64
  125. BytesSent uint64
  126. PerBytesSent uint64
  127. BytesReceived uint64
  128. PerBytesReceived uint64
  129. Src netaddr.IPPort
  130. ConEstTime time.Duration
  131. FirstReadTime uint64
  132. FirstWriteTime uint64
  133. NewReadTime uint64
  134. }
  // AcceptStats holds byte counters for accepted connections of one
  // address pair.
  type AcceptStats struct {
  	BytesSent, BytesReceived uint64
  }
  139. type Container struct {
  140. id ContainerID
  141. cgroup *cgroup.Cgroup
  142. metadata *ContainerMetadata
  143. K8sContainer
  144. processes map[uint32]*Process
  145. startedAt time.Time
  146. zombieAt time.Time
  147. restarts int
  148. delays Delays
  149. delaysByPid map[uint32]Delays
  150. delaysLock sync.Mutex
  151. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  152. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  153. // connectsFailed map[netaddr.IPPort]int64 // dst -> count
  154. connectsFailed map[AddrPair]int64
  155. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  156. connectionsActive map[AddrPair]*ActiveConnection
  157. connectionsByPidFd map[PidFd]*ActiveConnection
  158. acceptsSuccessful map[AddrPair]*AcceptStats
  159. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  160. acceptsActive map[AddrPair]*ActiveAccept
  161. acceptsByPidFd map[PidFd]*ActiveAccept
  162. l7Stats L7Stats
  163. dnsStats *L7Metrics
  164. oomKills int
  165. pythonThreadLockWaitTime time.Duration
  166. mounts map[string]proc.MountInfo
  167. logParsers map[string]*LogParser
  168. hostConntrack *Conntrack
  169. nsConntrack *Conntrack
  170. lbConntracks []*Conntrack
  171. registry *Registry
  172. lock sync.RWMutex
  173. done chan struct{}
  174. traceMap map[uint64]*tracing.Trace
  175. //instanceID utils.ID
  176. Symbols []debugelf.Symbol
  177. Uprobes []tracer.Uprobe
  178. UprobesMap map[string]tracer.Uprobe
  179. l7EventReady bool
  180. l7Attach bool
  181. // 白名单详情
  182. WhiteSettingInfo WhiteSettingInfo
  183. // 应用详情
  184. AppInfo AppInfo
  185. RegTime int
  186. }
  187. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  188. netNs, err := proc.GetNetNs(pid)
  189. if err != nil {
  190. return nil, err
  191. }
  192. defer netNs.Close()
  193. c := &Container{
  194. id: id,
  195. cgroup: cg,
  196. metadata: md,
  197. processes: map[uint32]*Process{},
  198. delaysByPid: map[uint32]Delays{},
  199. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  200. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  201. // connectsFailed: map[netaddr.IPPort]int64{},
  202. connectsFailed: map[AddrPair]int64{},
  203. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  204. connectionsActive: map[AddrPair]*ActiveConnection{},
  205. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  206. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  207. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  208. acceptsActive: map[AddrPair]*ActiveAccept{},
  209. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  210. l7Stats: L7Stats{},
  211. dnsStats: &L7Metrics{},
  212. mounts: map[string]proc.MountInfo{},
  213. logParsers: map[string]*LogParser{},
  214. hostConntrack: hostConntrack,
  215. done: make(chan struct{}),
  216. traceMap: make(map[uint64]*tracing.Trace),
  217. registry: registry,
  218. }
  219. for _, n := range md.networks {
  220. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  221. if ct, err := NewConntrack(nsHandle); err != nil {
  222. klog.Warningln(err)
  223. } else {
  224. c.lbConntracks = append(c.lbConntracks, ct)
  225. }
  226. _ = nsHandle.Close()
  227. }
  228. }
  229. c.runLogParser("")
  230. // go func() {
  231. // ticker := time.NewTicker(gcInterval)
  232. // defer ticker.Stop()
  233. // for {
  234. // select {
  235. // case <-c.done:
  236. // return
  237. // case t := <-ticker.C:
  238. // c.gc(t)
  239. // }
  240. // }
  241. // }()
  242. return c, nil
  243. }
  244. func (c *Container) Close() {
  245. for _, p := range c.logParsers {
  246. p.Stop()
  247. }
  248. for _, ct := range c.lbConntracks {
  249. _ = ct.Close()
  250. }
  251. if c.nsConntrack != nil {
  252. _ = c.nsConntrack.Close()
  253. }
  254. close(c.done)
  255. }
  256. func (c *Container) Dead(now time.Time) bool {
  257. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  258. }
  259. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  260. // some fixed metric description is required here to register/unregister the collector correctly
  261. ch <- prometheus.NewDesc("container", "", nil, nil)
  262. }
  263. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  264. c.registry.updateTrafficStatsIfNecessary()
  265. c.lock.RLock()
  266. defer c.lock.RUnlock()
  267. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  268. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  269. }
  270. ch <- counter(metrics.Restarts, float64(c.restarts))
  271. if cpu, err := c.cgroup.CpuStat(); err == nil {
  272. if cpu.LimitCores > 0 {
  273. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  274. }
  275. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  276. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  277. }
  278. if taskstatsClient != nil {
  279. c.updateDelays()
  280. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  281. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  282. }
  283. if s, err := c.cgroup.MemoryStat(); err == nil {
  284. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  285. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  286. if s.Limit > 0 {
  287. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  288. }
  289. }
  290. if c.oomKills > 0 {
  291. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  292. }
  293. if disks, err := node.GetDisks(); err == nil {
  294. ioStat, _ := c.cgroup.IOStat()
  295. for majorMinor, mounts := range c.getMounts() {
  296. dev := disks.GetParentBlockDevice(majorMinor)
  297. if dev == nil {
  298. continue
  299. }
  300. for mountPoint, fsStat := range mounts {
  301. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  302. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  303. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  304. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  305. if io, ok := ioStat[majorMinor]; ok {
  306. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  307. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  308. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  309. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  310. }
  311. }
  312. }
  313. }
  314. for addr, open := range c.getListens() {
  315. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  316. }
  317. for proxy, addrs := range c.getProxiedListens() {
  318. for addr := range addrs {
  319. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  320. }
  321. }
  322. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  323. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  324. strAppName := c.AppInfo.AppName
  325. apps := c.registry.GetAllAppInfoFromServer()
  326. for d, stats := range c.connectsSuccessful {
  327. targetAppId := apps[d.dst.String()]
  328. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  329. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  330. stats.Count = 0
  331. // if stats.Retransmissions > 0 {
  332. // ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  333. // }
  334. // ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  335. // ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  336. // ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  337. // ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  338. // ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  339. // ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  340. // ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  341. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  342. // stats.PerBytesReceived = 0
  343. // stats.PerBytesSent = 0
  344. }
  345. // for d, stats := range c.acceptsSuccessful {
  346. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  347. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  348. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  349. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  350. // }
  351. // for dst, count := range c.connectsFailed {
  352. // targetAppId := apps[dst.String()]
  353. // ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
  354. // }
  355. for addrPair, count := range c.connectsFailed {
  356. targetAppId := apps[addrPair.dst.String()]
  357. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
  358. c.connectsFailed[addrPair] = 0
  359. }
  360. connections := map[AddrPair]int{}
  361. for addrPair, conn := range c.connectionsActive {
  362. targetAppId := apps[conn.Dest.String()]
  363. if conn.Retransmissions > 0 {
  364. ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  365. conn.Retransmissions = 0
  366. }
  367. ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  368. ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  369. ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  370. ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  371. ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  372. ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  373. ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  374. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  375. conn.PerBytesReceived = 0
  376. conn.PerBytesSent = 0
  377. if !conn.Closed.IsZero() {
  378. continue
  379. }
  380. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  381. }
  382. for d, count := range connections {
  383. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  384. }
  385. for source, p := range c.logParsers {
  386. for _, c := range p.parser.GetCounters() {
  387. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  388. }
  389. }
  390. appTypes := map[string]struct{}{}
  391. seenJvms := map[string]bool{}
  392. seenDotNetApps := map[string]bool{}
  393. pids := maps.Keys(c.processes)
  394. sort.Slice(pids, func(i, j int) bool {
  395. return pids[i] < pids[j]
  396. })
  397. for _, pid := range pids {
  398. process := c.processes[pid]
  399. cmdline := proc.GetCmdline(pid)
  400. if len(cmdline) == 0 {
  401. continue
  402. }
  403. appType := guessApplicationType(cmdline)
  404. if appType != "" {
  405. appTypes[appType] = struct{}{}
  406. }
  407. if process.isGolangApp {
  408. appTypes["golang"] = struct{}{}
  409. }
  410. switch {
  411. case isJvm(cmdline):
  412. jvm, jMetrics := jvmMetrics(pid)
  413. if len(jMetrics) > 0 && !seenJvms[jvm] {
  414. seenJvms[jvm] = true
  415. for _, m := range jMetrics {
  416. ch <- m
  417. }
  418. }
  419. case process.dotNetMonitor != nil:
  420. appTypes["dotnet"] = struct{}{}
  421. appName := process.dotNetMonitor.AppName()
  422. if !seenDotNetApps[appName] {
  423. seenDotNetApps[appName] = true
  424. process.dotNetMonitor.Collect(ch)
  425. }
  426. }
  427. }
  428. for appType := range appTypes {
  429. ch <- gauge(metrics.ApplicationType, 1, appType)
  430. }
  431. if c.pythonThreadLockWaitTime > 0 {
  432. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  433. }
  434. if c.dnsStats.Requests != nil {
  435. c.dnsStats.Requests.Collect(ch)
  436. }
  437. if c.dnsStats.Latency != nil {
  438. c.dnsStats.Latency.Collect(ch)
  439. }
  440. c.l7Stats.collect(ch)
  441. if !*flags.DisablePinger {
  442. for ip, rtt := range c.ping() {
  443. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  444. }
  445. }
  446. c.gc(time.Now())
  447. }
  448. func (c *Container) onProcessStart(pid uint32) *Process {
  449. c.lock.Lock()
  450. defer c.lock.Unlock()
  451. stats, err := TaskstatsPID(pid)
  452. if err != nil {
  453. klog.WithError(err).Debugf("Failed onProcessStart [%d]", pid)
  454. return nil
  455. }
  456. c.zombieAt = time.Time{}
  457. p := NewProcess(pid, stats, c.registry.tracer)
  458. if p == nil {
  459. return nil
  460. }
  461. c.processes[pid] = p
  462. if c.startedAt.IsZero() {
  463. c.startedAt = stats.BeginTime
  464. } else {
  465. min := stats.BeginTime
  466. for _, p := range c.processes {
  467. if p.StartedAt.Before(min) {
  468. min = p.StartedAt
  469. }
  470. }
  471. if min.After(c.startedAt) {
  472. c.restarts++
  473. c.startedAt = min
  474. }
  475. }
  476. return p
  477. }
  478. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  479. c.lock.Lock()
  480. defer c.lock.Unlock()
  481. if p := c.processes[pid]; p != nil {
  482. p.Close()
  483. }
  484. delete(c.processes, pid)
  485. if len(c.processes) == 0 {
  486. c.zombieAt = time.Now()
  487. }
  488. delete(c.delaysByPid, pid)
  489. if oomKill {
  490. c.oomKills++
  491. }
  492. }
  493. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  494. mntId, logPath := resolveFd(pid, fd)
  495. func() {
  496. if mntId == "" {
  497. return
  498. }
  499. c.lock.Lock()
  500. _, ok := c.mounts[mntId]
  501. c.lock.Unlock()
  502. if ok {
  503. return
  504. }
  505. byMountId := proc.GetMountInfo(pid)
  506. if byMountId == nil {
  507. return
  508. }
  509. if mi, ok := byMountId[mntId]; ok {
  510. c.lock.Lock()
  511. c.mounts[mntId] = mi
  512. c.lock.Unlock()
  513. }
  514. }()
  515. if logPath != "" {
  516. c.lock.Lock()
  517. c.runLogParser(logPath)
  518. c.lock.Unlock()
  519. }
  520. }
  521. // set
  522. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  523. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  524. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  525. return
  526. }
  527. if !safe {
  528. c.lock.Lock()
  529. defer c.lock.Unlock()
  530. }
  531. if _, ok := c.listens[addr]; !ok {
  532. c.listens[addr] = map[uint32]*ListenDetails{}
  533. }
  534. details := &ListenDetails{}
  535. c.listens[addr][pid] = details
  536. if addr.IP().IsUnspecified() {
  537. ns, err := proc.GetNetNs(pid)
  538. if err != nil {
  539. if !common.IsNotExist(err) {
  540. klog.Warningln(err)
  541. }
  542. return
  543. }
  544. defer ns.Close()
  545. ips, err := proc.GetNsIps(ns)
  546. if err != nil {
  547. klog.Warningln(err)
  548. return
  549. }
  550. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  551. details.NsIPs = ips
  552. }
  553. }
  554. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  555. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  556. c.lock.Lock()
  557. defer c.lock.Unlock()
  558. if _, byAddr := c.listens[addr]; byAddr {
  559. if _, byPid := c.listens[addr][pid]; byPid {
  560. if details := c.listens[addr][pid]; details != nil {
  561. details.ClosedAt = time.Now()
  562. }
  563. }
  564. }
  565. }
  566. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  567. klog.Debugf("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  568. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  569. // return
  570. // }
  571. p := c.processes[pid]
  572. if p == nil {
  573. return
  574. }
  575. if dst.IP().IsLoopback() && !p.isHostNs() {
  576. return
  577. }
  578. // actualDst, err := c.getActualDestination(p, src, dst)
  579. // if err != nil {
  580. // if !common.IsNotExist(err) {
  581. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  582. // }
  583. // return
  584. // }
  585. // switch {
  586. // case actualDst == nil:
  587. // actualDst = &dst
  588. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  589. // return
  590. // }
  591. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  592. // return
  593. // }
  594. c.lock.Lock()
  595. defer c.lock.Unlock()
  596. if !failed {
  597. key := AddrPair{src: dst, dst: src}
  598. stats := c.acceptsSuccessful[key]
  599. if stats == nil {
  600. stats = &AcceptStats{}
  601. c.acceptsSuccessful[key] = stats
  602. }
  603. acceptCon := &ActiveAccept{
  604. Dest: src,
  605. Src: dst,
  606. Pid: pid,
  607. Fd: fd,
  608. Timestamp: timestamp,
  609. }
  610. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  611. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  612. }
  613. c.acceptLastAttempt[dst] = time.Now()
  614. }
  615. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  616. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  617. return
  618. }
  619. p := c.processes[pid]
  620. if p == nil {
  621. return
  622. }
  623. if dst.IP().IsLoopback() && !p.isHostNs() {
  624. return
  625. }
  626. actualDst, err := c.getActualDestination(p, src, dst)
  627. if err != nil {
  628. if !common.IsNotExist(err) {
  629. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  630. }
  631. return
  632. }
  633. switch {
  634. case actualDst == nil:
  635. actualDst = &dst
  636. case actualDst.IP().IsLoopback() && !p.isHostNs():
  637. return
  638. }
  639. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  640. return
  641. }
  642. c.lock.Lock()
  643. defer c.lock.Unlock()
  644. if failed {
  645. key := AddrPair{src: dst, dst: *actualDst}
  646. c.connectsFailed[key]++
  647. } else {
  648. key := AddrPair{src: dst, dst: *actualDst}
  649. stats := c.connectsSuccessful[key]
  650. if stats == nil {
  651. stats = &ConnectionStats{}
  652. c.connectsSuccessful[key] = stats
  653. }
  654. stats.Count++
  655. stats.TotalTime += duration
  656. stats.Src = src
  657. stats.ConEstTime = duration
  658. connection := &ActiveConnection{
  659. Dest: dst,
  660. Src: src,
  661. ActualDest: *actualDst,
  662. Pid: pid,
  663. Fd: fd,
  664. Timestamp: timestamp,
  665. ConEstTime: duration,
  666. }
  667. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  668. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  669. }
  670. c.connectLastAttempt[dst] = time.Now()
  671. }
  672. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  673. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  674. return actualDst, nil
  675. }
  676. for _, lb := range c.lbConntracks {
  677. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  678. return actualDst, nil
  679. }
  680. }
  681. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  682. if actualDst != nil {
  683. return actualDst, nil
  684. }
  685. if !p.isHostNs() {
  686. if c.nsConntrack == nil {
  687. netNs, err := proc.GetNetNs(p.Pid)
  688. if err != nil {
  689. return nil, err
  690. }
  691. defer netNs.Close()
  692. c.nsConntrack, err = NewConntrack(netNs)
  693. if err != nil {
  694. return nil, err
  695. }
  696. }
  697. return c.nsConntrack.GetActualDestination(src, dst), nil
  698. }
  699. return nil, nil
  700. }
  701. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  702. c.lock.Lock()
  703. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  704. c.lock.Unlock()
  705. if conn != nil {
  706. if conn.Closed.IsZero() {
  707. if e.TrafficStats != nil {
  708. c.lock.Lock()
  709. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  710. c.lock.Unlock()
  711. }
  712. conn.Closed = time.Now()
  713. }
  714. }
  715. }
  716. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  717. c.lock.Lock()
  718. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  719. c.lock.Unlock()
  720. if conn != nil {
  721. if conn.Closed.IsZero() {
  722. if e.TrafficStats != nil {
  723. c.lock.Lock()
  724. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  725. c.lock.Unlock()
  726. }
  727. conn.Closed = time.Now()
  728. }
  729. }
  730. }
  731. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  732. if u == nil {
  733. return
  734. }
  735. c.lock.Lock()
  736. defer c.lock.Unlock()
  737. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  738. }
  739. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  740. if ac == nil {
  741. return
  742. }
  743. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  744. stats := c.connectsSuccessful[key]
  745. if stats == nil {
  746. stats = &ConnectionStats{}
  747. c.connectsSuccessful[key] = stats
  748. }
  749. if sent > ac.BytesSent {
  750. stats.BytesSent += sent - ac.BytesSent
  751. stats.PerBytesSent = sent - ac.BytesSent
  752. ac.PerBytesSent = sent - ac.BytesSent
  753. }
  754. if received > ac.BytesReceived {
  755. stats.BytesReceived += received - ac.BytesReceived
  756. stats.PerBytesReceived = received - ac.BytesReceived
  757. ac.PerBytesReceived = received - ac.BytesReceived
  758. }
  759. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  760. stats.FirstReadTime = firstreadtime
  761. stats.FirstWriteTime = firstwritetime
  762. stats.NewReadTime = newreadtime
  763. ac.FirstReadTime = firstreadtime
  764. ac.FirstWriteTime = firstwritetime
  765. ac.NewReadTime = newreadtime
  766. }
  767. ac.BytesSent = sent
  768. ac.BytesReceived = received
  769. }
  770. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  771. if ac == nil {
  772. return
  773. }
  774. //klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  775. key := AddrPair{src: ac.Src, dst: ac.Dest}
  776. stats := c.acceptsSuccessful[key]
  777. if stats == nil {
  778. stats = &AcceptStats{}
  779. c.acceptsSuccessful[key] = stats
  780. }
  781. if sent > ac.BytesSent {
  782. stats.BytesSent += sent - ac.BytesSent
  783. }
  784. if received > ac.BytesReceived {
  785. stats.BytesReceived += received - ac.BytesReceived
  786. }
  787. ac.BytesSent = sent
  788. ac.BytesReceived = received
  789. }
  790. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string, uint32, []netaddr.IP) {
  791. status := r.Status.DNS()
  792. if status == "" {
  793. return nil, "", "", 0, nil
  794. }
  795. t, fqdn, ttl, ips := l7.ParseDns(r.Payload)
  796. if t == "" {
  797. return nil, "", "", 0, nil
  798. }
  799. if c.dnsStats.Requests == nil {
  800. dnsReq := L7Requests[l7.ProtocolDNS]
  801. c.dnsStats.Requests = prometheus.NewCounterVec(
  802. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  803. []string{"request_type", "domain", "status"},
  804. )
  805. }
  806. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  807. m.Inc()
  808. }
  809. if r.Duration != 0 {
  810. if c.dnsStats.Latency == nil {
  811. dnsLatency := L7Latency[l7.ProtocolDNS]
  812. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  813. }
  814. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  815. }
  816. ip2fqdn := map[netaddr.IP]string{}
  817. if fqdn != "" {
  818. for _, ip := range ips {
  819. ip2fqdn[ip] = fqdn
  820. }
  821. }
  822. return ip2fqdn, t, fqdn, ttl, ips
  823. }
  824. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  825. c.lock.Lock()
  826. defer c.lock.Unlock()
  827. if r.Protocol == l7.ProtocolDNS {
  828. //return c.onDNSRequest(r)
  829. }
  830. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  831. if conn == nil {
  832. return nil
  833. }
  834. if timestamp != 0 && conn.Timestamp != timestamp {
  835. return nil
  836. }
  837. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  838. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  839. switch r.Protocol {
  840. case l7.ProtocolHTTP:
  841. stats.observe(r.Status.Http(), "", r.Duration)
  842. method, path := l7.ParseHttp(r.Payload)
  843. trace.HttpRequest(method, path, r.Status, r.Duration)
  844. case l7.ProtocolHTTP2:
  845. if conn.http2Parser == nil {
  846. conn.http2Parser = l7.NewHttp2Parser()
  847. }
  848. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  849. for _, req := range requests {
  850. stats.observe(req.Status.Http(), "", req.Duration)
  851. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  852. }
  853. case l7.ProtocolPostgres:
  854. if r.Method != l7.MethodStatementClose {
  855. stats.observe(r.Status.String(), "", r.Duration)
  856. }
  857. if conn.postgresParser == nil {
  858. conn.postgresParser = l7.NewPostgresParser()
  859. }
  860. query := conn.postgresParser.Parse(r.Payload)
  861. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  862. case l7.ProtocolMysql:
  863. if r.Method != l7.MethodStatementClose {
  864. stats.observe(r.Status.String(), "", r.Duration)
  865. }
  866. if conn.mysqlParser == nil {
  867. conn.mysqlParser = l7.NewMysqlParser()
  868. }
  869. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  870. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  871. case l7.ProtocolMemcached:
  872. stats.observe(r.Status.String(), "", r.Duration)
  873. cmd, items := l7.ParseMemcached(r.Payload)
  874. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  875. case l7.ProtocolRedis:
  876. stats.observe(r.Status.String(), "", r.Duration)
  877. cmd, args := l7.ParseRedis(r.Payload)
  878. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  879. case l7.ProtocolMongo:
  880. stats.observe(r.Status.String(), "", r.Duration)
  881. query := l7.ParseMongo(r.Payload)
  882. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  883. case l7.ProtocolKafka,l7.ProtocolCassandra:
  884. stats.observe(r.Status.String(), "", r.Duration)
  885. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  886. stats.observe(r.Status.String(), r.Method.String(), 0)
  887. case l7.ProtocolDubbo2:
  888. stats.observe(r.Status.String(), "", r.Duration)
  889. }
  890. return nil
  891. }
  892. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  893. c.lock.Lock()
  894. defer c.lock.Unlock()
  895. conn, ok := c.connectionsActive[srcDst]
  896. if !ok {
  897. return false
  898. }
  899. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  900. stats := c.connectsSuccessful[key]
  901. if stats == nil {
  902. stats = &ConnectionStats{}
  903. c.connectsSuccessful[key] = stats
  904. }
  905. stats.Retransmissions++
  906. conn.Retransmissions++
  907. return true
  908. }
  909. func (c *Container) updateDelays() {
  910. c.delaysLock.Lock()
  911. defer c.delaysLock.Unlock()
  912. for pid := range c.processes {
  913. stats, err := TaskstatsTGID(pid)
  914. if err != nil {
  915. continue
  916. }
  917. d := c.delaysByPid[pid]
  918. c.delays.cpu += stats.CPUDelay - d.cpu
  919. c.delays.disk += stats.BlockIODelay - d.disk
  920. d.cpu = stats.CPUDelay
  921. d.disk = stats.BlockIODelay
  922. c.delaysByPid[pid] = d
  923. }
  924. }
  925. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  926. if len(c.mounts) == 0 {
  927. return nil
  928. }
  929. res := map[string]map[string]*proc.FSStat{}
  930. for _, mi := range c.mounts {
  931. var stat *proc.FSStat
  932. for pid := range c.processes {
  933. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  934. if err == nil {
  935. stat = &s
  936. break
  937. }
  938. }
  939. if stat == nil {
  940. continue
  941. }
  942. if _, ok := res[mi.MajorMinor]; !ok {
  943. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  944. }
  945. res[mi.MajorMinor][mi.MountPoint] = stat
  946. }
  947. return res
  948. }
  949. func (c *Container) getListens() map[netaddr.IPPort]int {
  950. res := map[netaddr.IPPort]int{}
  951. for addr, byPid := range c.listens {
  952. open := 0
  953. isHostNs := false
  954. ips := map[netaddr.IP]bool{}
  955. for pid, details := range byPid {
  956. p := c.processes[pid]
  957. if p == nil {
  958. continue
  959. }
  960. if p.isHostNs() {
  961. isHostNs = true
  962. }
  963. if details.ClosedAt.IsZero() {
  964. open = 1
  965. }
  966. for _, ip := range details.NsIPs {
  967. ips[ip] = true
  968. }
  969. }
  970. if !addr.IP().IsUnspecified() {
  971. ips = map[netaddr.IP]bool{addr.IP(): true}
  972. }
  973. for ip := range ips {
  974. if ip.IsLoopback() && !isHostNs {
  975. continue
  976. }
  977. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  978. }
  979. }
  980. return res
  981. }
  982. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  983. if len(c.metadata.hostListens) == 0 {
  984. return nil
  985. }
  986. hasUnspecified := false
  987. for _, addrs := range c.metadata.hostListens {
  988. for _, addr := range addrs {
  989. if addr.IP().IsUnspecified() {
  990. hasUnspecified = true
  991. break
  992. }
  993. }
  994. }
  995. var hostIps []netaddr.IP
  996. if hasUnspecified {
  997. if ns, err := proc.GetHostNetNs(); err != nil {
  998. klog.Warningln(err)
  999. } else {
  1000. ips, err := proc.GetNsIps(ns)
  1001. _ = ns.Close()
  1002. if err != nil {
  1003. klog.Warningln(err)
  1004. } else {
  1005. hostIps = ips
  1006. }
  1007. }
  1008. }
  1009. res := map[string]map[netaddr.IPPort]struct{}{}
  1010. for proxy, addrs := range c.metadata.hostListens {
  1011. res[proxy] = map[netaddr.IPPort]struct{}{}
  1012. for _, addr := range addrs {
  1013. if addr.IP().IsUnspecified() {
  1014. for _, ip := range hostIps {
  1015. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  1016. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  1017. }
  1018. }
  1019. } else {
  1020. res[proxy][addr] = struct{}{}
  1021. }
  1022. }
  1023. }
  1024. return res
  1025. }
  1026. func (c *Container) ping() map[netaddr.IP]float64 {
  1027. netNs := netns.None()
  1028. for pid := range c.processes {
  1029. if pid == agentPid {
  1030. netNs = selfNetNs
  1031. break
  1032. }
  1033. ns, err := proc.GetNetNs(pid)
  1034. if err != nil {
  1035. if !common.IsNotExist(err) {
  1036. klog.Warningln(err)
  1037. }
  1038. continue
  1039. }
  1040. netNs = ns
  1041. defer netNs.Close()
  1042. break
  1043. }
  1044. if !netNs.IsOpen() {
  1045. return nil
  1046. }
  1047. ips := map[netaddr.IP]struct{}{}
  1048. for d := range c.connectsSuccessful {
  1049. ips[d.dst.IP()] = struct{}{}
  1050. }
  1051. for dst := range c.connectsFailed {
  1052. ips[dst.src.IP()] = struct{}{}
  1053. }
  1054. if len(ips) == 0 {
  1055. return nil
  1056. }
  1057. targets := make([]netaddr.IP, 0, len(ips))
  1058. for ip := range ips {
  1059. if ip.IsLoopback() {
  1060. continue
  1061. }
  1062. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1063. continue
  1064. }
  1065. targets = append(targets, ip)
  1066. }
  1067. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1068. if err != nil {
  1069. klog.Warningln(err)
  1070. return nil
  1071. }
  1072. return rtt
  1073. }
  1074. func (c *Container) runLogParser(logPath string) {
  1075. if *flags.DisableLogParsing {
  1076. return
  1077. }
  1078. containerId := string(c.id)
  1079. if logPath != "" {
  1080. if c.logParsers[logPath] != nil {
  1081. return
  1082. }
  1083. ch := make(chan logparser.LogEntry)
  1084. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1085. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1086. if err != nil {
  1087. klog.Warningln(err)
  1088. parser.Stop()
  1089. return
  1090. }
  1091. klog.Debugln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1092. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1093. return
  1094. }
  1095. switch c.cgroup.ContainerType {
  1096. case cgroup.ContainerTypeSystemdService:
  1097. ch := make(chan logparser.LogEntry)
  1098. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1099. klog.Warningln(err)
  1100. return
  1101. }
  1102. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1103. stop := func() {
  1104. JournaldUnsubscribe(c.cgroup)
  1105. }
  1106. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1107. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1108. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1109. if c.metadata.logPath == "" {
  1110. return
  1111. }
  1112. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1113. parser.Stop()
  1114. delete(c.logParsers, "stdout/stderr")
  1115. }
  1116. ch := make(chan logparser.LogEntry)
  1117. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1118. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1119. if err != nil {
  1120. klog.Warningln(err)
  1121. parser.Stop()
  1122. return
  1123. }
  1124. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1125. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1126. }
  1127. }
  1128. func (c *Container) gc(now time.Time) {
  1129. // c.lock.Lock()
  1130. // defer c.lock.Unlock()
  1131. established := map[AddrPair]struct{}{}
  1132. establishedDst := map[netaddr.IPPort]struct{}{}
  1133. listens := map[netaddr.IPPort]string{}
  1134. seenNamespaces := map[string]bool{}
  1135. fdMap := map[uint64]struct{}{}
  1136. for _, p := range c.processes {
  1137. if seenNamespaces[p.NetNsId()] {
  1138. continue
  1139. }
  1140. sockets, err := proc.GetSockets(p.Pid)
  1141. if err != nil {
  1142. continue
  1143. }
  1144. fds, err := proc.ReadFds(p.Pid)
  1145. if err == nil {
  1146. for _, fd := range fds {
  1147. fdMap[fd.Fd] = struct{}{}
  1148. }
  1149. }
  1150. for _, s := range sockets {
  1151. if s.Listen {
  1152. listens[s.SAddr] = s.Inode
  1153. } else {
  1154. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1155. establishedDst[s.DAddr] = struct{}{}
  1156. }
  1157. }
  1158. seenNamespaces[p.NetNsId()] = true
  1159. }
  1160. c.revalidateListens(now, listens)
  1161. for srcDst, conn := range c.connectionsActive {
  1162. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1163. if _, ok := established[srcDst]; !ok {
  1164. delete(c.connectionsActive, srcDst)
  1165. if conn == c.connectionsByPidFd[pidFd] {
  1166. delete(c.connectionsByPidFd, pidFd)
  1167. }
  1168. continue
  1169. }
  1170. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1171. delete(c.connectionsActive, srcDst)
  1172. if conn == c.connectionsByPidFd[pidFd] {
  1173. delete(c.connectionsByPidFd, pidFd)
  1174. }
  1175. }
  1176. }
  1177. for srcDst, conn := range c.acceptsActive {
  1178. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1179. if _, ok := established[srcDst]; !ok {
  1180. delete(c.acceptsActive, srcDst)
  1181. if conn == c.acceptsByPidFd[pidFd] {
  1182. delete(c.acceptsByPidFd, pidFd)
  1183. }
  1184. continue
  1185. }
  1186. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1187. delete(c.acceptsActive, srcDst)
  1188. if conn == c.acceptsByPidFd[pidFd] {
  1189. delete(c.acceptsByPidFd, pidFd)
  1190. }
  1191. }
  1192. }
  1193. for _, conn := range c.connectionsByPidFd {
  1194. if _, ok := fdMap[conn.Fd]; !ok {
  1195. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1196. }
  1197. }
  1198. for _, conn := range c.acceptsByPidFd {
  1199. if _, ok := fdMap[conn.Fd]; !ok {
  1200. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1201. }
  1202. }
  1203. for dst, at := range c.connectLastAttempt {
  1204. _, active := establishedDst[dst]
  1205. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1206. delete(c.connectLastAttempt, dst)
  1207. // delete(c.connectsFailed, dst)
  1208. for dfailed := range c.connectsFailed {
  1209. if dfailed.src == dst {
  1210. delete(c.connectsFailed, dfailed)
  1211. }
  1212. }
  1213. for d := range c.connectsSuccessful {
  1214. if d.src == dst {
  1215. delete(c.connectsSuccessful, d)
  1216. }
  1217. }
  1218. c.l7Stats.delete(dst)
  1219. }
  1220. }
  1221. for dst, at := range c.acceptLastAttempt {
  1222. _, active := establishedDst[dst]
  1223. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1224. delete(c.acceptLastAttempt, dst)
  1225. for d := range c.acceptsSuccessful {
  1226. if d.src == dst {
  1227. delete(c.acceptsSuccessful, d)
  1228. }
  1229. }
  1230. c.l7Stats.delete(dst)
  1231. }
  1232. }
  1233. }
  1234. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1235. for addr, byPid := range c.listens {
  1236. if _, open := actualListens[addr]; open {
  1237. continue
  1238. }
  1239. klog.Warningln("deleting the outdated listen:", addr)
  1240. for _, details := range byPid {
  1241. if details.ClosedAt.IsZero() {
  1242. details.ClosedAt = now
  1243. }
  1244. }
  1245. }
  1246. missingListens := map[netaddr.IPPort]string{}
  1247. for addr, inode := range actualListens {
  1248. byPids, found := c.listens[addr]
  1249. if !found {
  1250. missingListens[addr] = inode
  1251. continue
  1252. }
  1253. open := false
  1254. for _, details := range byPids {
  1255. if details.ClosedAt.IsZero() {
  1256. open = true
  1257. break
  1258. }
  1259. }
  1260. if !open {
  1261. missingListens[addr] = inode
  1262. }
  1263. }
  1264. if len(missingListens) > 0 {
  1265. inodeToPid := map[string]uint32{}
  1266. for pid := range c.processes {
  1267. fds, err := proc.ReadFds(pid)
  1268. if err != nil {
  1269. klog.Warningln(err)
  1270. continue
  1271. }
  1272. for _, fd := range fds {
  1273. if fd.SocketInode != "" {
  1274. inodeToPid[fd.SocketInode] = pid
  1275. }
  1276. }
  1277. }
  1278. for addr, inode := range missingListens {
  1279. pid, found := inodeToPid[inode]
  1280. if !found {
  1281. continue
  1282. }
  1283. //klog.Warningln("missing listen found:", addr, pid)
  1284. c.onListenOpen(pid, addr, true)
  1285. }
  1286. }
  1287. for addr, pids := range c.listens {
  1288. for pid, details := range pids {
  1289. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1290. delete(c.listens[addr], pid)
  1291. }
  1292. }
  1293. if len(c.listens[addr]) == 0 {
  1294. delete(c.listens, addr)
  1295. }
  1296. }
  1297. }
  1298. func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1299. klog.Infoln("[attach] attachUprobes start")
  1300. if tracer.DisableL7Tracing() {
  1301. return nil
  1302. }
  1303. codeType := c.GetCodeTypeFromCache(pid)
  1304. if codeType.IsUnknownCode() {
  1305. return nil
  1306. }
  1307. var err error
  1308. switch codeType {
  1309. case CodeTypeJava:
  1310. err = c.attachJVMUprobes(tracer, pid)
  1311. case CodeTypeJavaAot:
  1312. err = c.attachJavaAotUprobes(tracer, pid)
  1313. case CodeTypeGo:
  1314. err = c.attachTlsUprobes(tracer, pid)
  1315. case CodeTypeNetCoreAot:
  1316. err = c.attachNetCoreUprobes(tracer, pid)
  1317. }
  1318. if err != nil {
  1319. c.DetachUprobes(tracer, pid, APP_UPROBE_ERROR)
  1320. return err
  1321. }
  1322. return nil
  1323. }
  1324. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1325. p := c.processes[pid]
  1326. if p == nil {
  1327. return CodeTypeUnknown
  1328. }
  1329. if p.codeType.IsWaitCheck() {
  1330. p.codeType = GetExeType(pid, c.getRootfs())
  1331. }
  1332. return p.codeType
  1333. }
  1334. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1335. p := c.processes[pid]
  1336. if p == nil {
  1337. return nil
  1338. }
  1339. if p.openSslUprobesChecked || p.goTlsUprobesChecked {
  1340. return nil
  1341. }
  1342. if !p.openSslUprobesChecked {
  1343. p.openSslUprobesChecked = true
  1344. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1345. if err != nil {
  1346. return err
  1347. }
  1348. p.uprobes = append(p.uprobes, sslProbes...)
  1349. }
  1350. if !p.goTlsUprobesChecked {
  1351. p.goTlsUprobesChecked = true
  1352. codeType := c.GetCodeTypeFromCache(pid)
  1353. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1354. if err != nil {
  1355. return err
  1356. }
  1357. p.uprobes = append(p.uprobes, goProbes...)
  1358. c.l7AttachSuccess()
  1359. }
  1360. return nil
  1361. }
  1362. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1363. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1364. return nil
  1365. }
  1366. p := c.processes[pid]
  1367. if p == nil {
  1368. return nil
  1369. }
  1370. codeType := c.GetCodeTypeFromCache(pid)
  1371. if !p.jvmAttachOnce {
  1372. p.jvmAttachOnce = true
  1373. rootfs := c.getRootfs()
  1374. // TODO java Aot
  1375. if codeType.IsJvmCode() {
  1376. // check version
  1377. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1378. if err != nil {
  1379. klog.WithError(err).Errorf("[attach] Failed get so path")
  1380. return err
  1381. }
  1382. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1383. if err != nil {
  1384. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1385. return err
  1386. }
  1387. c.AppInfo.Version = v
  1388. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1389. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1390. if major != 1 || minor != 8 {
  1391. klog.Errorf("[attach] Unsupported Java version.")
  1392. return fmt.Errorf("[attach] Unsupported Java version")
  1393. }
  1394. }
  1395. /*libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1396. if err != nil {
  1397. klog.Error(err)
  1398. return err
  1399. }
  1400. p.uprobes = append(p.uprobes, libNioProbes...)*/
  1401. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1402. if err != nil {
  1403. klog.Error(err)
  1404. return err
  1405. }
  1406. p.uprobes = append(p.uprobes, libNetProbes...)
  1407. //p.jvmUprobesChecked = true
  1408. c.l7AttachSuccess()
  1409. err = tracer.InitKProcInfo(pid, &c.AppInfo)
  1410. if err != nil {
  1411. klog.Errorf("[attach] InitKProcInfo failed pid:[%d] ;%s.", err.Error(), pid)
  1412. } else {
  1413. klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid)
  1414. }
  1415. } else {
  1416. klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess())
  1417. }
  1418. return nil
  1419. }
  1420. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1421. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1422. return nil
  1423. }
  1424. p := c.processes[pid]
  1425. if p == nil {
  1426. return nil
  1427. }
  1428. codeType := c.GetCodeTypeFromCache(pid)
  1429. if !p.jvmAttachOnce {
  1430. p.jvmAttachOnce = true
  1431. rootfs := c.getRootfs()
  1432. // // TODO java Aot
  1433. // if codeType.IsJavaAotCode() {
  1434. // // check version
  1435. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1436. // if err != nil {
  1437. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1438. // return err
  1439. // }
  1440. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1441. // if err != nil {
  1442. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1443. // return err
  1444. // }
  1445. // c.AppInfo.Version = v
  1446. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1447. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1448. // if major != 1 || minor != 8 {
  1449. // return fmt.Errorf("[attach] Unsupported Java version")
  1450. // }
  1451. // }
  1452. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1453. if err != nil {
  1454. klog.Error(err)
  1455. return err
  1456. }
  1457. p.uprobes = append(p.uprobes, libNioProbes...)
  1458. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1459. if err != nil {
  1460. klog.Error(err)
  1461. return err
  1462. }
  1463. p.uprobes = append(p.uprobes, libNetProbes...)
  1464. //p.jvmUprobesChecked = true
  1465. c.l7AttachSuccess()
  1466. tracer.InitKProcInfo(pid, &c.AppInfo)
  1467. } else {
  1468. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1469. }
  1470. return nil
  1471. }
  1472. func (c *Container) errorClose(pid uint32, closeType int) {
  1473. p := c.processes[pid]
  1474. if p != nil {
  1475. p.DynamicClose(closeType)
  1476. }
  1477. }
  1478. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1479. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1480. return nil
  1481. }
  1482. p := c.processes[pid]
  1483. if p == nil {
  1484. return nil
  1485. }
  1486. if !p.jvmAttachOnce {
  1487. p.jvmAttachOnce = true
  1488. //codeType := c.GetCodeTypeFromCache(pid)
  1489. tracer.InitKProcInfo(pid, &c.AppInfo)
  1490. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1491. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1492. if err != nil {
  1493. klog.Error(err)
  1494. return err
  1495. }
  1496. p.uprobes = append(p.uprobes, readProbes...)
  1497. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1498. if err != nil {
  1499. klog.Error(err)
  1500. return err
  1501. }
  1502. p.uprobes = append(p.uprobes, WriteProbes...)
  1503. c.l7AttachSuccess()
  1504. }
  1505. return nil
  1506. }
  1507. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1508. info := proc.GetFdInfo(pid, fd)
  1509. if info == nil {
  1510. return
  1511. }
  1512. switch {
  1513. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1514. !strings.HasPrefix(info.Dest, "/"),
  1515. strings.HasPrefix(info.Dest, "/proc/"),
  1516. strings.HasPrefix(info.Dest, "/dev/"),
  1517. strings.HasPrefix(info.Dest, "/sys/"),
  1518. strings.HasSuffix(info.Dest, "(deleted)"):
  1519. return
  1520. }
  1521. mntId = info.MntId
  1522. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1523. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1524. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1525. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1526. logPath = info.Dest
  1527. }
  1528. return
  1529. }
  1530. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1531. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1532. }
  1533. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1534. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1535. }