container.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  // Package-level timing settings.
  31. var (
  // gcInterval is how long a container must have been without processes
  // (see zombieAt) before Dead reports it as dead.
  32. gcInterval = 1 * time.Minute
  // AllAppInfoInterval is not referenced in this part of the file; presumably
  // the refresh period for the application list fetched from the server — TODO confirm.
  33. AllAppInfoInterval = 1 * time.Minute
  // RegisterAppInterval is not referenced in this part of the file; presumably
  // the application registration period — TODO confirm.
  34. RegisterAppInterval = 1 * time.Minute
  // RegisterHostInterval is not referenced in this part of the file; presumably
  // the host registration period — TODO confirm.
  35. RegisterHostInterval = 10 * time.Minute
  // pingTimeout is not referenced in this part of the file; presumably the
  // per-target timeout used by the pinger — TODO confirm.
  36. pingTimeout = 300 * time.Millisecond
  37. )
  // ContainerID is the string identifier of a Container.
  38. type ContainerID string
  // ContainerNetwork describes one network a container is attached to.
  // NewContainer passes NetworkID to FindNetworkLoadBalancerNs to locate the
  // load-balancer network namespace of that network.
  39. type ContainerNetwork struct {
  40. NetworkID string
  41. }
  // ContainerMetadata holds the descriptive attributes of a container that are
  // supplied by the caller of NewContainer.
  42. type ContainerMetadata struct {
  43. name string
  44. labels map[string]string
  // volumes is indexed by mount point in Collect; the value is exported as a
  // label of the disk metrics.
  45. volumes map[string]string
  46. logPath string
  // image is exported by Collect as a label of the container info metric.
  47. image string
  48. logDecoder logparser.Decoder
  49. hostListens map[string][]netaddr.IPPort
  // networks is iterated by NewContainer to set up load-balancer conntracks.
  50. networks map[string]ContainerNetwork
  51. env map[string]string
  // systemdTriggeredBy is exported by Collect as a label of the container info metric.
  52. systemdTriggeredBy string
  53. rootfs string
  54. }
  // Delays holds accumulated CPU and disk delay durations. Collect exports
  // both values converted to seconds.
  55. type Delays struct {
  56. cpu time.Duration
  57. disk time.Duration
  58. }
  // LogParser pairs a logparser.Parser with an optional stop callback that is
  // invoked before the parser itself is stopped (see Stop).
  59. type LogParser struct {
  60. parser *logparser.Parser
  // stop may be nil; Stop skips it in that case.
  61. stop func()
  62. }
  63. func (p *LogParser) Stop() {
  64. if p.stop != nil {
  65. p.stop()
  66. }
  67. p.parser.Stop()
  68. }
  // AddrPair is a pair of socket addresses used as a map key.
  // The meaning of the two fields depends on the map: connectionsActive is keyed
  // by (connection source, requested destination), whereas connectsSuccessful
  // and connectsFailed store the requested destination in src and the actual
  // destination in dst.
  69. type AddrPair struct {
  70. src netaddr.IPPort
  71. dst netaddr.IPPort
  72. }
  // ActiveConnection tracks one outbound connection from the moment
  // onConnectionOpen records it until it is closed.
  73. type ActiveConnection struct {
  // Dest is the destination the process connected to; ActualDest is the
  // destination resolved by getActualDestination (equal to Dest when nothing
  // was resolved).
  74. Dest netaddr.IPPort
  75. Src netaddr.IPPort
  76. ActualDest netaddr.IPPort
  77. Pid uint32
  78. Fd uint64
  79. Timestamp uint64
  // Closed stays zero while the connection is open; onConnectionClose sets it.
  80. Closed time.Time
  // Retransmissions, PerBytesSent and PerBytesReceived are reset to zero by
  // Collect after they have been exported.
  81. Retransmissions uint64
  82. BytesSent uint64
  83. PerBytesSent uint64
  84. BytesReceived uint64
  85. PerBytesReceived uint64
  // ConEstTime is the duration passed to onConnectionOpen.
  86. ConEstTime time.Duration
  // FirstReadTime, FirstWriteTime and NewReadTime are timestamps whose
  // differences Collect exports as data latency and data duration
  // (units are not visible here — TODO confirm).
  87. FirstReadTime uint64
  88. FirstWriteTime uint64
  89. NewReadTime uint64
  // Per-connection protocol parsers; they are not populated in this part of the file.
  90. http2Parser *l7.Http2Parser
  91. postgresParser *l7.PostgresParser
  92. mysqlParser *l7.MysqlParser
  93. dmParser *l7.DmParser
  94. cassandraParser *l7.CassandraParser
  95. }
  // ActiveAccept tracks one accepted (inbound) connection recorded by onAcceptOpen.
  96. type ActiveAccept struct {
  // onAcceptOpen stores its src argument in Dest and its dst argument in Src.
  97. Dest netaddr.IPPort
  98. Src netaddr.IPPort
  99. Pid uint32
  100. Fd uint64
  101. Timestamp uint64
  // Closed stays zero while the connection is open; onAcceptClose sets it.
  102. Closed time.Time
  103. BytesSent uint64
  104. BytesReceived uint64
  105. }
  // ListenDetails describes one listening socket of one process.
  106. type ListenDetails struct {
  // ClosedAt is set by onListenClose; it is zero while the socket is listening.
  107. ClosedAt time.Time
  // NsIPs is filled by onListenOpen with the result of proc.GetNsIps, only when
  // the listen address is the unspecified address.
  108. NsIPs []netaddr.IP
  109. }
  // PidFd identifies a socket by process ID and file descriptor; it is the key
  // of the connectionsByPidFd and acceptsByPidFd maps.
  110. type PidFd struct {
  111. Pid uint32
  112. Fd uint64
  113. }
  // K8sContainer is embedded in Container. Its fields are not read or written
  // in this part of the file; judging by the names they hold the Kubernetes
  // identity of the container (namespace, pod, workload) — TODO confirm.
  114. type K8sContainer struct {
  115. ns string
  116. podName string
  117. podId string
  118. workload string
  119. containerName string
  120. pid string
  121. }
  // ConnectionStats aggregates successful outbound connections per
  // (destination, actual destination) pair.
  122. type ConnectionStats struct {
  // Count is incremented by onConnectionOpen and reset to zero by Collect after export.
  123. Count uint64
  // TotalTime accumulates the duration passed to onConnectionOpen.
  124. TotalTime time.Duration
  125. Retransmissions uint64
  126. BytesSent uint64
  127. PerBytesSent uint64
  128. BytesReceived uint64
  129. PerBytesReceived uint64
  // Src is the source address of the most recent connection for this pair.
  130. Src netaddr.IPPort
  // ConEstTime is the duration of the most recent connection for this pair.
  131. ConEstTime time.Duration
  132. FirstReadTime uint64
  133. FirstWriteTime uint64
  134. NewReadTime uint64
  135. }
  // AcceptStats holds byte counters for accepted connections; onAcceptOpen
  // creates one entry per address pair in acceptsSuccessful.
  136. type AcceptStats struct {
  137. BytesSent uint64
  138. BytesReceived uint64
  139. }
  // Container aggregates the processes, network activity, log parsers and
  // metadata of one monitored container. It is a Prometheus collector
  // (see Describe and Collect).
  140. type Container struct {
  141. id ContainerID
  142. cgroup *cgroup.Cgroup
  143. metadata *ContainerMetadata
  144. K8sContainer
  // processes is keyed by PID; onProcessStart adds entries and onProcessExit removes them.
  145. processes map[uint32]*Process
  146. startedAt time.Time
  // zombieAt is set when the last process exits and cleared when a process starts.
  147. zombieAt time.Time
  148. restarts int
  149. delays Delays
  150. delaysByPid map[uint32]Delays
  151. delaysLock sync.Mutex
  // listens maps a listen address to the per-PID details of that listener.
  152. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  153. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  154. // connectsFailed map[netaddr.IPPort]int64 // dst -> count
  // connectsFailed is keyed like connectsSuccessful; Collect resets each count to zero after export.
  155. connectsFailed map[AddrPair]int64
  156. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  // connectionsActive is keyed by (source, requested destination);
  // connectionsByPidFd indexes the same *ActiveConnection values by socket.
  157. connectionsActive map[AddrPair]*ActiveConnection
  158. connectionsByPidFd map[PidFd]*ActiveConnection
  159. acceptsSuccessful map[AddrPair]*AcceptStats
  160. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  161. acceptsActive map[AddrPair]*ActiveAccept
  162. acceptsByPidFd map[PidFd]*ActiveAccept
  163. l7Stats L7Stats
  164. dnsStats *L7Metrics
  165. oomKills int
  166. pythonThreadLockWaitTime time.Duration
  167. mounts map[string]proc.MountInfo
  168. logParsers map[string]*LogParser
  // hostConntrack is supplied by the caller of NewContainer; nsConntrack is
  // created lazily by getActualDestination; lbConntracks are created by NewContainer.
  169. hostConntrack *Conntrack
  170. nsConntrack *Conntrack
  171. lbConntracks []*Conntrack
  172. registry *Registry
  // lock guards the mutable state above.
  173. lock sync.RWMutex
  // done is closed by Close.
  174. done chan struct{}
  175. traceMap map[uint64]*tracing.Trace
  176. //instanceID utils.ID
  177. Symbols []debugelf.Symbol
  178. Uprobes []tracer.Uprobe
  179. UprobesMap map[string]tracer.Uprobe
  180. l7EventReady bool
  181. l7Attach bool
  182. // Whitelist details
  183. WhiteSettingInfo WhiteSettingInfo
  184. // Application details
  185. AppInfo AppInfo
  186. RegTime int
  187. }
  188. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  189. netNs, err := proc.GetNetNs(pid)
  190. if err != nil {
  191. return nil, err
  192. }
  193. defer netNs.Close()
  194. c := &Container{
  195. id: id,
  196. cgroup: cg,
  197. metadata: md,
  198. processes: map[uint32]*Process{},
  199. delaysByPid: map[uint32]Delays{},
  200. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  201. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  202. // connectsFailed: map[netaddr.IPPort]int64{},
  203. connectsFailed: map[AddrPair]int64{},
  204. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  205. connectionsActive: map[AddrPair]*ActiveConnection{},
  206. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  207. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  208. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  209. acceptsActive: map[AddrPair]*ActiveAccept{},
  210. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  211. l7Stats: L7Stats{},
  212. dnsStats: &L7Metrics{},
  213. mounts: map[string]proc.MountInfo{},
  214. logParsers: map[string]*LogParser{},
  215. hostConntrack: hostConntrack,
  216. done: make(chan struct{}),
  217. traceMap: make(map[uint64]*tracing.Trace),
  218. registry: registry,
  219. }
  220. for _, n := range md.networks {
  221. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  222. if ct, err := NewConntrack(nsHandle); err != nil {
  223. klog.Warningln(err)
  224. } else {
  225. c.lbConntracks = append(c.lbConntracks, ct)
  226. }
  227. _ = nsHandle.Close()
  228. }
  229. }
  230. c.runLogParser("")
  231. // go func() {
  232. // ticker := time.NewTicker(gcInterval)
  233. // defer ticker.Stop()
  234. // for {
  235. // select {
  236. // case <-c.done:
  237. // return
  238. // case t := <-ticker.C:
  239. // c.gc(t)
  240. // }
  241. // }
  242. // }()
  243. return c, nil
  244. }
  245. func (c *Container) Close() {
  246. for _, p := range c.logParsers {
  247. p.Stop()
  248. }
  249. for _, ct := range c.lbConntracks {
  250. _ = ct.Close()
  251. }
  252. if c.nsConntrack != nil {
  253. _ = c.nsConntrack.Close()
  254. }
  255. close(c.done)
  256. }
  257. func (c *Container) Dead(now time.Time) bool {
  258. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  259. }
  260. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  261. // some fixed metric description is required here to register/unregister the collector correctly
  262. ch <- prometheus.NewDesc("container", "", nil, nil)
  263. }
  264. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  265. c.registry.updateTrafficStatsIfNecessary()
  266. c.lock.RLock()
  267. defer c.lock.RUnlock()
  268. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  269. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  270. }
  271. ch <- counter(metrics.Restarts, float64(c.restarts))
  272. if cpu, err := c.cgroup.CpuStat(); err == nil {
  273. if cpu.LimitCores > 0 {
  274. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  275. }
  276. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  277. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  278. }
  279. if taskstatsClient != nil {
  280. c.updateDelays()
  281. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  282. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  283. }
  284. if s, err := c.cgroup.MemoryStat(); err == nil {
  285. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  286. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  287. if s.Limit > 0 {
  288. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  289. }
  290. }
  291. if c.oomKills > 0 {
  292. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  293. }
  294. if disks, err := node.GetDisks(); err == nil {
  295. ioStat, _ := c.cgroup.IOStat()
  296. for majorMinor, mounts := range c.getMounts() {
  297. dev := disks.GetParentBlockDevice(majorMinor)
  298. if dev == nil {
  299. continue
  300. }
  301. for mountPoint, fsStat := range mounts {
  302. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  303. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  304. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  305. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  306. if io, ok := ioStat[majorMinor]; ok {
  307. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  308. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  309. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  310. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  311. }
  312. }
  313. }
  314. }
  315. for addr, open := range c.getListens() {
  316. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  317. }
  318. for proxy, addrs := range c.getProxiedListens() {
  319. for addr := range addrs {
  320. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  321. }
  322. }
  323. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  324. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  325. strAppName := c.AppInfo.AppName
  326. apps := c.registry.GetAllAppInfoFromServer()
  327. for d, stats := range c.connectsSuccessful {
  328. targetAppId := apps[d.dst.String()]
  329. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  330. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  331. stats.Count = 0
  332. // if stats.Retransmissions > 0 {
  333. // ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  334. // }
  335. // ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  336. // ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  337. // ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  338. // ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  339. // ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  340. // ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  341. // ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  342. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  343. // stats.PerBytesReceived = 0
  344. // stats.PerBytesSent = 0
  345. }
  346. // for d, stats := range c.acceptsSuccessful {
  347. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  348. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  349. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  350. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  351. // }
  352. // for dst, count := range c.connectsFailed {
  353. // targetAppId := apps[dst.String()]
  354. // ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
  355. // }
  356. for addrPair, count := range c.connectsFailed {
  357. targetAppId := apps[addrPair.dst.String()]
  358. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
  359. c.connectsFailed[addrPair] = 0
  360. }
  361. connections := map[AddrPair]int{}
  362. for addrPair, conn := range c.connectionsActive {
  363. targetAppId := apps[conn.Dest.String()]
  364. if conn.Retransmissions > 0 {
  365. ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  366. conn.Retransmissions = 0
  367. }
  368. ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  369. ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  370. ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  371. ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  372. ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  373. ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  374. ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  375. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  376. conn.PerBytesReceived = 0
  377. conn.PerBytesSent = 0
  378. if !conn.Closed.IsZero() {
  379. continue
  380. }
  381. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  382. }
  383. for d, count := range connections {
  384. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  385. }
  386. for source, p := range c.logParsers {
  387. for _, c := range p.parser.GetCounters() {
  388. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  389. }
  390. }
  391. appTypes := map[string]struct{}{}
  392. seenJvms := map[string]bool{}
  393. seenDotNetApps := map[string]bool{}
  394. pids := maps.Keys(c.processes)
  395. sort.Slice(pids, func(i, j int) bool {
  396. return pids[i] < pids[j]
  397. })
  398. for _, pid := range pids {
  399. process := c.processes[pid]
  400. cmdline := proc.GetCmdline(pid)
  401. if len(cmdline) == 0 {
  402. continue
  403. }
  404. appType := guessApplicationType(cmdline)
  405. if appType != "" {
  406. appTypes[appType] = struct{}{}
  407. }
  408. if process.isGolangApp {
  409. appTypes["golang"] = struct{}{}
  410. }
  411. switch {
  412. case isJvm(cmdline):
  413. jvm, jMetrics := jvmMetrics(pid)
  414. if len(jMetrics) > 0 && !seenJvms[jvm] {
  415. seenJvms[jvm] = true
  416. for _, m := range jMetrics {
  417. ch <- m
  418. }
  419. }
  420. case process.dotNetMonitor != nil:
  421. appTypes["dotnet"] = struct{}{}
  422. appName := process.dotNetMonitor.AppName()
  423. if !seenDotNetApps[appName] {
  424. seenDotNetApps[appName] = true
  425. process.dotNetMonitor.Collect(ch)
  426. }
  427. }
  428. }
  429. for appType := range appTypes {
  430. ch <- gauge(metrics.ApplicationType, 1, appType)
  431. }
  432. if c.pythonThreadLockWaitTime > 0 {
  433. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  434. }
  435. if c.dnsStats.Requests != nil {
  436. c.dnsStats.Requests.Collect(ch)
  437. }
  438. if c.dnsStats.Latency != nil {
  439. c.dnsStats.Latency.Collect(ch)
  440. }
  441. c.l7Stats.collect(ch)
  442. if !*flags.DisablePinger {
  443. for ip, rtt := range c.ping() {
  444. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  445. }
  446. }
  447. c.gc(time.Now())
  448. }
  449. func (c *Container) onProcessStart(pid uint32) *Process {
  450. c.lock.Lock()
  451. defer c.lock.Unlock()
  452. stats, err := TaskstatsPID(pid)
  453. if err != nil {
  454. klog.WithError(err).Debugf("Failed onProcessStart [%d]", pid)
  455. return nil
  456. }
  457. c.zombieAt = time.Time{}
  458. p := NewProcess(pid, stats, c.registry.tracer)
  459. if p == nil {
  460. return nil
  461. }
  462. c.processes[pid] = p
  463. if c.startedAt.IsZero() {
  464. c.startedAt = stats.BeginTime
  465. } else {
  466. min := stats.BeginTime
  467. for _, p := range c.processes {
  468. if p.StartedAt.Before(min) {
  469. min = p.StartedAt
  470. }
  471. }
  472. if min.After(c.startedAt) {
  473. c.restarts++
  474. c.startedAt = min
  475. }
  476. }
  477. return p
  478. }
  479. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  480. c.lock.Lock()
  481. defer c.lock.Unlock()
  482. if p := c.processes[pid]; p != nil {
  483. p.Close()
  484. }
  485. delete(c.processes, pid)
  486. if len(c.processes) == 0 {
  487. c.zombieAt = time.Now()
  488. }
  489. delete(c.delaysByPid, pid)
  490. if oomKill {
  491. c.oomKills++
  492. }
  493. }
  494. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  495. mntId, logPath := resolveFd(pid, fd)
  496. func() {
  497. if mntId == "" {
  498. return
  499. }
  500. c.lock.Lock()
  501. _, ok := c.mounts[mntId]
  502. c.lock.Unlock()
  503. if ok {
  504. return
  505. }
  506. byMountId := proc.GetMountInfo(pid)
  507. if byMountId == nil {
  508. return
  509. }
  510. if mi, ok := byMountId[mntId]; ok {
  511. c.lock.Lock()
  512. c.mounts[mntId] = mi
  513. c.lock.Unlock()
  514. }
  515. }()
  516. if logPath != "" {
  517. c.lock.Lock()
  518. c.runLogParser(logPath)
  519. c.lock.Unlock()
  520. }
  521. }
  522. // set
  523. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  524. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  525. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  526. return
  527. }
  528. if !safe {
  529. c.lock.Lock()
  530. defer c.lock.Unlock()
  531. }
  532. if _, ok := c.listens[addr]; !ok {
  533. c.listens[addr] = map[uint32]*ListenDetails{}
  534. }
  535. details := &ListenDetails{}
  536. c.listens[addr][pid] = details
  537. if addr.IP().IsUnspecified() {
  538. ns, err := proc.GetNetNs(pid)
  539. if err != nil {
  540. if !common.IsNotExist(err) {
  541. klog.Warningln(err)
  542. }
  543. return
  544. }
  545. defer ns.Close()
  546. ips, err := proc.GetNsIps(ns)
  547. if err != nil {
  548. klog.Warningln(err)
  549. return
  550. }
  551. //klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  552. details.NsIPs = ips
  553. }
  554. }
  555. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  556. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  557. c.lock.Lock()
  558. defer c.lock.Unlock()
  559. if _, byAddr := c.listens[addr]; byAddr {
  560. if _, byPid := c.listens[addr][pid]; byPid {
  561. if details := c.listens[addr][pid]; details != nil {
  562. details.ClosedAt = time.Now()
  563. }
  564. }
  565. }
  566. }
  567. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  568. //klog.Debugf("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  569. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  570. // return
  571. // }
  572. p := c.processes[pid]
  573. if p == nil {
  574. return
  575. }
  576. if dst.IP().IsLoopback() && !p.isHostNs() {
  577. return
  578. }
  579. // actualDst, err := c.getActualDestination(p, src, dst)
  580. // if err != nil {
  581. // if !common.IsNotExist(err) {
  582. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  583. // }
  584. // return
  585. // }
  586. // switch {
  587. // case actualDst == nil:
  588. // actualDst = &dst
  589. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  590. // return
  591. // }
  592. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  593. // return
  594. // }
  595. c.lock.Lock()
  596. defer c.lock.Unlock()
  597. if !failed {
  598. key := AddrPair{src: dst, dst: src}
  599. stats := c.acceptsSuccessful[key]
  600. if stats == nil {
  601. stats = &AcceptStats{}
  602. c.acceptsSuccessful[key] = stats
  603. }
  604. acceptCon := &ActiveAccept{
  605. Dest: src,
  606. Src: dst,
  607. Pid: pid,
  608. Fd: fd,
  609. Timestamp: timestamp,
  610. }
  611. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  612. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  613. }
  614. c.acceptLastAttempt[dst] = time.Now()
  615. }
  616. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  617. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  618. return
  619. }
  620. p := c.processes[pid]
  621. if p == nil {
  622. return
  623. }
  624. if dst.IP().IsLoopback() && !p.isHostNs() {
  625. return
  626. }
  627. actualDst, err := c.getActualDestination(p, src, dst)
  628. if err != nil {
  629. if !common.IsNotExist(err) {
  630. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  631. }
  632. return
  633. }
  634. switch {
  635. case actualDst == nil:
  636. actualDst = &dst
  637. case actualDst.IP().IsLoopback() && !p.isHostNs():
  638. return
  639. }
  640. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  641. return
  642. }
  643. c.lock.Lock()
  644. defer c.lock.Unlock()
  645. if failed {
  646. key := AddrPair{src: dst, dst: *actualDst}
  647. c.connectsFailed[key]++
  648. } else {
  649. key := AddrPair{src: dst, dst: *actualDst}
  650. stats := c.connectsSuccessful[key]
  651. if stats == nil {
  652. stats = &ConnectionStats{}
  653. c.connectsSuccessful[key] = stats
  654. }
  655. stats.Count++
  656. stats.TotalTime += duration
  657. stats.Src = src
  658. stats.ConEstTime = duration
  659. connection := &ActiveConnection{
  660. Dest: dst,
  661. Src: src,
  662. ActualDest: *actualDst,
  663. Pid: pid,
  664. Fd: fd,
  665. Timestamp: timestamp,
  666. ConEstTime: duration,
  667. }
  668. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  669. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  670. }
  671. c.connectLastAttempt[dst] = time.Now()
  672. }
  673. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  674. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  675. return actualDst, nil
  676. }
  677. for _, lb := range c.lbConntracks {
  678. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  679. return actualDst, nil
  680. }
  681. }
  682. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  683. if actualDst != nil {
  684. return actualDst, nil
  685. }
  686. if !p.isHostNs() {
  687. if c.nsConntrack == nil {
  688. netNs, err := proc.GetNetNs(p.Pid)
  689. if err != nil {
  690. return nil, err
  691. }
  692. defer netNs.Close()
  693. c.nsConntrack, err = NewConntrack(netNs)
  694. if err != nil {
  695. return nil, err
  696. }
  697. }
  698. return c.nsConntrack.GetActualDestination(src, dst), nil
  699. }
  700. return nil, nil
  701. }
  702. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  703. c.lock.Lock()
  704. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  705. c.lock.Unlock()
  706. if conn != nil {
  707. if conn.Closed.IsZero() {
  708. if e.TrafficStats != nil {
  709. c.lock.Lock()
  710. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  711. c.lock.Unlock()
  712. }
  713. conn.Closed = time.Now()
  714. }
  715. }
  716. }
  717. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  718. c.lock.Lock()
  719. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  720. c.lock.Unlock()
  721. if conn != nil {
  722. if conn.Closed.IsZero() {
  723. if e.TrafficStats != nil {
  724. c.lock.Lock()
  725. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  726. c.lock.Unlock()
  727. }
  728. conn.Closed = time.Now()
  729. }
  730. }
  731. }
  732. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  733. if u == nil {
  734. return
  735. }
  736. c.lock.Lock()
  737. defer c.lock.Unlock()
  738. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  739. }
  740. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  741. if ac == nil {
  742. return
  743. }
  744. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  745. stats := c.connectsSuccessful[key]
  746. if stats == nil {
  747. stats = &ConnectionStats{}
  748. c.connectsSuccessful[key] = stats
  749. }
  750. if sent > ac.BytesSent {
  751. stats.BytesSent += sent - ac.BytesSent
  752. stats.PerBytesSent = sent - ac.BytesSent
  753. ac.PerBytesSent = sent - ac.BytesSent
  754. }
  755. if received > ac.BytesReceived {
  756. stats.BytesReceived += received - ac.BytesReceived
  757. stats.PerBytesReceived = received - ac.BytesReceived
  758. ac.PerBytesReceived = received - ac.BytesReceived
  759. }
  760. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  761. stats.FirstReadTime = firstreadtime
  762. stats.FirstWriteTime = firstwritetime
  763. stats.NewReadTime = newreadtime
  764. ac.FirstReadTime = firstreadtime
  765. ac.FirstWriteTime = firstwritetime
  766. ac.NewReadTime = newreadtime
  767. }
  768. ac.BytesSent = sent
  769. ac.BytesReceived = received
  770. }
  771. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  772. if ac == nil {
  773. return
  774. }
  775. //klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  776. key := AddrPair{src: ac.Src, dst: ac.Dest}
  777. stats := c.acceptsSuccessful[key]
  778. if stats == nil {
  779. stats = &AcceptStats{}
  780. c.acceptsSuccessful[key] = stats
  781. }
  782. if sent > ac.BytesSent {
  783. stats.BytesSent += sent - ac.BytesSent
  784. }
  785. if received > ac.BytesReceived {
  786. stats.BytesReceived += received - ac.BytesReceived
  787. }
  788. ac.BytesSent = sent
  789. ac.BytesReceived = received
  790. }
  791. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string, uint32, []netaddr.IP) {
  792. status := r.Status.DNS()
  793. if status == "" {
  794. return nil, "", "", 0, nil
  795. }
  796. t, fqdn, ttl, ips := l7.ParseDns(r.Payload)
  797. if t == "" {
  798. return nil, "", "", 0, nil
  799. }
  800. if c.dnsStats.Requests == nil {
  801. dnsReq := L7Requests[l7.ProtocolDNS]
  802. c.dnsStats.Requests = prometheus.NewCounterVec(
  803. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  804. []string{"request_type", "domain", "status"},
  805. )
  806. }
  807. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  808. m.Inc()
  809. }
  810. if r.Duration != 0 {
  811. if c.dnsStats.Latency == nil {
  812. dnsLatency := L7Latency[l7.ProtocolDNS]
  813. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  814. }
  815. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  816. }
  817. ip2fqdn := map[netaddr.IP]string{}
  818. if fqdn != "" {
  819. for _, ip := range ips {
  820. ip2fqdn[ip] = fqdn
  821. }
  822. }
  823. return ip2fqdn, t, fqdn, ttl, ips
  824. }
  825. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  826. c.lock.Lock()
  827. defer c.lock.Unlock()
  828. if r.Protocol == l7.ProtocolDNS {
  829. //return c.onDNSRequest(r)
  830. }
  831. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  832. if conn == nil {
  833. return nil
  834. }
  835. if timestamp != 0 && conn.Timestamp != timestamp {
  836. return nil
  837. }
  838. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  839. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  840. switch r.Protocol {
  841. case l7.ProtocolHTTP:
  842. stats.observe(r.Status.Http(), "", r.Duration)
  843. method, path := l7.ParseHttp(r.Payload)
  844. trace.HttpRequest(method, path, r.Status, r.Duration)
  845. case l7.ProtocolHTTP2:
  846. if conn.http2Parser == nil {
  847. conn.http2Parser = l7.NewHttp2Parser()
  848. }
  849. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  850. for _, req := range requests {
  851. stats.observe(req.Status.Http(), "", req.Duration)
  852. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  853. }
  854. case l7.ProtocolPostgres:
  855. if r.Method != l7.MethodStatementClose {
  856. stats.observe(r.Status.String(), "", r.Duration)
  857. }
  858. if conn.postgresParser == nil {
  859. conn.postgresParser = l7.NewPostgresParser()
  860. }
  861. query := conn.postgresParser.Parse(r.Payload)
  862. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  863. case l7.ProtocolMysql:
  864. if r.Method != l7.MethodStatementClose {
  865. stats.observe(r.Status.String(), "", r.Duration)
  866. }
  867. if conn.mysqlParser == nil {
  868. conn.mysqlParser = l7.NewMysqlParser()
  869. }
  870. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  871. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  872. case l7.ProtocolMemcached:
  873. stats.observe(r.Status.String(), "", r.Duration)
  874. cmd, items := l7.ParseMemcached(r.Payload)
  875. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  876. case l7.ProtocolRedis:
  877. stats.observe(r.Status.String(), "", r.Duration)
  878. cmd, args := l7.ParseRedis(r.Payload)
  879. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  880. case l7.ProtocolMongo:
  881. stats.observe(r.Status.String(), "", r.Duration)
  882. query := l7.ParseMongo(r.Payload)
  883. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  884. case l7.ProtocolKafka, l7.ProtocolCassandra:
  885. stats.observe(r.Status.String(), "", r.Duration)
  886. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  887. stats.observe(r.Status.String(), r.Method.String(), 0)
  888. case l7.ProtocolDubbo2:
  889. stats.observe(r.Status.String(), "", r.Duration)
  890. }
  891. return nil
  892. }
  893. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  894. c.lock.Lock()
  895. defer c.lock.Unlock()
  896. conn, ok := c.connectionsActive[srcDst]
  897. if !ok {
  898. return false
  899. }
  900. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  901. stats := c.connectsSuccessful[key]
  902. if stats == nil {
  903. stats = &ConnectionStats{}
  904. c.connectsSuccessful[key] = stats
  905. }
  906. stats.Retransmissions++
  907. conn.Retransmissions++
  908. return true
  909. }
  910. func (c *Container) updateDelays() {
  911. c.delaysLock.Lock()
  912. defer c.delaysLock.Unlock()
  913. for pid := range c.processes {
  914. stats, err := TaskstatsTGID(pid)
  915. if err != nil {
  916. continue
  917. }
  918. d := c.delaysByPid[pid]
  919. c.delays.cpu += stats.CPUDelay - d.cpu
  920. c.delays.disk += stats.BlockIODelay - d.disk
  921. d.cpu = stats.CPUDelay
  922. d.disk = stats.BlockIODelay
  923. c.delaysByPid[pid] = d
  924. }
  925. }
  926. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  927. if len(c.mounts) == 0 {
  928. return nil
  929. }
  930. res := map[string]map[string]*proc.FSStat{}
  931. for _, mi := range c.mounts {
  932. var stat *proc.FSStat
  933. for pid := range c.processes {
  934. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  935. if err == nil {
  936. stat = &s
  937. break
  938. }
  939. }
  940. if stat == nil {
  941. continue
  942. }
  943. if _, ok := res[mi.MajorMinor]; !ok {
  944. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  945. }
  946. res[mi.MajorMinor][mi.MountPoint] = stat
  947. }
  948. return res
  949. }
  950. func (c *Container) getListens() map[netaddr.IPPort]int {
  951. res := map[netaddr.IPPort]int{}
  952. for addr, byPid := range c.listens {
  953. open := 0
  954. isHostNs := false
  955. ips := map[netaddr.IP]bool{}
  956. for pid, details := range byPid {
  957. p := c.processes[pid]
  958. if p == nil {
  959. continue
  960. }
  961. if p.isHostNs() {
  962. isHostNs = true
  963. }
  964. if details.ClosedAt.IsZero() {
  965. open = 1
  966. }
  967. for _, ip := range details.NsIPs {
  968. ips[ip] = true
  969. }
  970. }
  971. if !addr.IP().IsUnspecified() {
  972. ips = map[netaddr.IP]bool{addr.IP(): true}
  973. }
  974. for ip := range ips {
  975. if ip.IsLoopback() && !isHostNs {
  976. continue
  977. }
  978. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  979. }
  980. }
  981. return res
  982. }
  983. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  984. if len(c.metadata.hostListens) == 0 {
  985. return nil
  986. }
  987. hasUnspecified := false
  988. for _, addrs := range c.metadata.hostListens {
  989. for _, addr := range addrs {
  990. if addr.IP().IsUnspecified() {
  991. hasUnspecified = true
  992. break
  993. }
  994. }
  995. }
  996. var hostIps []netaddr.IP
  997. if hasUnspecified {
  998. if ns, err := proc.GetHostNetNs(); err != nil {
  999. klog.Warningln(err)
  1000. } else {
  1001. ips, err := proc.GetNsIps(ns)
  1002. _ = ns.Close()
  1003. if err != nil {
  1004. klog.Warningln(err)
  1005. } else {
  1006. hostIps = ips
  1007. }
  1008. }
  1009. }
  1010. res := map[string]map[netaddr.IPPort]struct{}{}
  1011. for proxy, addrs := range c.metadata.hostListens {
  1012. res[proxy] = map[netaddr.IPPort]struct{}{}
  1013. for _, addr := range addrs {
  1014. if addr.IP().IsUnspecified() {
  1015. for _, ip := range hostIps {
  1016. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  1017. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  1018. }
  1019. }
  1020. } else {
  1021. res[proxy][addr] = struct{}{}
  1022. }
  1023. }
  1024. }
  1025. return res
  1026. }
  1027. func (c *Container) ping() map[netaddr.IP]float64 {
  1028. netNs := netns.None()
  1029. for pid := range c.processes {
  1030. if pid == agentPid {
  1031. netNs = selfNetNs
  1032. break
  1033. }
  1034. ns, err := proc.GetNetNs(pid)
  1035. if err != nil {
  1036. if !common.IsNotExist(err) {
  1037. klog.Warningln(err)
  1038. }
  1039. continue
  1040. }
  1041. netNs = ns
  1042. defer netNs.Close()
  1043. break
  1044. }
  1045. if !netNs.IsOpen() {
  1046. return nil
  1047. }
  1048. ips := map[netaddr.IP]struct{}{}
  1049. for d := range c.connectsSuccessful {
  1050. ips[d.dst.IP()] = struct{}{}
  1051. }
  1052. for dst := range c.connectsFailed {
  1053. ips[dst.src.IP()] = struct{}{}
  1054. }
  1055. if len(ips) == 0 {
  1056. return nil
  1057. }
  1058. targets := make([]netaddr.IP, 0, len(ips))
  1059. for ip := range ips {
  1060. if ip.IsLoopback() {
  1061. continue
  1062. }
  1063. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1064. continue
  1065. }
  1066. targets = append(targets, ip)
  1067. }
  1068. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1069. if err != nil {
  1070. klog.Warningln(err)
  1071. return nil
  1072. }
  1073. return rtt
  1074. }
  1075. func (c *Container) runLogParser(logPath string) {
  1076. if *flags.DisableLogParsing {
  1077. return
  1078. }
  1079. containerId := string(c.id)
  1080. if logPath != "" {
  1081. if c.logParsers[logPath] != nil {
  1082. return
  1083. }
  1084. ch := make(chan logparser.LogEntry)
  1085. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1086. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1087. if err != nil {
  1088. klog.Warningln(err)
  1089. parser.Stop()
  1090. return
  1091. }
  1092. klog.Debugln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1093. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1094. return
  1095. }
  1096. switch c.cgroup.ContainerType {
  1097. case cgroup.ContainerTypeSystemdService:
  1098. ch := make(chan logparser.LogEntry)
  1099. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1100. klog.Warningln(err)
  1101. return
  1102. }
  1103. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1104. stop := func() {
  1105. JournaldUnsubscribe(c.cgroup)
  1106. }
  1107. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1108. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1109. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1110. if c.metadata.logPath == "" {
  1111. return
  1112. }
  1113. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1114. parser.Stop()
  1115. delete(c.logParsers, "stdout/stderr")
  1116. }
  1117. ch := make(chan logparser.LogEntry)
  1118. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1119. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1120. if err != nil {
  1121. klog.Warningln(err)
  1122. parser.Stop()
  1123. return
  1124. }
  1125. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1126. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1127. }
  1128. }
  1129. func (c *Container) gc(now time.Time) {
  1130. // c.lock.Lock()
  1131. // defer c.lock.Unlock()
  1132. established := map[AddrPair]struct{}{}
  1133. establishedDst := map[netaddr.IPPort]struct{}{}
  1134. listens := map[netaddr.IPPort]string{}
  1135. seenNamespaces := map[string]bool{}
  1136. fdMap := map[uint64]struct{}{}
  1137. for _, p := range c.processes {
  1138. if seenNamespaces[p.NetNsId()] {
  1139. continue
  1140. }
  1141. sockets, err := proc.GetSockets(p.Pid)
  1142. if err != nil {
  1143. continue
  1144. }
  1145. fds, err := proc.ReadFds(p.Pid)
  1146. if err == nil {
  1147. for _, fd := range fds {
  1148. fdMap[fd.Fd] = struct{}{}
  1149. }
  1150. }
  1151. for _, s := range sockets {
  1152. if s.Listen {
  1153. listens[s.SAddr] = s.Inode
  1154. } else {
  1155. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1156. establishedDst[s.DAddr] = struct{}{}
  1157. }
  1158. }
  1159. seenNamespaces[p.NetNsId()] = true
  1160. }
  1161. c.revalidateListens(now, listens)
  1162. for srcDst, conn := range c.connectionsActive {
  1163. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1164. if _, ok := established[srcDst]; !ok {
  1165. delete(c.connectionsActive, srcDst)
  1166. if conn == c.connectionsByPidFd[pidFd] {
  1167. delete(c.connectionsByPidFd, pidFd)
  1168. }
  1169. continue
  1170. }
  1171. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1172. delete(c.connectionsActive, srcDst)
  1173. if conn == c.connectionsByPidFd[pidFd] {
  1174. delete(c.connectionsByPidFd, pidFd)
  1175. }
  1176. }
  1177. }
  1178. for srcDst, conn := range c.acceptsActive {
  1179. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1180. if _, ok := established[srcDst]; !ok {
  1181. delete(c.acceptsActive, srcDst)
  1182. if conn == c.acceptsByPidFd[pidFd] {
  1183. delete(c.acceptsByPidFd, pidFd)
  1184. }
  1185. continue
  1186. }
  1187. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1188. delete(c.acceptsActive, srcDst)
  1189. if conn == c.acceptsByPidFd[pidFd] {
  1190. delete(c.acceptsByPidFd, pidFd)
  1191. }
  1192. }
  1193. }
  1194. for _, conn := range c.connectionsByPidFd {
  1195. if _, ok := fdMap[conn.Fd]; !ok {
  1196. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1197. }
  1198. }
  1199. for _, conn := range c.acceptsByPidFd {
  1200. if _, ok := fdMap[conn.Fd]; !ok {
  1201. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1202. }
  1203. }
  1204. for dst, at := range c.connectLastAttempt {
  1205. _, active := establishedDst[dst]
  1206. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1207. delete(c.connectLastAttempt, dst)
  1208. // delete(c.connectsFailed, dst)
  1209. for dfailed := range c.connectsFailed {
  1210. if dfailed.src == dst {
  1211. delete(c.connectsFailed, dfailed)
  1212. }
  1213. }
  1214. for d := range c.connectsSuccessful {
  1215. if d.src == dst {
  1216. delete(c.connectsSuccessful, d)
  1217. }
  1218. }
  1219. c.l7Stats.delete(dst)
  1220. }
  1221. }
  1222. for dst, at := range c.acceptLastAttempt {
  1223. _, active := establishedDst[dst]
  1224. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1225. delete(c.acceptLastAttempt, dst)
  1226. for d := range c.acceptsSuccessful {
  1227. if d.src == dst {
  1228. delete(c.acceptsSuccessful, d)
  1229. }
  1230. }
  1231. c.l7Stats.delete(dst)
  1232. }
  1233. }
  1234. }
  1235. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1236. for addr, byPid := range c.listens {
  1237. if _, open := actualListens[addr]; open {
  1238. continue
  1239. }
  1240. klog.Warningln("deleting the outdated listen:", addr)
  1241. for _, details := range byPid {
  1242. if details.ClosedAt.IsZero() {
  1243. details.ClosedAt = now
  1244. }
  1245. }
  1246. }
  1247. missingListens := map[netaddr.IPPort]string{}
  1248. for addr, inode := range actualListens {
  1249. byPids, found := c.listens[addr]
  1250. if !found {
  1251. missingListens[addr] = inode
  1252. continue
  1253. }
  1254. open := false
  1255. for _, details := range byPids {
  1256. if details.ClosedAt.IsZero() {
  1257. open = true
  1258. break
  1259. }
  1260. }
  1261. if !open {
  1262. missingListens[addr] = inode
  1263. }
  1264. }
  1265. if len(missingListens) > 0 {
  1266. inodeToPid := map[string]uint32{}
  1267. for pid := range c.processes {
  1268. fds, err := proc.ReadFds(pid)
  1269. if err != nil {
  1270. klog.Warningln(err)
  1271. continue
  1272. }
  1273. for _, fd := range fds {
  1274. if fd.SocketInode != "" {
  1275. inodeToPid[fd.SocketInode] = pid
  1276. }
  1277. }
  1278. }
  1279. for addr, inode := range missingListens {
  1280. pid, found := inodeToPid[inode]
  1281. if !found {
  1282. continue
  1283. }
  1284. //klog.Warningln("missing listen found:", addr, pid)
  1285. c.onListenOpen(pid, addr, true)
  1286. }
  1287. }
  1288. for addr, pids := range c.listens {
  1289. for pid, details := range pids {
  1290. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1291. delete(c.listens[addr], pid)
  1292. }
  1293. }
  1294. if len(c.listens[addr]) == 0 {
  1295. delete(c.listens, addr)
  1296. }
  1297. }
  1298. }
  1299. func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32, _type string) error {
  1300. klog.Infoln("[attach] attachUprobes start by :", _type)
  1301. if tracer.DisableL7Tracing() {
  1302. return nil
  1303. }
  1304. codeType := c.GetCodeTypeFromCache(pid)
  1305. if codeType.IsUnknownCode() {
  1306. return nil
  1307. }
  1308. var err error
  1309. switch codeType {
  1310. case CodeTypeJava:
  1311. err = c.attachJVMUprobes(tracer, pid)
  1312. case CodeTypeJavaAot:
  1313. err = c.attachJavaAotUprobes(tracer, pid)
  1314. case CodeTypeGo:
  1315. err = c.attachTlsUprobes(tracer, pid)
  1316. case CodeTypeNetCoreAot:
  1317. err = c.attachNetCoreUprobes(tracer, pid)
  1318. }
  1319. if err != nil {
  1320. klog.WithField("pid", pid).Errorf("[attach] error %v :", err)
  1321. deErr := c.DetachUprobes(tracer, pid, APP_UPROBE_ERROR)
  1322. if deErr != nil {
  1323. klog.WithField("pid", pid).Errorf("[attach] Detach Uprobes error %v :", deErr)
  1324. }
  1325. return err
  1326. }
  1327. return nil
  1328. }
  1329. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1330. p := c.processes[pid]
  1331. if p == nil {
  1332. return CodeTypeUnknown
  1333. }
  1334. if p.codeType.IsWaitCheck() {
  1335. p.codeType = GetExeType(pid, c.getRootfs())
  1336. }
  1337. return p.codeType
  1338. }
  1339. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1340. p := c.processes[pid]
  1341. if p == nil {
  1342. return nil
  1343. }
  1344. if p.openSslUprobesChecked || p.goTlsUprobesChecked {
  1345. return nil
  1346. }
  1347. if !p.openSslUprobesChecked {
  1348. p.openSslUprobesChecked = true
  1349. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1350. if err != nil {
  1351. return err
  1352. }
  1353. p.uprobes = append(p.uprobes, sslProbes...)
  1354. }
  1355. if !p.goTlsUprobesChecked {
  1356. p.goTlsUprobesChecked = true
  1357. codeType := c.GetCodeTypeFromCache(pid)
  1358. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1359. if err != nil {
  1360. return err
  1361. }
  1362. p.uprobes = append(p.uprobes, goProbes...)
  1363. c.l7AttachSuccess()
  1364. }
  1365. return nil
  1366. }
  1367. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1368. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1369. return nil
  1370. }
  1371. p := c.processes[pid]
  1372. if p == nil {
  1373. return nil
  1374. }
  1375. codeType := c.GetCodeTypeFromCache(pid)
  1376. if !p.jvmAttachOnce {
  1377. p.jvmAttachOnce = true
  1378. rootfs := c.getRootfs()
  1379. // TODO java Aot
  1380. if codeType.IsJvmCode() {
  1381. // check version
  1382. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1383. if err != nil {
  1384. klog.WithError(err).Errorf("[attach] Failed get so path")
  1385. return err
  1386. }
  1387. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1388. if err != nil {
  1389. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1390. return err
  1391. }
  1392. c.AppInfo.Version = v
  1393. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1394. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1395. if major != 1 || minor != 8 {
  1396. klog.Errorf("[attach] Unsupported Java version.")
  1397. return fmt.Errorf("[attach] Unsupported Java version")
  1398. }
  1399. }
  1400. /*libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1401. if err != nil {
  1402. klog.Error(err)
  1403. return err
  1404. }
  1405. p.uprobes = append(p.uprobes, libNioProbes...)*/
  1406. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1407. if err != nil {
  1408. klog.Error(err)
  1409. return err
  1410. }
  1411. p.uprobes = append(p.uprobes, libNetProbes...)
  1412. //p.jvmUprobesChecked = true
  1413. c.l7AttachSuccess()
  1414. err = tracer.InitKProcInfo(pid, &c.AppInfo)
  1415. if err != nil {
  1416. klog.Errorf("[attach] InitKProcInfo failed pid:[%d] ;%s.", pid, err.Error())
  1417. return err
  1418. } else {
  1419. klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid)
  1420. }
  1421. } else {
  1422. klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess())
  1423. }
  1424. return nil
  1425. }
  1426. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1427. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1428. return nil
  1429. }
  1430. p := c.processes[pid]
  1431. if p == nil {
  1432. return nil
  1433. }
  1434. codeType := c.GetCodeTypeFromCache(pid)
  1435. if !p.jvmAttachOnce {
  1436. p.jvmAttachOnce = true
  1437. rootfs := c.getRootfs()
  1438. // // TODO java Aot
  1439. // if codeType.IsJavaAotCode() {
  1440. // // check version
  1441. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1442. // if err != nil {
  1443. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1444. // return err
  1445. // }
  1446. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1447. // if err != nil {
  1448. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1449. // return err
  1450. // }
  1451. // c.AppInfo.Version = v
  1452. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1453. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1454. // if major != 1 || minor != 8 {
  1455. // return fmt.Errorf("[attach] Unsupported Java version")
  1456. // }
  1457. // }
  1458. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1459. if err != nil {
  1460. klog.Error(err)
  1461. return err
  1462. }
  1463. p.uprobes = append(p.uprobes, libNioProbes...)
  1464. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1465. if err != nil {
  1466. klog.Error(err)
  1467. return err
  1468. }
  1469. p.uprobes = append(p.uprobes, libNetProbes...)
  1470. //p.jvmUprobesChecked = true
  1471. c.l7AttachSuccess()
  1472. tracer.InitKProcInfo(pid, &c.AppInfo)
  1473. } else {
  1474. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1475. }
  1476. return nil
  1477. }
  1478. func (c *Container) errorClose(pid uint32, closeType int) {
  1479. p := c.processes[pid]
  1480. if p != nil {
  1481. p.DynamicClose(closeType)
  1482. }
  1483. }
  1484. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1485. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1486. return nil
  1487. }
  1488. p := c.processes[pid]
  1489. if p == nil {
  1490. return nil
  1491. }
  1492. if !p.jvmAttachOnce {
  1493. p.jvmAttachOnce = true
  1494. //codeType := c.GetCodeTypeFromCache(pid)
  1495. tracer.InitKProcInfo(pid, &c.AppInfo)
  1496. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1497. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1498. if err != nil {
  1499. klog.Error(err)
  1500. return err
  1501. }
  1502. p.uprobes = append(p.uprobes, readProbes...)
  1503. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1504. if err != nil {
  1505. klog.Error(err)
  1506. return err
  1507. }
  1508. p.uprobes = append(p.uprobes, WriteProbes...)
  1509. c.l7AttachSuccess()
  1510. }
  1511. return nil
  1512. }
  1513. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1514. info := proc.GetFdInfo(pid, fd)
  1515. if info == nil {
  1516. return
  1517. }
  1518. switch {
  1519. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1520. !strings.HasPrefix(info.Dest, "/"),
  1521. strings.HasPrefix(info.Dest, "/proc/"),
  1522. strings.HasPrefix(info.Dest, "/dev/"),
  1523. strings.HasPrefix(info.Dest, "/sys/"),
  1524. strings.HasSuffix(info.Dest, "(deleted)"):
  1525. return
  1526. }
  1527. mntId = info.MntId
  1528. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1529. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1530. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1531. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1532. logPath = info.Dest
  1533. }
  1534. return
  1535. }
  1536. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1537. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1538. }
  1539. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1540. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1541. }