container.go 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/coroot/coroot-node-agent/cgroup"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/flags"
  19. "github.com/coroot/coroot-node-agent/logs"
  20. "github.com/coroot/coroot-node-agent/node"
  21. "github.com/coroot/coroot-node-agent/pinger"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. "github.com/coroot/coroot-node-agent/tracing"
  24. "github.com/coroot/logparser"
  25. "github.com/prometheus/client_golang/prometheus"
  26. klog "github.com/sirupsen/logrus"
  27. "github.com/vishvananda/netns"
  28. "golang.org/x/exp/maps"
  29. "inet.af/netaddr"
  30. )
  // Timing knobs used by the container code.
  var (
  	// gcInterval is how long a container must have been without processes
  	// before Dead reports it as collectable.
  	gcInterval = time.Minute

  	// NOTE(review): the three intervals below are not referenced in this part
  	// of the file; presumably they pace periodic app/host registration — confirm.
  	AllAppInfoInterval   = time.Minute
  	RegisterAppInterval  = time.Minute
  	RegisterHostInterval = 10 * time.Minute

  	// pingTimeout is not referenced in this part of the file; presumably the
  	// per-target timeout used by ping — confirm.
  	pingTimeout = 300 * time.Millisecond
  )
  // ContainerID is the string identifier stored in Container.id.
  type ContainerID string
  // ContainerNetwork identifies one network listed in a container's metadata.
  type ContainerNetwork struct {
  	// NetworkID is the value NewContainer passes to FindNetworkLoadBalancerNs.
  	NetworkID string
  }
  42. type ContainerMetadata struct {
  43. name string
  44. labels map[string]string
  45. volumes map[string]string
  46. logPath string
  47. image string
  48. logDecoder logparser.Decoder
  49. hostListens map[string][]netaddr.IPPort
  50. networks map[string]ContainerNetwork
  51. env map[string]string
  52. systemdTriggeredBy string
  53. rootfs string
  54. }
  // Delays holds accumulated CPU and disk delay. Collect exports both values
  // converted to seconds.
  type Delays struct {
  	cpu, disk time.Duration
  }
  // LogParser pairs a logparser.Parser with an optional extra stop hook.
  type LogParser struct {
  	// parser is stopped unconditionally by Stop and queried for counters by
  	// Container.Collect.
  	parser *logparser.Parser
  	// stop, when non-nil, is invoked by Stop before the parser is stopped.
  	stop func()
  }
  63. func (p *LogParser) Stop() {
  64. if p.stop != nil {
  65. p.stop()
  66. }
  67. p.parser.Stop()
  68. }
  69. type AddrPair struct {
  70. src netaddr.IPPort
  71. dst netaddr.IPPort
  72. }
  73. type ActiveConnection struct {
  74. Dest netaddr.IPPort
  75. Src netaddr.IPPort
  76. ActualDest netaddr.IPPort
  77. Pid uint32
  78. Fd uint64
  79. Timestamp uint64
  80. Closed time.Time
  81. Retransmissions uint64
  82. BytesSent uint64
  83. PerBytesSent uint64
  84. BytesReceived uint64
  85. PerBytesReceived uint64
  86. ConEstTime time.Duration
  87. FirstReadTime uint64
  88. FirstWriteTime uint64
  89. NewReadTime uint64
  90. http2Parser *l7.Http2Parser
  91. postgresParser *l7.PostgresParser
  92. mysqlParser *l7.MysqlParser
  93. dmParser *l7.DmParser
  94. cassandraParser *l7.CassandraParser
  95. }
  96. type ActiveAccept struct {
  97. Dest netaddr.IPPort
  98. Src netaddr.IPPort
  99. Pid uint32
  100. Fd uint64
  101. Timestamp uint64
  102. Closed time.Time
  103. BytesSent uint64
  104. BytesReceived uint64
  105. }
  // ListenDetails describes one listening socket of a process.
  type ListenDetails struct {
  	// ClosedAt is set by onListenClose; it stays zero while the socket is open.
  	ClosedAt time.Time
  	// NsIPs is filled by onListenOpen with the namespace's IPs when the listen
  	// address is unspecified.
  	NsIPs []netaddr.IP
  }
  // PidFd identifies a file descriptor of a process; it keys the by-pid/fd
  // connection and accept maps of Container.
  type PidFd struct {
  	Pid uint32
  	Fd uint64
  }
  // K8sContainer groups string attributes describing a container in
  // Kubernetes terms; it is embedded in Container.
  type K8sContainer struct {
  	ns, podName, podId, workload, containerName, pid string
  }
  122. type ConnectionStats struct {
  123. Count uint64
  124. TotalTime time.Duration
  125. Retransmissions uint64
  126. BytesSent uint64
  127. PerBytesSent uint64
  128. BytesReceived uint64
  129. PerBytesReceived uint64
  130. Src netaddr.IPPort
  131. ConEstTime time.Duration
  132. FirstReadTime uint64
  133. FirstWriteTime uint64
  134. NewReadTime uint64
  135. }
  // AcceptStats holds byte counters for accepted connections, kept per
  // address pair in Container.acceptsSuccessful.
  type AcceptStats struct {
  	BytesSent, BytesReceived uint64
  }
  // Container is the agent's per-container state: the processes it tracks,
  // listen sockets, outbound/inbound connection bookkeeping, log parsers and
  // the counters exported by Collect.
  type Container struct {
  	// id appears in the listen open/close log messages.
  	id ContainerID
  	// cgroup is the source of the CPU, memory and I/O statistics in Collect.
  	cgroup *cgroup.Cgroup
  	metadata *ContainerMetadata
  	K8sContainer
  	// processes maps pid to Process; maintained by onProcessStart/onProcessExit.
  	processes map[uint32]*Process
  	// startedAt is the start time Collect's restart detection compares against.
  	startedAt time.Time
  	// zombieAt is set when the last process exits and cleared when a process
  	// starts; Dead compares it with gcInterval.
  	zombieAt time.Time
  	// restarts is incremented by onProcessStart when the earliest process start
  	// time moves past startedAt.
  	restarts int
  	delays Delays
  	delaysByPid map[uint32]Delays
  	// NOTE(review): presumably guards delays/delaysByPid — the code using it is
  	// outside this part of the file; confirm.
  	delaysLock sync.Mutex
  	// listens maps listen address -> pid -> details.
  	listens map[netaddr.IPPort]map[uint32]*ListenDetails
  	connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  	// connectsFailed map[netaddr.IPPort]int64 // dst -> count
  	// connectsFailed is keyed like connectsSuccessful (dst:actual_dst).
  	connectsFailed map[AddrPair]int64
  	connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  	// connectionsActive is keyed by AddrPair{src, dst} of the connection.
  	connectionsActive map[AddrPair]*ActiveConnection
  	connectionsByPidFd map[PidFd]*ActiveConnection
  	acceptsSuccessful map[AddrPair]*AcceptStats
  	acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  	acceptsActive map[AddrPair]*ActiveAccept
  	acceptsByPidFd map[PidFd]*ActiveAccept
  	l7Stats L7Stats
  	dnsStats *L7Metrics
  	// oomKills counts process exits reported with oomKill set.
  	oomKills int
  	pythonThreadLockWaitTime time.Duration
  	// mounts is keyed by mount id; filled lazily by onFileOpen.
  	mounts map[string]proc.MountInfo
  	// logParsers is keyed by log source; every parser is stopped by Close.
  	logParsers map[string]*LogParser
  	hostConntrack *Conntrack
  	// nsConntrack is created on first use by getActualDestination.
  	nsConntrack *Conntrack
  	// lbConntracks holds one conntrack per load-balancer namespace found in
  	// NewContainer.
  	lbConntracks []*Conntrack
  	registry *Registry
  	// lock guards the maps and counters above.
  	lock sync.RWMutex
  	// done is closed by Close.
  	done chan struct{}
  	traceMap map[uint64]*tracing.Trace
  	//instanceID utils.ID
  	Symbols []debugelf.Symbol
  	Uprobes []tracer.Uprobe
  	UprobesMap map[string]tracer.Uprobe
  	l7EventReady bool
  	l7Attach bool
  	licenseFuse bool
  	// Whitelist details.
  	WhiteSettingInfo WhiteSettingInfo
  	// Application details.
  	AppInfo AppInfo
  	RegTime int
  }
  189. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  190. netNs, err := proc.GetNetNs(pid)
  191. if err != nil {
  192. return nil, err
  193. }
  194. defer netNs.Close()
  195. c := &Container{
  196. id: id,
  197. cgroup: cg,
  198. metadata: md,
  199. processes: map[uint32]*Process{},
  200. delaysByPid: map[uint32]Delays{},
  201. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  202. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  203. // connectsFailed: map[netaddr.IPPort]int64{},
  204. connectsFailed: map[AddrPair]int64{},
  205. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  206. connectionsActive: map[AddrPair]*ActiveConnection{},
  207. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  208. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  209. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  210. acceptsActive: map[AddrPair]*ActiveAccept{},
  211. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  212. l7Stats: L7Stats{},
  213. dnsStats: &L7Metrics{},
  214. mounts: map[string]proc.MountInfo{},
  215. logParsers: map[string]*LogParser{},
  216. hostConntrack: hostConntrack,
  217. done: make(chan struct{}),
  218. traceMap: make(map[uint64]*tracing.Trace),
  219. registry: registry,
  220. }
  221. for _, n := range md.networks {
  222. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  223. if ct, err := NewConntrack(nsHandle); err != nil {
  224. klog.Warningln(err)
  225. } else {
  226. c.lbConntracks = append(c.lbConntracks, ct)
  227. }
  228. _ = nsHandle.Close()
  229. }
  230. }
  231. c.runLogParser("")
  232. // go func() {
  233. // ticker := time.NewTicker(gcInterval)
  234. // defer ticker.Stop()
  235. // for {
  236. // select {
  237. // case <-c.done:
  238. // return
  239. // case t := <-ticker.C:
  240. // c.gc(t)
  241. // }
  242. // }
  243. // }()
  244. return c, nil
  245. }
  246. func (c *Container) Close() {
  247. for _, p := range c.logParsers {
  248. p.Stop()
  249. }
  250. for _, ct := range c.lbConntracks {
  251. _ = ct.Close()
  252. }
  253. if c.nsConntrack != nil {
  254. _ = c.nsConntrack.Close()
  255. }
  256. close(c.done)
  257. }
  258. func (c *Container) Dead(now time.Time) bool {
  259. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  260. }
  261. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  262. // some fixed metric description is required here to register/unregister the collector correctly
  263. ch <- prometheus.NewDesc("container", "", nil, nil)
  264. }
  265. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  266. c.registry.updateTrafficStatsIfNecessary()
  267. c.lock.RLock()
  268. defer c.lock.RUnlock()
  269. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  270. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  271. }
  272. ch <- counter(metrics.Restarts, float64(c.restarts))
  273. if cpu, err := c.cgroup.CpuStat(); err == nil {
  274. if cpu.LimitCores > 0 {
  275. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  276. }
  277. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  278. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  279. }
  280. if taskstatsClient != nil {
  281. c.updateDelays()
  282. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  283. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  284. }
  285. if s, err := c.cgroup.MemoryStat(); err == nil {
  286. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  287. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  288. if s.Limit > 0 {
  289. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  290. }
  291. }
  292. if c.oomKills > 0 {
  293. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  294. }
  295. if disks, err := node.GetDisks(); err == nil {
  296. ioStat, _ := c.cgroup.IOStat()
  297. for majorMinor, mounts := range c.getMounts() {
  298. dev := disks.GetParentBlockDevice(majorMinor)
  299. if dev == nil {
  300. continue
  301. }
  302. for mountPoint, fsStat := range mounts {
  303. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  304. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  305. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  306. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  307. if io, ok := ioStat[majorMinor]; ok {
  308. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  309. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  310. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  311. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  312. }
  313. }
  314. }
  315. }
  316. for addr, open := range c.getListens() {
  317. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  318. }
  319. for proxy, addrs := range c.getProxiedListens() {
  320. for addr := range addrs {
  321. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  322. }
  323. }
  324. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  325. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  326. strAppName := c.AppInfo.AppName
  327. apps := c.registry.GetAllAppInfoFromServer()
  328. for d, stats := range c.connectsSuccessful {
  329. targetAppId := apps[d.dst.String()]
  330. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  331. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  332. stats.Count = 0
  333. // if stats.Retransmissions > 0 {
  334. // ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  335. // }
  336. // ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  337. // ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  338. // ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  339. // ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  340. // ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  341. // ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  342. // ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  343. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  344. // stats.PerBytesReceived = 0
  345. // stats.PerBytesSent = 0
  346. }
  347. // for d, stats := range c.acceptsSuccessful {
  348. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  349. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  350. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  351. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  352. // }
  353. // for dst, count := range c.connectsFailed {
  354. // targetAppId := apps[dst.String()]
  355. // ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
  356. // }
  357. for addrPair, count := range c.connectsFailed {
  358. targetAppId := apps[addrPair.dst.String()]
  359. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
  360. c.connectsFailed[addrPair] = 0
  361. }
  362. connections := map[AddrPair]int{}
  363. for addrPair, conn := range c.connectionsActive {
  364. targetAppId := apps[conn.Dest.String()]
  365. if conn.Retransmissions > 0 {
  366. ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  367. conn.Retransmissions = 0
  368. }
  369. ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  370. ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  371. ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  372. ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  373. ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  374. ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  375. ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  376. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  377. conn.PerBytesReceived = 0
  378. conn.PerBytesSent = 0
  379. if !conn.Closed.IsZero() {
  380. continue
  381. }
  382. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  383. }
  384. for d, count := range connections {
  385. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  386. }
  387. for source, p := range c.logParsers {
  388. for _, c := range p.parser.GetCounters() {
  389. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  390. }
  391. }
  392. appTypes := map[string]struct{}{}
  393. seenJvms := map[string]bool{}
  394. seenDotNetApps := map[string]bool{}
  395. pids := maps.Keys(c.processes)
  396. sort.Slice(pids, func(i, j int) bool {
  397. return pids[i] < pids[j]
  398. })
  399. for _, pid := range pids {
  400. process := c.processes[pid]
  401. cmdline := proc.GetCmdline(pid)
  402. if len(cmdline) == 0 {
  403. continue
  404. }
  405. appType := guessApplicationType(cmdline)
  406. if appType != "" {
  407. appTypes[appType] = struct{}{}
  408. }
  409. if process.isGolangApp {
  410. appTypes["golang"] = struct{}{}
  411. }
  412. switch {
  413. case isJvm(cmdline):
  414. jvm, jMetrics := jvmMetrics(pid)
  415. if len(jMetrics) > 0 && !seenJvms[jvm] {
  416. seenJvms[jvm] = true
  417. for _, m := range jMetrics {
  418. ch <- m
  419. }
  420. }
  421. case process.dotNetMonitor != nil:
  422. appTypes["dotnet"] = struct{}{}
  423. appName := process.dotNetMonitor.AppName()
  424. if !seenDotNetApps[appName] {
  425. seenDotNetApps[appName] = true
  426. process.dotNetMonitor.Collect(ch)
  427. }
  428. }
  429. }
  430. for appType := range appTypes {
  431. ch <- gauge(metrics.ApplicationType, 1, appType)
  432. }
  433. if c.pythonThreadLockWaitTime > 0 {
  434. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  435. }
  436. if c.dnsStats.Requests != nil {
  437. c.dnsStats.Requests.Collect(ch)
  438. }
  439. if c.dnsStats.Latency != nil {
  440. c.dnsStats.Latency.Collect(ch)
  441. }
  442. c.l7Stats.collect(ch)
  443. if !*flags.DisablePinger {
  444. for ip, rtt := range c.ping() {
  445. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  446. }
  447. }
  448. c.gc(time.Now())
  449. }
  450. func (c *Container) onProcessStart(pid uint32) *Process {
  451. c.lock.Lock()
  452. defer c.lock.Unlock()
  453. stats, err := TaskstatsPID(pid)
  454. if err != nil {
  455. klog.WithError(err).Debugf("Failed onProcessStart [%d]", pid)
  456. return nil
  457. }
  458. c.zombieAt = time.Time{}
  459. p := NewProcess(pid, stats, c.registry.tracer)
  460. if p == nil {
  461. return nil
  462. }
  463. c.processes[pid] = p
  464. if c.startedAt.IsZero() {
  465. c.startedAt = stats.BeginTime
  466. } else {
  467. min := stats.BeginTime
  468. for _, p := range c.processes {
  469. if p.StartedAt.Before(min) {
  470. min = p.StartedAt
  471. }
  472. }
  473. if min.After(c.startedAt) {
  474. c.restarts++
  475. c.startedAt = min
  476. }
  477. }
  478. return p
  479. }
  480. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  481. c.lock.Lock()
  482. defer c.lock.Unlock()
  483. if p := c.processes[pid]; p != nil {
  484. p.Close()
  485. }
  486. delete(c.processes, pid)
  487. if len(c.processes) == 0 {
  488. c.zombieAt = time.Now()
  489. }
  490. delete(c.delaysByPid, pid)
  491. if oomKill {
  492. c.oomKills++
  493. }
  494. }
  495. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  496. mntId, logPath := resolveFd(pid, fd)
  497. func() {
  498. if mntId == "" {
  499. return
  500. }
  501. c.lock.Lock()
  502. _, ok := c.mounts[mntId]
  503. c.lock.Unlock()
  504. if ok {
  505. return
  506. }
  507. byMountId := proc.GetMountInfo(pid)
  508. if byMountId == nil {
  509. return
  510. }
  511. if mi, ok := byMountId[mntId]; ok {
  512. c.lock.Lock()
  513. c.mounts[mntId] = mi
  514. c.lock.Unlock()
  515. }
  516. }()
  517. if logPath != "" {
  518. c.lock.Lock()
  519. c.runLogParser(logPath)
  520. c.lock.Unlock()
  521. }
  522. }
  523. // set
  524. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  525. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  526. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  527. return
  528. }
  529. if !safe {
  530. c.lock.Lock()
  531. defer c.lock.Unlock()
  532. }
  533. if _, ok := c.listens[addr]; !ok {
  534. c.listens[addr] = map[uint32]*ListenDetails{}
  535. }
  536. details := &ListenDetails{}
  537. c.listens[addr][pid] = details
  538. if addr.IP().IsUnspecified() {
  539. ns, err := proc.GetNetNs(pid)
  540. if err != nil {
  541. if !common.IsNotExist(err) {
  542. klog.Warningln(err)
  543. }
  544. return
  545. }
  546. defer ns.Close()
  547. ips, err := proc.GetNsIps(ns)
  548. if err != nil {
  549. klog.Warningln(err)
  550. return
  551. }
  552. //klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  553. details.NsIPs = ips
  554. }
  555. }
  556. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  557. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  558. c.lock.Lock()
  559. defer c.lock.Unlock()
  560. if _, byAddr := c.listens[addr]; byAddr {
  561. if _, byPid := c.listens[addr][pid]; byPid {
  562. if details := c.listens[addr][pid]; details != nil {
  563. details.ClosedAt = time.Now()
  564. }
  565. }
  566. }
  567. }
  568. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  569. //klog.Debugf("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  570. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  571. // return
  572. // }
  573. p := c.processes[pid]
  574. if p == nil {
  575. return
  576. }
  577. if dst.IP().IsLoopback() && !p.isHostNs() {
  578. return
  579. }
  580. // actualDst, err := c.getActualDestination(p, src, dst)
  581. // if err != nil {
  582. // if !common.IsNotExist(err) {
  583. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  584. // }
  585. // return
  586. // }
  587. // switch {
  588. // case actualDst == nil:
  589. // actualDst = &dst
  590. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  591. // return
  592. // }
  593. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  594. // return
  595. // }
  596. c.lock.Lock()
  597. defer c.lock.Unlock()
  598. if !failed {
  599. key := AddrPair{src: dst, dst: src}
  600. stats := c.acceptsSuccessful[key]
  601. if stats == nil {
  602. stats = &AcceptStats{}
  603. c.acceptsSuccessful[key] = stats
  604. }
  605. acceptCon := &ActiveAccept{
  606. Dest: src,
  607. Src: dst,
  608. Pid: pid,
  609. Fd: fd,
  610. Timestamp: timestamp,
  611. }
  612. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  613. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  614. }
  615. c.acceptLastAttempt[dst] = time.Now()
  616. }
  617. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  618. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  619. return
  620. }
  621. p := c.processes[pid]
  622. if p == nil {
  623. return
  624. }
  625. if dst.IP().IsLoopback() && !p.isHostNs() {
  626. return
  627. }
  628. actualDst, err := c.getActualDestination(p, src, dst)
  629. if err != nil {
  630. if !common.IsNotExist(err) {
  631. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  632. }
  633. return
  634. }
  635. switch {
  636. case actualDst == nil:
  637. actualDst = &dst
  638. case actualDst.IP().IsLoopback() && !p.isHostNs():
  639. return
  640. }
  641. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  642. return
  643. }
  644. c.lock.Lock()
  645. defer c.lock.Unlock()
  646. if failed {
  647. key := AddrPair{src: dst, dst: *actualDst}
  648. c.connectsFailed[key]++
  649. } else {
  650. key := AddrPair{src: dst, dst: *actualDst}
  651. stats := c.connectsSuccessful[key]
  652. if stats == nil {
  653. stats = &ConnectionStats{}
  654. c.connectsSuccessful[key] = stats
  655. }
  656. stats.Count++
  657. stats.TotalTime += duration
  658. stats.Src = src
  659. stats.ConEstTime = duration
  660. connection := &ActiveConnection{
  661. Dest: dst,
  662. Src: src,
  663. ActualDest: *actualDst,
  664. Pid: pid,
  665. Fd: fd,
  666. Timestamp: timestamp,
  667. ConEstTime: duration,
  668. }
  669. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  670. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  671. }
  672. c.connectLastAttempt[dst] = time.Now()
  673. }
  674. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  675. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  676. return actualDst, nil
  677. }
  678. for _, lb := range c.lbConntracks {
  679. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  680. return actualDst, nil
  681. }
  682. }
  683. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  684. if actualDst != nil {
  685. return actualDst, nil
  686. }
  687. if !p.isHostNs() {
  688. if c.nsConntrack == nil {
  689. netNs, err := proc.GetNetNs(p.Pid)
  690. if err != nil {
  691. return nil, err
  692. }
  693. defer netNs.Close()
  694. c.nsConntrack, err = NewConntrack(netNs)
  695. if err != nil {
  696. return nil, err
  697. }
  698. }
  699. return c.nsConntrack.GetActualDestination(src, dst), nil
  700. }
  701. return nil, nil
  702. }
  703. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  704. c.lock.Lock()
  705. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  706. c.lock.Unlock()
  707. if conn != nil {
  708. if conn.Closed.IsZero() {
  709. if e.TrafficStats != nil {
  710. c.lock.Lock()
  711. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  712. c.lock.Unlock()
  713. }
  714. conn.Closed = time.Now()
  715. }
  716. }
  717. }
  718. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  719. c.lock.Lock()
  720. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  721. c.lock.Unlock()
  722. if conn != nil {
  723. if conn.Closed.IsZero() {
  724. if e.TrafficStats != nil {
  725. c.lock.Lock()
  726. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  727. c.lock.Unlock()
  728. }
  729. conn.Closed = time.Now()
  730. }
  731. }
  732. }
  733. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  734. if u == nil {
  735. return
  736. }
  737. c.lock.Lock()
  738. defer c.lock.Unlock()
  739. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  740. }
  741. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  742. if ac == nil {
  743. return
  744. }
  745. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  746. stats := c.connectsSuccessful[key]
  747. if stats == nil {
  748. stats = &ConnectionStats{}
  749. c.connectsSuccessful[key] = stats
  750. }
  751. if sent > ac.BytesSent {
  752. stats.BytesSent += sent - ac.BytesSent
  753. stats.PerBytesSent = sent - ac.BytesSent
  754. ac.PerBytesSent = sent - ac.BytesSent
  755. }
  756. if received > ac.BytesReceived {
  757. stats.BytesReceived += received - ac.BytesReceived
  758. stats.PerBytesReceived = received - ac.BytesReceived
  759. ac.PerBytesReceived = received - ac.BytesReceived
  760. }
  761. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  762. stats.FirstReadTime = firstreadtime
  763. stats.FirstWriteTime = firstwritetime
  764. stats.NewReadTime = newreadtime
  765. ac.FirstReadTime = firstreadtime
  766. ac.FirstWriteTime = firstwritetime
  767. ac.NewReadTime = newreadtime
  768. }
  769. ac.BytesSent = sent
  770. ac.BytesReceived = received
  771. }
  772. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  773. if ac == nil {
  774. return
  775. }
  776. //klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  777. key := AddrPair{src: ac.Src, dst: ac.Dest}
  778. stats := c.acceptsSuccessful[key]
  779. if stats == nil {
  780. stats = &AcceptStats{}
  781. c.acceptsSuccessful[key] = stats
  782. }
  783. if sent > ac.BytesSent {
  784. stats.BytesSent += sent - ac.BytesSent
  785. }
  786. if received > ac.BytesReceived {
  787. stats.BytesReceived += received - ac.BytesReceived
  788. }
  789. ac.BytesSent = sent
  790. ac.BytesReceived = received
  791. }
  792. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string, uint32, []netaddr.IP) {
  793. status := r.Status.DNS()
  794. if status == "" {
  795. return nil, "", "", 0, nil
  796. }
  797. t, fqdn, ttl, ips := l7.ParseDns(r.Payload)
  798. if t == "" {
  799. return nil, "", "", 0, nil
  800. }
  801. if c.dnsStats.Requests == nil {
  802. dnsReq := L7Requests[l7.ProtocolDNS]
  803. c.dnsStats.Requests = prometheus.NewCounterVec(
  804. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  805. []string{"request_type", "domain", "status"},
  806. )
  807. }
  808. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  809. m.Inc()
  810. }
  811. if r.Duration != 0 {
  812. if c.dnsStats.Latency == nil {
  813. dnsLatency := L7Latency[l7.ProtocolDNS]
  814. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  815. }
  816. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  817. }
  818. ip2fqdn := map[netaddr.IP]string{}
  819. if fqdn != "" {
  820. for _, ip := range ips {
  821. ip2fqdn[ip] = fqdn
  822. }
  823. }
  824. return ip2fqdn, t, fqdn, ttl, ips
  825. }
  826. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  827. c.lock.Lock()
  828. defer c.lock.Unlock()
  829. if r.Protocol == l7.ProtocolDNS {
  830. //return c.onDNSRequest(r)
  831. }
  832. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  833. if conn == nil {
  834. return nil
  835. }
  836. if timestamp != 0 && conn.Timestamp != timestamp {
  837. return nil
  838. }
  839. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  840. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  841. switch r.Protocol {
  842. case l7.ProtocolHTTP:
  843. stats.observe(r.Status.Http(), "", r.Duration)
  844. method, path := l7.ParseHttp(r.Payload)
  845. trace.HttpRequest(method, path, r.Status, r.Duration)
  846. case l7.ProtocolHTTP2:
  847. if conn.http2Parser == nil {
  848. conn.http2Parser = l7.NewHttp2Parser()
  849. }
  850. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  851. for _, req := range requests {
  852. stats.observe(req.Status.Http(), "", req.Duration)
  853. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  854. }
  855. case l7.ProtocolPostgres:
  856. if r.Method != l7.MethodStatementClose {
  857. stats.observe(r.Status.String(), "", r.Duration)
  858. }
  859. if conn.postgresParser == nil {
  860. conn.postgresParser = l7.NewPostgresParser()
  861. }
  862. query := conn.postgresParser.Parse(r.Payload)
  863. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  864. case l7.ProtocolMysql:
  865. if r.Method != l7.MethodStatementClose {
  866. stats.observe(r.Status.String(), "", r.Duration)
  867. }
  868. if conn.mysqlParser == nil {
  869. conn.mysqlParser = l7.NewMysqlParser()
  870. }
  871. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  872. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  873. case l7.ProtocolMemcached:
  874. stats.observe(r.Status.String(), "", r.Duration)
  875. cmd, items := l7.ParseMemcached(r.Payload)
  876. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  877. case l7.ProtocolRedis:
  878. stats.observe(r.Status.String(), "", r.Duration)
  879. cmd, args := l7.ParseRedis(r.Payload)
  880. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  881. case l7.ProtocolMongo:
  882. stats.observe(r.Status.String(), "", r.Duration)
  883. query := l7.ParseMongo(r.Payload)
  884. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  885. case l7.ProtocolKafka, l7.ProtocolCassandra:
  886. stats.observe(r.Status.String(), "", r.Duration)
  887. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  888. stats.observe(r.Status.String(), r.Method.String(), 0)
  889. case l7.ProtocolDubbo2:
  890. stats.observe(r.Status.String(), "", r.Duration)
  891. }
  892. return nil
  893. }
  894. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  895. c.lock.Lock()
  896. defer c.lock.Unlock()
  897. conn, ok := c.connectionsActive[srcDst]
  898. if !ok {
  899. return false
  900. }
  901. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  902. stats := c.connectsSuccessful[key]
  903. if stats == nil {
  904. stats = &ConnectionStats{}
  905. c.connectsSuccessful[key] = stats
  906. }
  907. stats.Retransmissions++
  908. conn.Retransmissions++
  909. return true
  910. }
  911. func (c *Container) updateDelays() {
  912. c.delaysLock.Lock()
  913. defer c.delaysLock.Unlock()
  914. for pid := range c.processes {
  915. stats, err := TaskstatsTGID(pid)
  916. if err != nil {
  917. continue
  918. }
  919. d := c.delaysByPid[pid]
  920. c.delays.cpu += stats.CPUDelay - d.cpu
  921. c.delays.disk += stats.BlockIODelay - d.disk
  922. d.cpu = stats.CPUDelay
  923. d.disk = stats.BlockIODelay
  924. c.delaysByPid[pid] = d
  925. }
  926. }
  927. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  928. if len(c.mounts) == 0 {
  929. return nil
  930. }
  931. res := map[string]map[string]*proc.FSStat{}
  932. for _, mi := range c.mounts {
  933. var stat *proc.FSStat
  934. for pid := range c.processes {
  935. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  936. if err == nil {
  937. stat = &s
  938. break
  939. }
  940. }
  941. if stat == nil {
  942. continue
  943. }
  944. if _, ok := res[mi.MajorMinor]; !ok {
  945. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  946. }
  947. res[mi.MajorMinor][mi.MountPoint] = stat
  948. }
  949. return res
  950. }
  951. func (c *Container) getListens() map[netaddr.IPPort]int {
  952. res := map[netaddr.IPPort]int{}
  953. for addr, byPid := range c.listens {
  954. open := 0
  955. isHostNs := false
  956. ips := map[netaddr.IP]bool{}
  957. for pid, details := range byPid {
  958. p := c.processes[pid]
  959. if p == nil {
  960. continue
  961. }
  962. if p.isHostNs() {
  963. isHostNs = true
  964. }
  965. if details.ClosedAt.IsZero() {
  966. open = 1
  967. }
  968. for _, ip := range details.NsIPs {
  969. ips[ip] = true
  970. }
  971. }
  972. if !addr.IP().IsUnspecified() {
  973. ips = map[netaddr.IP]bool{addr.IP(): true}
  974. }
  975. for ip := range ips {
  976. if ip.IsLoopback() && !isHostNs {
  977. continue
  978. }
  979. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  980. }
  981. }
  982. return res
  983. }
  984. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  985. if len(c.metadata.hostListens) == 0 {
  986. return nil
  987. }
  988. hasUnspecified := false
  989. for _, addrs := range c.metadata.hostListens {
  990. for _, addr := range addrs {
  991. if addr.IP().IsUnspecified() {
  992. hasUnspecified = true
  993. break
  994. }
  995. }
  996. }
  997. var hostIps []netaddr.IP
  998. if hasUnspecified {
  999. if ns, err := proc.GetHostNetNs(); err != nil {
  1000. klog.Warningln(err)
  1001. } else {
  1002. ips, err := proc.GetNsIps(ns)
  1003. _ = ns.Close()
  1004. if err != nil {
  1005. klog.Warningln(err)
  1006. } else {
  1007. hostIps = ips
  1008. }
  1009. }
  1010. }
  1011. res := map[string]map[netaddr.IPPort]struct{}{}
  1012. for proxy, addrs := range c.metadata.hostListens {
  1013. res[proxy] = map[netaddr.IPPort]struct{}{}
  1014. for _, addr := range addrs {
  1015. if addr.IP().IsUnspecified() {
  1016. for _, ip := range hostIps {
  1017. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  1018. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  1019. }
  1020. }
  1021. } else {
  1022. res[proxy][addr] = struct{}{}
  1023. }
  1024. }
  1025. }
  1026. return res
  1027. }
  1028. func (c *Container) ping() map[netaddr.IP]float64 {
  1029. netNs := netns.None()
  1030. for pid := range c.processes {
  1031. if pid == agentPid {
  1032. netNs = selfNetNs
  1033. break
  1034. }
  1035. ns, err := proc.GetNetNs(pid)
  1036. if err != nil {
  1037. if !common.IsNotExist(err) {
  1038. klog.Warningln(err)
  1039. }
  1040. continue
  1041. }
  1042. netNs = ns
  1043. defer netNs.Close()
  1044. break
  1045. }
  1046. if !netNs.IsOpen() {
  1047. return nil
  1048. }
  1049. ips := map[netaddr.IP]struct{}{}
  1050. for d := range c.connectsSuccessful {
  1051. ips[d.dst.IP()] = struct{}{}
  1052. }
  1053. for dst := range c.connectsFailed {
  1054. ips[dst.src.IP()] = struct{}{}
  1055. }
  1056. if len(ips) == 0 {
  1057. return nil
  1058. }
  1059. targets := make([]netaddr.IP, 0, len(ips))
  1060. for ip := range ips {
  1061. if ip.IsLoopback() {
  1062. continue
  1063. }
  1064. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1065. continue
  1066. }
  1067. targets = append(targets, ip)
  1068. }
  1069. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1070. if err != nil {
  1071. klog.Warningln(err)
  1072. return nil
  1073. }
  1074. return rtt
  1075. }
  1076. func (c *Container) runLogParser(logPath string) {
  1077. if *flags.DisableLogParsing {
  1078. return
  1079. }
  1080. containerId := string(c.id)
  1081. if logPath != "" {
  1082. if c.logParsers[logPath] != nil {
  1083. return
  1084. }
  1085. ch := make(chan logparser.LogEntry)
  1086. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1087. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1088. if err != nil {
  1089. klog.Warningln(err)
  1090. parser.Stop()
  1091. return
  1092. }
  1093. klog.Debugln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1094. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1095. return
  1096. }
  1097. switch c.cgroup.ContainerType {
  1098. case cgroup.ContainerTypeSystemdService:
  1099. ch := make(chan logparser.LogEntry)
  1100. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1101. klog.Warningln(err)
  1102. return
  1103. }
  1104. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1105. stop := func() {
  1106. JournaldUnsubscribe(c.cgroup)
  1107. }
  1108. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1109. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1110. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1111. if c.metadata.logPath == "" {
  1112. return
  1113. }
  1114. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1115. parser.Stop()
  1116. delete(c.logParsers, "stdout/stderr")
  1117. }
  1118. ch := make(chan logparser.LogEntry)
  1119. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1120. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1121. if err != nil {
  1122. klog.Warningln(err)
  1123. parser.Stop()
  1124. return
  1125. }
  1126. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1127. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1128. }
  1129. }
  1130. func (c *Container) gc(now time.Time) {
  1131. // c.lock.Lock()
  1132. // defer c.lock.Unlock()
  1133. established := map[AddrPair]struct{}{}
  1134. establishedDst := map[netaddr.IPPort]struct{}{}
  1135. listens := map[netaddr.IPPort]string{}
  1136. seenNamespaces := map[string]bool{}
  1137. fdMap := map[uint64]struct{}{}
  1138. for _, p := range c.processes {
  1139. if seenNamespaces[p.NetNsId()] {
  1140. continue
  1141. }
  1142. sockets, err := proc.GetSockets(p.Pid)
  1143. if err != nil {
  1144. continue
  1145. }
  1146. fds, err := proc.ReadFds(p.Pid)
  1147. if err == nil {
  1148. for _, fd := range fds {
  1149. fdMap[fd.Fd] = struct{}{}
  1150. }
  1151. }
  1152. for _, s := range sockets {
  1153. if s.Listen {
  1154. listens[s.SAddr] = s.Inode
  1155. } else {
  1156. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1157. establishedDst[s.DAddr] = struct{}{}
  1158. }
  1159. }
  1160. seenNamespaces[p.NetNsId()] = true
  1161. }
  1162. c.revalidateListens(now, listens)
  1163. for srcDst, conn := range c.connectionsActive {
  1164. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1165. if _, ok := established[srcDst]; !ok {
  1166. delete(c.connectionsActive, srcDst)
  1167. if conn == c.connectionsByPidFd[pidFd] {
  1168. delete(c.connectionsByPidFd, pidFd)
  1169. }
  1170. continue
  1171. }
  1172. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1173. delete(c.connectionsActive, srcDst)
  1174. if conn == c.connectionsByPidFd[pidFd] {
  1175. delete(c.connectionsByPidFd, pidFd)
  1176. }
  1177. }
  1178. }
  1179. for srcDst, conn := range c.acceptsActive {
  1180. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1181. if _, ok := established[srcDst]; !ok {
  1182. delete(c.acceptsActive, srcDst)
  1183. if conn == c.acceptsByPidFd[pidFd] {
  1184. delete(c.acceptsByPidFd, pidFd)
  1185. }
  1186. continue
  1187. }
  1188. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1189. delete(c.acceptsActive, srcDst)
  1190. if conn == c.acceptsByPidFd[pidFd] {
  1191. delete(c.acceptsByPidFd, pidFd)
  1192. }
  1193. }
  1194. }
  1195. for _, conn := range c.connectionsByPidFd {
  1196. if _, ok := fdMap[conn.Fd]; !ok {
  1197. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1198. }
  1199. }
  1200. for _, conn := range c.acceptsByPidFd {
  1201. if _, ok := fdMap[conn.Fd]; !ok {
  1202. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1203. }
  1204. }
  1205. for dst, at := range c.connectLastAttempt {
  1206. _, active := establishedDst[dst]
  1207. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1208. delete(c.connectLastAttempt, dst)
  1209. // delete(c.connectsFailed, dst)
  1210. for dfailed := range c.connectsFailed {
  1211. if dfailed.src == dst {
  1212. delete(c.connectsFailed, dfailed)
  1213. }
  1214. }
  1215. for d := range c.connectsSuccessful {
  1216. if d.src == dst {
  1217. delete(c.connectsSuccessful, d)
  1218. }
  1219. }
  1220. c.l7Stats.delete(dst)
  1221. }
  1222. }
  1223. for dst, at := range c.acceptLastAttempt {
  1224. _, active := establishedDst[dst]
  1225. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1226. delete(c.acceptLastAttempt, dst)
  1227. for d := range c.acceptsSuccessful {
  1228. if d.src == dst {
  1229. delete(c.acceptsSuccessful, d)
  1230. }
  1231. }
  1232. c.l7Stats.delete(dst)
  1233. }
  1234. }
  1235. }
  1236. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1237. for addr, byPid := range c.listens {
  1238. if _, open := actualListens[addr]; open {
  1239. continue
  1240. }
  1241. klog.Warningln("deleting the outdated listen:", addr)
  1242. for _, details := range byPid {
  1243. if details.ClosedAt.IsZero() {
  1244. details.ClosedAt = now
  1245. }
  1246. }
  1247. }
  1248. missingListens := map[netaddr.IPPort]string{}
  1249. for addr, inode := range actualListens {
  1250. byPids, found := c.listens[addr]
  1251. if !found {
  1252. missingListens[addr] = inode
  1253. continue
  1254. }
  1255. open := false
  1256. for _, details := range byPids {
  1257. if details.ClosedAt.IsZero() {
  1258. open = true
  1259. break
  1260. }
  1261. }
  1262. if !open {
  1263. missingListens[addr] = inode
  1264. }
  1265. }
  1266. if len(missingListens) > 0 {
  1267. inodeToPid := map[string]uint32{}
  1268. for pid := range c.processes {
  1269. fds, err := proc.ReadFds(pid)
  1270. if err != nil {
  1271. klog.Warningln(err)
  1272. continue
  1273. }
  1274. for _, fd := range fds {
  1275. if fd.SocketInode != "" {
  1276. inodeToPid[fd.SocketInode] = pid
  1277. }
  1278. }
  1279. }
  1280. for addr, inode := range missingListens {
  1281. pid, found := inodeToPid[inode]
  1282. if !found {
  1283. continue
  1284. }
  1285. //klog.Warningln("missing listen found:", addr, pid)
  1286. c.onListenOpen(pid, addr, true)
  1287. }
  1288. }
  1289. for addr, pids := range c.listens {
  1290. for pid, details := range pids {
  1291. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1292. delete(c.listens[addr], pid)
  1293. }
  1294. }
  1295. if len(c.listens[addr]) == 0 {
  1296. delete(c.listens, addr)
  1297. }
  1298. }
  1299. }
  1300. func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32, _type string) error {
  1301. klog.Infoln("[attach] attachUprobes start by :", _type)
  1302. if tracer.DisableL7Tracing() {
  1303. return nil
  1304. }
  1305. codeType := c.GetCodeTypeFromCache(pid)
  1306. if codeType.IsUnknownCode() {
  1307. return nil
  1308. }
  1309. var err error
  1310. switch codeType {
  1311. case CodeTypeJava:
  1312. err = c.attachJVMUprobes(tracer, pid)
  1313. case CodeTypeJavaAot:
  1314. err = c.attachJavaAotUprobes(tracer, pid)
  1315. case CodeTypeGo:
  1316. err = c.attachTlsUprobes(tracer, pid)
  1317. case CodeTypeNetCoreAot:
  1318. err = c.attachNetCoreUprobes(tracer, pid)
  1319. }
  1320. if err != nil {
  1321. klog.WithField("pid", pid).Errorf("[attach] error %v :", err)
  1322. deErr := c.DetachUprobes(tracer, pid, APP_UPROBE_ERROR)
  1323. if deErr != nil {
  1324. klog.WithField("pid", pid).Errorf("[attach] Detach Uprobes error %v :", deErr)
  1325. }
  1326. return err
  1327. }
  1328. return nil
  1329. }
  1330. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1331. p := c.processes[pid]
  1332. if p == nil {
  1333. return CodeTypeUnknown
  1334. }
  1335. if p.codeType.IsWaitCheck() {
  1336. p.codeType = GetExeType(pid, c.getRootfs())
  1337. }
  1338. return p.codeType
  1339. }
  1340. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1341. p := c.processes[pid]
  1342. if p == nil {
  1343. return nil
  1344. }
  1345. if p.openSslUprobesChecked || p.goTlsUprobesChecked {
  1346. return nil
  1347. }
  1348. if !p.openSslUprobesChecked {
  1349. p.openSslUprobesChecked = true
  1350. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1351. if err != nil {
  1352. return err
  1353. }
  1354. p.uprobes = append(p.uprobes, sslProbes...)
  1355. }
  1356. if !p.goTlsUprobesChecked {
  1357. p.goTlsUprobesChecked = true
  1358. codeType := c.GetCodeTypeFromCache(pid)
  1359. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1360. if err != nil {
  1361. return err
  1362. }
  1363. p.uprobes = append(p.uprobes, goProbes...)
  1364. c.l7AttachSuccess()
  1365. }
  1366. return nil
  1367. }
  1368. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1369. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1370. return nil
  1371. }
  1372. p := c.processes[pid]
  1373. if p == nil {
  1374. return nil
  1375. }
  1376. codeType := c.GetCodeTypeFromCache(pid)
  1377. if !p.jvmAttachOnce {
  1378. p.jvmAttachOnce = true
  1379. rootfs := c.getRootfs()
  1380. // TODO java Aot
  1381. if codeType.IsJvmCode() {
  1382. // check version
  1383. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1384. if err != nil {
  1385. klog.WithError(err).Errorf("[attach] Failed get so path")
  1386. return err
  1387. }
  1388. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1389. if err != nil {
  1390. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1391. return err
  1392. }
  1393. c.AppInfo.Version = v
  1394. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1395. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1396. if major != 1 || minor != 8 {
  1397. klog.Errorf("[attach] Unsupported Java version.")
  1398. return fmt.Errorf("[attach] Unsupported Java version")
  1399. }
  1400. }
  1401. /*libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1402. if err != nil {
  1403. klog.Error(err)
  1404. return err
  1405. }
  1406. p.uprobes = append(p.uprobes, libNioProbes...)*/
  1407. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1408. if err != nil {
  1409. klog.Error(err)
  1410. return err
  1411. }
  1412. p.uprobes = append(p.uprobes, libNetProbes...)
  1413. //p.jvmUprobesChecked = true
  1414. c.l7AttachSuccess()
  1415. err = tracer.InitKProcInfo(pid, &c.AppInfo)
  1416. if err != nil {
  1417. klog.Errorf("[attach] InitKProcInfo failed pid:[%d] ;%s.", pid, err.Error())
  1418. return err
  1419. } else {
  1420. klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid)
  1421. }
  1422. } else {
  1423. klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess())
  1424. }
  1425. return nil
  1426. }
  1427. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1428. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1429. return nil
  1430. }
  1431. p := c.processes[pid]
  1432. if p == nil {
  1433. return nil
  1434. }
  1435. codeType := c.GetCodeTypeFromCache(pid)
  1436. if !p.jvmAttachOnce {
  1437. p.jvmAttachOnce = true
  1438. rootfs := c.getRootfs()
  1439. // // TODO java Aot
  1440. // if codeType.IsJavaAotCode() {
  1441. // // check version
  1442. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1443. // if err != nil {
  1444. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1445. // return err
  1446. // }
  1447. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1448. // if err != nil {
  1449. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1450. // return err
  1451. // }
  1452. // c.AppInfo.Version = v
  1453. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1454. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1455. // if major != 1 || minor != 8 {
  1456. // return fmt.Errorf("[attach] Unsupported Java version")
  1457. // }
  1458. // }
  1459. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1460. if err != nil {
  1461. klog.Error(err)
  1462. return err
  1463. }
  1464. p.uprobes = append(p.uprobes, libNioProbes...)
  1465. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1466. if err != nil {
  1467. klog.Error(err)
  1468. return err
  1469. }
  1470. p.uprobes = append(p.uprobes, libNetProbes...)
  1471. //p.jvmUprobesChecked = true
  1472. c.l7AttachSuccess()
  1473. tracer.InitKProcInfo(pid, &c.AppInfo)
  1474. } else {
  1475. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1476. }
  1477. return nil
  1478. }
  1479. func (c *Container) errorClose(pid uint32, closeType int) {
  1480. p := c.processes[pid]
  1481. if p != nil {
  1482. p.DynamicClose(closeType)
  1483. }
  1484. }
  1485. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1486. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1487. return nil
  1488. }
  1489. p := c.processes[pid]
  1490. if p == nil {
  1491. return nil
  1492. }
  1493. if !p.jvmAttachOnce {
  1494. p.jvmAttachOnce = true
  1495. //codeType := c.GetCodeTypeFromCache(pid)
  1496. tracer.InitKProcInfo(pid, &c.AppInfo)
  1497. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1498. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1499. if err != nil {
  1500. klog.Error(err)
  1501. return err
  1502. }
  1503. p.uprobes = append(p.uprobes, readProbes...)
  1504. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1505. if err != nil {
  1506. klog.Error(err)
  1507. return err
  1508. }
  1509. p.uprobes = append(p.uprobes, WriteProbes...)
  1510. c.l7AttachSuccess()
  1511. }
  1512. return nil
  1513. }
  1514. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1515. info := proc.GetFdInfo(pid, fd)
  1516. if info == nil {
  1517. return
  1518. }
  1519. switch {
  1520. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1521. !strings.HasPrefix(info.Dest, "/"),
  1522. strings.HasPrefix(info.Dest, "/proc/"),
  1523. strings.HasPrefix(info.Dest, "/dev/"),
  1524. strings.HasPrefix(info.Dest, "/sys/"),
  1525. strings.HasSuffix(info.Dest, "(deleted)"):
  1526. return
  1527. }
  1528. mntId = info.MntId
  1529. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1530. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1531. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1532. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1533. logPath = info.Dest
  1534. }
  1535. return
  1536. }
  1537. func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1538. return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
  1539. }
  1540. func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
  1541. return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
  1542. }