container.go 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786
  1. package containers
  2. import (
  3. debugelf "debug/elf"
  4. "fmt"
  5. "os"
  6. "sort"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "encoding/json"
  12. "github.com/coroot/coroot-node-agent/utils"
  13. . "github.com/coroot/coroot-node-agent/utils/modelse"
  14. "github.com/coroot/coroot-node-agent/cgroup"
  15. "github.com/coroot/coroot-node-agent/common"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/flags"
  20. "github.com/coroot/coroot-node-agent/logs"
  21. "github.com/coroot/coroot-node-agent/node"
  22. "github.com/coroot/coroot-node-agent/pinger"
  23. "github.com/coroot/coroot-node-agent/proc"
  24. "github.com/coroot/coroot-node-agent/tracing"
  25. "github.com/coroot/logparser"
  26. "github.com/prometheus/client_golang/prometheus"
  27. klog "github.com/sirupsen/logrus"
  28. "github.com/vishvananda/netns"
  29. "golang.org/x/exp/maps"
  30. "inet.af/netaddr"
  31. )
  32. var (
  33. gcInterval = 1 * time.Minute
  34. AllAppInfoInterval = 1 * time.Minute
  35. RegisterAppInterval = 1 * time.Minute
  36. RegisterHostInterval = 10 * time.Minute
  37. pingTimeout = 300 * time.Millisecond
  38. )
  39. type ContainerID string
  40. type ContainerNetwork struct {
  41. NetworkID string
  42. }
  43. type ContainerMetadata struct {
  44. name string
  45. labels map[string]string
  46. volumes map[string]string
  47. logPath string
  48. image string
  49. logDecoder logparser.Decoder
  50. hostListens map[string][]netaddr.IPPort
  51. networks map[string]ContainerNetwork
  52. env map[string]string
  53. systemdTriggeredBy string
  54. rootfs string
  55. }
  56. type Delays struct {
  57. cpu time.Duration
  58. disk time.Duration
  59. }
  60. type LogParser struct {
  61. parser *logparser.Parser
  62. stop func()
  63. }
  64. func (p *LogParser) Stop() {
  65. if p.stop != nil {
  66. p.stop()
  67. }
  68. p.parser.Stop()
  69. }
  70. type AddrPair struct {
  71. src netaddr.IPPort
  72. dst netaddr.IPPort
  73. }
  74. type ActiveConnection struct {
  75. Dest netaddr.IPPort
  76. Src netaddr.IPPort
  77. ActualDest netaddr.IPPort
  78. Pid uint32
  79. Fd uint64
  80. Timestamp uint64
  81. Closed time.Time
  82. Retransmissions uint64
  83. BytesSent uint64
  84. PerBytesSent uint64
  85. BytesReceived uint64
  86. PerBytesReceived uint64
  87. ConEstTime time.Duration
  88. FirstReadTime uint64
  89. FirstWriteTime uint64
  90. NewReadTime uint64
  91. http2Parser *l7.Http2Parser
  92. postgresParser *l7.PostgresParser
  93. mysqlParser *l7.MysqlParser
  94. dmParser *l7.DmParser
  95. }
  96. type ActiveAccept struct {
  97. Dest netaddr.IPPort
  98. Src netaddr.IPPort
  99. Pid uint32
  100. Fd uint64
  101. Timestamp uint64
  102. Closed time.Time
  103. BytesSent uint64
  104. BytesReceived uint64
  105. }
  106. type ListenDetails struct {
  107. ClosedAt time.Time
  108. NsIPs []netaddr.IP
  109. }
  110. type PidFd struct {
  111. Pid uint32
  112. Fd uint64
  113. }
  114. type K8sContainer struct {
  115. ns string
  116. podName string
  117. podId string
  118. workload string
  119. containerName string
  120. pid string
  121. }
  122. type ConnectionStats struct {
  123. Count uint64
  124. TotalTime time.Duration
  125. Retransmissions uint64
  126. BytesSent uint64
  127. PerBytesSent uint64
  128. BytesReceived uint64
  129. PerBytesReceived uint64
  130. Src netaddr.IPPort
  131. ConEstTime time.Duration
  132. FirstReadTime uint64
  133. FirstWriteTime uint64
  134. NewReadTime uint64
  135. }
  136. type ProcessOffCPUTime struct {
  137. OffCpuTime uint64
  138. OffNetTime uint64
  139. OffEpollTime uint64
  140. OffFileTime uint64
  141. OffFutexTime uint64
  142. OffMemTime uint64
  143. OffMemmapTime uint64
  144. OffMmapTime uint64
  145. RunqTime uint64
  146. CPUOnTime uint64
  147. Url string
  148. SAddr netaddr.IPPort
  149. }
  150. type ProcessOffCPUTimeData struct {
  151. OffCpuTime uint64 `json:"offCpuTime"`
  152. OffNetTime uint64 `json:"offNetTime"`
  153. OffEpollTime uint64 `json:"offEpollTime"`
  154. OffFileTime uint64 `json:"offFileTime"`
  155. OffFutexTime uint64 `json:"offFutexTime"`
  156. OffMemTime uint64 `json:"offMemTime"`
  157. OffMemmapTime uint64 `json:"offMemmapTime"`
  158. OffMmapTime uint64 `json:"offMmapTime"`
  159. RunqTime uint64 `json:"runqTime"`
  160. CPUOnTime uint64 `json:"cpuOnTime"`
  161. Url string `json:"url"`
  162. Sip string `json:"sip"`
  163. Sport uint16 `json:"sport"`
  164. Pid uint32 `json:"pid"`
  165. InstanceId int64 `json:"instanceId"`
  166. InstanceName string `json:"instanceName"`
  167. ServerName string `json:"serverName"`
  168. }
  169. type AcceptStats struct {
  170. BytesSent uint64
  171. BytesReceived uint64
  172. }
  173. type Container struct {
  174. id ContainerID
  175. cgroup *cgroup.Cgroup
  176. metadata *ContainerMetadata
  177. K8sContainer
  178. processes map[uint32]*Process
  179. startedAt time.Time
  180. zombieAt time.Time
  181. restarts int
  182. delays Delays
  183. delaysByPid map[uint32]Delays
  184. delaysLock sync.Mutex
  185. listens map[netaddr.IPPort]map[uint32]*ListenDetails
  186. connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
  187. // connectsFailed map[netaddr.IPPort]int64 // dst -> count
  188. connectsFailed map[AddrPair]int64
  189. connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  190. connectionsActive map[AddrPair]*ActiveConnection
  191. connectionsByPidFd map[PidFd]*ActiveConnection
  192. acceptsSuccessful map[AddrPair]*AcceptStats
  193. acceptLastAttempt map[netaddr.IPPort]time.Time // dst -> time
  194. acceptsActive map[AddrPair]*ActiveAccept
  195. acceptsByPidFd map[PidFd]*ActiveAccept
  196. offCpuTimeInfo map[string][]*ProcessOffCPUTime
  197. l7Stats L7Stats
  198. dnsStats *L7Metrics
  199. oomKills int
  200. pythonThreadLockWaitTime time.Duration
  201. mounts map[string]proc.MountInfo
  202. logParsers map[string]*LogParser
  203. hostConntrack *Conntrack
  204. nsConntrack *Conntrack
  205. lbConntracks []*Conntrack
  206. registry *Registry
  207. lock sync.RWMutex
  208. done chan struct{}
  209. traceMap map[uint64]*tracing.Trace
  210. //instanceID utils.ID
  211. Symbols []debugelf.Symbol
  212. Uprobes []tracer.Uprobe
  213. UprobesMap map[string]tracer.Uprobe
  214. l7EventReady bool
  215. l7Attach bool
  216. // 白名单详情
  217. WhiteSettingInfo WhiteSettingInfo
  218. // 应用详情
  219. AppInfo AppInfo
  220. RegTime int
  221. }
  222. func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
  223. netNs, err := proc.GetNetNs(pid)
  224. if err != nil {
  225. return nil, err
  226. }
  227. defer netNs.Close()
  228. c := &Container{
  229. id: id,
  230. cgroup: cg,
  231. metadata: md,
  232. processes: map[uint32]*Process{},
  233. delaysByPid: map[uint32]Delays{},
  234. listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
  235. connectsSuccessful: map[AddrPair]*ConnectionStats{},
  236. // connectsFailed: map[netaddr.IPPort]int64{},
  237. connectsFailed: map[AddrPair]int64{},
  238. connectLastAttempt: map[netaddr.IPPort]time.Time{},
  239. connectionsActive: map[AddrPair]*ActiveConnection{},
  240. connectionsByPidFd: map[PidFd]*ActiveConnection{},
  241. acceptsSuccessful: map[AddrPair]*AcceptStats{},
  242. acceptLastAttempt: map[netaddr.IPPort]time.Time{},
  243. acceptsActive: map[AddrPair]*ActiveAccept{},
  244. acceptsByPidFd: map[PidFd]*ActiveAccept{},
  245. l7Stats: L7Stats{},
  246. dnsStats: &L7Metrics{},
  247. mounts: map[string]proc.MountInfo{},
  248. logParsers: map[string]*LogParser{},
  249. hostConntrack: hostConntrack,
  250. done: make(chan struct{}),
  251. traceMap: make(map[uint64]*tracing.Trace),
  252. registry: registry,
  253. offCpuTimeInfo:make(map[string][]*ProcessOffCPUTime),
  254. }
  255. for _, n := range md.networks {
  256. if nsHandle := FindNetworkLoadBalancerNs(n.NetworkID); nsHandle.IsOpen() {
  257. if ct, err := NewConntrack(nsHandle); err != nil {
  258. klog.Warningln(err)
  259. } else {
  260. c.lbConntracks = append(c.lbConntracks, ct)
  261. }
  262. _ = nsHandle.Close()
  263. }
  264. }
  265. c.runLogParser("")
  266. // go func() {
  267. // ticker := time.NewTicker(gcInterval)
  268. // defer ticker.Stop()
  269. // for {
  270. // select {
  271. // case <-c.done:
  272. // return
  273. // case t := <-ticker.C:
  274. // c.gc(t)
  275. // }
  276. // }
  277. // }()
  278. return c, nil
  279. }
  280. func (c *Container) Close() {
  281. for _, p := range c.logParsers {
  282. p.Stop()
  283. }
  284. for _, ct := range c.lbConntracks {
  285. _ = ct.Close()
  286. }
  287. if c.nsConntrack != nil {
  288. _ = c.nsConntrack.Close()
  289. }
  290. close(c.done)
  291. }
  292. func (c *Container) Dead(now time.Time) bool {
  293. return !c.zombieAt.IsZero() && now.Sub(c.zombieAt) > gcInterval
  294. }
  295. func (c *Container) Describe(ch chan<- *prometheus.Desc) {
  296. // some fixed metric description is required here to register/unregister the collector correctly
  297. ch <- prometheus.NewDesc("container", "", nil, nil)
  298. }
  299. func (c *Container) Collect(ch chan<- prometheus.Metric) {
  300. c.registry.updateTrafficStatsIfNecessary()
  301. c.lock.RLock()
  302. defer c.lock.RUnlock()
  303. if c.metadata.image != "" || c.metadata.systemdTriggeredBy != "" {
  304. ch <- gauge(metrics.ContainerInfo, 1, c.metadata.image, c.metadata.systemdTriggeredBy)
  305. }
  306. ch <- counter(metrics.Restarts, float64(c.restarts))
  307. if cpu, err := c.cgroup.CpuStat(); err == nil {
  308. if cpu.LimitCores > 0 {
  309. ch <- gauge(metrics.CPULimit, cpu.LimitCores)
  310. }
  311. ch <- counter(metrics.CPUUsage, cpu.UsageSeconds)
  312. ch <- counter(metrics.ThrottledTime, cpu.ThrottledTimeSeconds)
  313. }
  314. if taskstatsClient != nil {
  315. c.updateDelays()
  316. ch <- counter(metrics.CPUDelay, float64(c.delays.cpu)/float64(time.Second))
  317. ch <- counter(metrics.DiskDelay, float64(c.delays.disk)/float64(time.Second))
  318. }
  319. if s, err := c.cgroup.MemoryStat(); err == nil {
  320. ch <- gauge(metrics.MemoryRss, float64(s.RSS))
  321. ch <- gauge(metrics.MemoryCache, float64(s.Cache))
  322. if s.Limit > 0 {
  323. ch <- gauge(metrics.MemoryLimit, float64(s.Limit))
  324. }
  325. }
  326. if c.oomKills > 0 {
  327. ch <- counter(metrics.OOMKills, float64(c.oomKills))
  328. }
  329. if disks, err := node.GetDisks(); err == nil {
  330. ioStat, _ := c.cgroup.IOStat()
  331. for majorMinor, mounts := range c.getMounts() {
  332. dev := disks.GetParentBlockDevice(majorMinor)
  333. if dev == nil {
  334. continue
  335. }
  336. for mountPoint, fsStat := range mounts {
  337. dls := []string{mountPoint, dev.Name, c.metadata.volumes[mountPoint]}
  338. ch <- gauge(metrics.DiskSize, float64(fsStat.CapacityBytes), dls...)
  339. ch <- gauge(metrics.DiskUsed, float64(fsStat.UsedBytes), dls...)
  340. ch <- gauge(metrics.DiskReserved, float64(fsStat.ReservedBytes), dls...)
  341. if io, ok := ioStat[majorMinor]; ok {
  342. ch <- counter(metrics.DiskReadOps, float64(io.ReadOps), dls...)
  343. ch <- counter(metrics.DiskReadBytes, float64(io.ReadBytes), dls...)
  344. ch <- counter(metrics.DiskWriteOps, float64(io.WriteOps), dls...)
  345. ch <- counter(metrics.DiskWriteBytes, float64(io.WrittenBytes), dls...)
  346. }
  347. }
  348. }
  349. }
  350. for addr, open := range c.getListens() {
  351. ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
  352. }
  353. for proxy, addrs := range c.getProxiedListens() {
  354. for addr := range addrs {
  355. ch <- gauge(metrics.NetListenInfo, 1, addr.String(), proxy)
  356. }
  357. }
  358. strInstanceID := strconv.FormatInt(c.AppInfo.InstanceIdHash.IntVal, 10)
  359. strAppId := strconv.FormatInt(c.AppInfo.AppIdHash.IntVal, 10)
  360. strAppName := c.AppInfo.AppName
  361. apps := c.registry.GetAllAppInfoFromServer()
  362. for d, stats := range c.connectsSuccessful {
  363. targetAppId := apps[d.dst.String()]
  364. ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  365. ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  366. stats.Count = 0
  367. // if stats.Retransmissions > 0 {
  368. // ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.src.String(), d.dst.String())
  369. // }
  370. // ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  371. // ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  372. // ch <- counter(metrics.NetBytesSentPer, float64(stats.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  373. // ch <- counter(metrics.NetBytesReceivedPer, float64(stats.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  374. // ch <- counter(metrics.NetDataLatency, float64(stats.FirstReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  375. // ch <- counter(metrics.NetDataDuration, float64(stats.NewReadTime-stats.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  376. // ch <- counter(metrics.NetEstTime, float64(stats.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, stats.Src.String(), d.dst.String(), d.dst.String())
  377. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  378. // stats.PerBytesReceived = 0
  379. // stats.PerBytesSent = 0
  380. }
  381. // for d, stats := range c.acceptsSuccessful {
  382. // ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
  383. // ch <- counter(metrics.NetAcceptBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
  384. // ch <- counter(metrics.NetAcceptBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
  385. // klog.Infof("c.acceptsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d", d.src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived)
  386. // }
  387. // for dst, count := range c.connectsFailed {
  388. // targetAppId := apps[dst.String()]
  389. // ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, dst.String())
  390. // }
  391. for addrPair, count := range c.connectsFailed {
  392. targetAppId := apps[addrPair.dst.String()]
  393. ch <- counter(metrics.NetConnectionsFailed, float64(count), strInstanceID, strAppId, targetAppId, strAppName, addrPair.src.String(), addrPair.dst.String())
  394. c.connectsFailed[addrPair] = 0
  395. }
  396. connections := map[AddrPair]int{}
  397. for addrPair, conn := range c.connectionsActive {
  398. targetAppId := apps[conn.Dest.String()]
  399. if conn.Retransmissions > 0 {
  400. ch <- counter(metrics.NetRetransmits, float64(conn.Retransmissions), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  401. conn.Retransmissions = 0
  402. }
  403. ch <- counter(metrics.NetBytesSent, float64(conn.BytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  404. ch <- counter(metrics.NetBytesReceived, float64(conn.BytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  405. ch <- counter(metrics.NetBytesSentPer, float64(conn.PerBytesSent), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  406. ch <- counter(metrics.NetBytesReceivedPer, float64(conn.PerBytesReceived), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  407. ch <- counter(metrics.NetDataLatency, float64(conn.FirstReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  408. ch <- counter(metrics.NetDataDuration, float64(conn.NewReadTime-conn.FirstWriteTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  409. ch <- counter(metrics.NetEstTime, float64(conn.ConEstTime), strInstanceID, strAppId, targetAppId, strAppName, conn.Src.String(), conn.Dest.String(), conn.ActualDest.String())
  410. // klog.Infof("c.connectsSuccessful d.src=%s d.dst=%s stats.BytesSent=%d,stats.BytesReceived=%d stats.PerBytesSent=%d,stats.PerBytesReceived=%d,stats.datalatency=%d,stats.dataduration=%d,stats.estTime=%d", stats.Src.String(), d.dst.String(), stats.BytesSent, stats.BytesReceived, stats.PerBytesSent, stats.PerBytesReceived, stats.FirstReadTime-stats.FirstWriteTime, stats.NewReadTime-stats.FirstWriteTime, stats.ConEstTime)
  411. conn.PerBytesReceived = 0
  412. conn.PerBytesSent = 0
  413. if !conn.Closed.IsZero() {
  414. continue
  415. }
  416. connections[AddrPair{src: addrPair.dst, dst: conn.ActualDest}]++
  417. }
  418. for d, count := range connections {
  419. ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
  420. }
  421. for source, p := range c.logParsers {
  422. for _, c := range p.parser.GetCounters() {
  423. ch <- counter(metrics.LogMessages, float64(c.Messages), source, c.Level.String(), c.Hash, c.Sample)
  424. }
  425. }
  426. appTypes := map[string]struct{}{}
  427. seenJvms := map[string]bool{}
  428. seenDotNetApps := map[string]bool{}
  429. pids := maps.Keys(c.processes)
  430. sort.Slice(pids, func(i, j int) bool {
  431. return pids[i] < pids[j]
  432. })
  433. for _, pid := range pids {
  434. process := c.processes[pid]
  435. cmdline := proc.GetCmdline(pid)
  436. if len(cmdline) == 0 {
  437. continue
  438. }
  439. appType := guessApplicationType(cmdline)
  440. if appType != "" {
  441. appTypes[appType] = struct{}{}
  442. }
  443. if process.isGolangApp {
  444. appTypes["golang"] = struct{}{}
  445. }
  446. switch {
  447. case isJvm(cmdline):
  448. jvm, jMetrics := jvmMetrics(pid)
  449. if len(jMetrics) > 0 && !seenJvms[jvm] {
  450. seenJvms[jvm] = true
  451. for _, m := range jMetrics {
  452. ch <- m
  453. }
  454. }
  455. case process.dotNetMonitor != nil:
  456. appTypes["dotnet"] = struct{}{}
  457. appName := process.dotNetMonitor.AppName()
  458. if !seenDotNetApps[appName] {
  459. seenDotNetApps[appName] = true
  460. process.dotNetMonitor.Collect(ch)
  461. }
  462. }
  463. }
  464. for appType := range appTypes {
  465. ch <- gauge(metrics.ApplicationType, 1, appType)
  466. }
  467. if c.pythonThreadLockWaitTime > 0 {
  468. ch <- counter(metrics.PythonThreadLockWaitTime, c.pythonThreadLockWaitTime.Seconds())
  469. }
  470. if c.dnsStats.Requests != nil {
  471. c.dnsStats.Requests.Collect(ch)
  472. }
  473. if c.dnsStats.Latency != nil {
  474. c.dnsStats.Latency.Collect(ch)
  475. }
  476. c.l7Stats.collect(ch)
  477. if !*flags.DisablePinger {
  478. for ip, rtt := range c.ping() {
  479. ch <- gauge(metrics.NetLatency, rtt, ip.String())
  480. }
  481. }
  482. c.gc(time.Now())
  483. }
  484. func (c *Container) onProcessStart(pid uint32) *Process {
  485. c.lock.Lock()
  486. defer c.lock.Unlock()
  487. stats, err := TaskstatsPID(pid)
  488. if err != nil {
  489. klog.WithError(err).Debugf("Failed onProcessStart [%d]", pid)
  490. return nil
  491. }
  492. c.zombieAt = time.Time{}
  493. p := NewProcess(pid, stats, c.registry.tracer)
  494. if p == nil {
  495. return nil
  496. }
  497. c.processes[pid] = p
  498. if c.startedAt.IsZero() {
  499. c.startedAt = stats.BeginTime
  500. } else {
  501. min := stats.BeginTime
  502. for _, p := range c.processes {
  503. if p.StartedAt.Before(min) {
  504. min = p.StartedAt
  505. }
  506. }
  507. if min.After(c.startedAt) {
  508. c.restarts++
  509. c.startedAt = min
  510. }
  511. }
  512. return p
  513. }
  514. func (c *Container) onProcessExit(pid uint32, oomKill bool) {
  515. c.lock.Lock()
  516. defer c.lock.Unlock()
  517. if p := c.processes[pid]; p != nil {
  518. p.Close()
  519. }
  520. delete(c.processes, pid)
  521. if len(c.processes) == 0 {
  522. c.zombieAt = time.Now()
  523. }
  524. delete(c.delaysByPid, pid)
  525. if oomKill {
  526. c.oomKills++
  527. }
  528. }
  529. func (c *Container) onFileOpen(pid uint32, fd uint64) {
  530. mntId, logPath := resolveFd(pid, fd)
  531. func() {
  532. if mntId == "" {
  533. return
  534. }
  535. c.lock.Lock()
  536. _, ok := c.mounts[mntId]
  537. c.lock.Unlock()
  538. if ok {
  539. return
  540. }
  541. byMountId := proc.GetMountInfo(pid)
  542. if byMountId == nil {
  543. return
  544. }
  545. if mi, ok := byMountId[mntId]; ok {
  546. c.lock.Lock()
  547. c.mounts[mntId] = mi
  548. c.lock.Unlock()
  549. }
  550. }()
  551. if logPath != "" {
  552. c.lock.Lock()
  553. c.runLogParser(logPath)
  554. c.lock.Unlock()
  555. }
  556. }
  557. // set
  558. func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
  559. klog.Infof("TCP listen open pid=%d id=%s addr=%s", pid, c.id, addr)
  560. if common.PortFilter.ShouldBeSkipped(addr.Port()) {
  561. return
  562. }
  563. if !safe {
  564. c.lock.Lock()
  565. defer c.lock.Unlock()
  566. }
  567. if _, ok := c.listens[addr]; !ok {
  568. c.listens[addr] = map[uint32]*ListenDetails{}
  569. }
  570. details := &ListenDetails{}
  571. c.listens[addr][pid] = details
  572. if addr.IP().IsUnspecified() {
  573. ns, err := proc.GetNetNs(pid)
  574. if err != nil {
  575. if !common.IsNotExist(err) {
  576. klog.Warningln(err)
  577. }
  578. return
  579. }
  580. defer ns.Close()
  581. ips, err := proc.GetNsIps(ns)
  582. if err != nil {
  583. klog.Warningln(err)
  584. return
  585. }
  586. klog.Infof("got IPs %s for %s", ips, ns.UniqueId())
  587. details.NsIPs = ips
  588. }
  589. }
  590. func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
  591. klog.Infof("TCP listen close pid=%d id=%s addr=%s", pid, c.id, addr)
  592. c.lock.Lock()
  593. defer c.lock.Unlock()
  594. if _, byAddr := c.listens[addr]; byAddr {
  595. if _, byPid := c.listens[addr][pid]; byPid {
  596. if details := c.listens[addr][pid]; details != nil {
  597. details.ClosedAt = time.Now()
  598. }
  599. }
  600. }
  601. }
  602. func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  603. klog.Debugf("accept pid=%d id=%s dstaddr=%s srcaddr=%s", pid, c.id, dst.IP(), src.IP())
  604. // if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  605. // return
  606. // }
  607. p := c.processes[pid]
  608. if p == nil {
  609. return
  610. }
  611. if dst.IP().IsLoopback() && !p.isHostNs() {
  612. return
  613. }
  614. // actualDst, err := c.getActualDestination(p, src, dst)
  615. // if err != nil {
  616. // if !common.IsNotExist(err) {
  617. // klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  618. // }
  619. // return
  620. // }
  621. // switch {
  622. // case actualDst == nil:
  623. // actualDst = &dst
  624. // case actualDst.IP().IsLoopback() && !p.isHostNs():
  625. // return
  626. // }
  627. // if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  628. // return
  629. // }
  630. c.lock.Lock()
  631. defer c.lock.Unlock()
  632. if !failed {
  633. key := AddrPair{src: dst, dst: src}
  634. stats := c.acceptsSuccessful[key]
  635. if stats == nil {
  636. stats = &AcceptStats{}
  637. c.acceptsSuccessful[key] = stats
  638. }
  639. acceptCon := &ActiveAccept{
  640. Dest: src,
  641. Src: dst,
  642. Pid: pid,
  643. Fd: fd,
  644. Timestamp: timestamp,
  645. }
  646. c.acceptsActive[AddrPair{src: dst, dst: src}] = acceptCon
  647. c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon
  648. }
  649. c.acceptLastAttempt[dst] = time.Now()
  650. }
  651. func (c *Container) onOffCPUTime(offcputime *l7.OffCPUData) {
  652. p := c.processes[offcputime.Pid]
  653. if p == nil {
  654. return
  655. }
  656. _, path := l7.ParseHttpOffCPU(offcputime.Payload)
  657. // klog.Infoln("onOffCPUTime payload is ", offcputime.Payload)
  658. // klog.Infoln("onOffCPUTime url is ", path)
  659. processoffcputime := &ProcessOffCPUTime{
  660. OffCpuTime: offcputime.OffCpuTime,
  661. OffNetTime: offcputime.OffNetTime,
  662. OffEpollTime: offcputime.OffEpollTime,
  663. OffFileTime: offcputime.OffFileTime,
  664. OffFutexTime: offcputime.OffFutexTime,
  665. OffMemTime: offcputime.OffMemTime,
  666. OffMemmapTime: offcputime.OffMemmapTime,
  667. OffMmapTime: offcputime.OffMmapTime,
  668. RunqTime: offcputime.RunqTime,
  669. CPUOnTime: offcputime.CPUOnTime,
  670. Url: path,
  671. SAddr: offcputime.SAddr,
  672. }
  673. // c.offCpuTimeInfo[path] = processoffcputime
  674. c.offCpuTimeInfo[path] = append(c.offCpuTimeInfo[path], processoffcputime)
  675. cmdline := p.GetCmdline()
  676. if len(cmdline) == 0 {
  677. cmdline = "default"
  678. }
  679. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s+%s+%d", offcputime.SAddr.IP(),cmdline,offcputime.SAddr.Port(),))
  680. int64InstanceID,_ := strInstanceID.ToInt64()
  681. processoffcputimedata := &ProcessOffCPUTimeData{
  682. OffCpuTime: processoffcputime.OffCpuTime,
  683. OffNetTime: processoffcputime.OffNetTime,
  684. OffEpollTime: processoffcputime.OffEpollTime,
  685. OffFileTime: processoffcputime.OffFileTime,
  686. OffFutexTime: processoffcputime.OffFutexTime,
  687. OffMemTime: processoffcputime.OffMemTime,
  688. OffMemmapTime: processoffcputime.OffMemmapTime,
  689. OffMmapTime: processoffcputime.OffMmapTime,
  690. RunqTime: processoffcputime.RunqTime,
  691. CPUOnTime: processoffcputime.CPUOnTime,
  692. Url: path,
  693. Sip: processoffcputime.SAddr.IP().String(),
  694. Sport: processoffcputime.SAddr.Port(),
  695. Pid: offcputime.Pid,
  696. InstanceId: int64InstanceID,
  697. InstanceName: "defaultInstanceName",
  698. ServerName: "defaultServerName",
  699. }
  700. // 转换为JSON
  701. jsonData, _ := json.Marshal(processoffcputimedata)
  702. klog.Infoln("onOffCPUTime processoffcputimedata is ", string(jsonData))
  703. }
  704. func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
  705. if common.PortFilter.ShouldBeSkipped(dst.Port()) {
  706. return
  707. }
  708. p := c.processes[pid]
  709. if p == nil {
  710. return
  711. }
  712. if dst.IP().IsLoopback() && !p.isHostNs() {
  713. return
  714. }
  715. actualDst, err := c.getActualDestination(p, src, dst)
  716. if err != nil {
  717. if !common.IsNotExist(err) {
  718. klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
  719. }
  720. return
  721. }
  722. switch {
  723. case actualDst == nil:
  724. actualDst = &dst
  725. case actualDst.IP().IsLoopback() && !p.isHostNs():
  726. return
  727. }
  728. if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
  729. return
  730. }
  731. c.lock.Lock()
  732. defer c.lock.Unlock()
  733. if failed {
  734. key := AddrPair{src: dst, dst: *actualDst}
  735. c.connectsFailed[key]++
  736. } else {
  737. key := AddrPair{src: dst, dst: *actualDst}
  738. stats := c.connectsSuccessful[key]
  739. if stats == nil {
  740. stats = &ConnectionStats{}
  741. c.connectsSuccessful[key] = stats
  742. }
  743. stats.Count++
  744. stats.TotalTime += duration
  745. stats.Src = src
  746. stats.ConEstTime = duration
  747. connection := &ActiveConnection{
  748. Dest: dst,
  749. Src: src,
  750. ActualDest: *actualDst,
  751. Pid: pid,
  752. Fd: fd,
  753. Timestamp: timestamp,
  754. ConEstTime: duration,
  755. }
  756. c.connectionsActive[AddrPair{src: src, dst: dst}] = connection
  757. c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}] = connection
  758. }
  759. c.connectLastAttempt[dst] = time.Now()
  760. }
  761. func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
  762. if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
  763. return actualDst, nil
  764. }
  765. for _, lb := range c.lbConntracks {
  766. if actualDst := lb.GetActualDestination(src, dst); actualDst != nil {
  767. return actualDst, nil
  768. }
  769. }
  770. actualDst := c.hostConntrack.GetActualDestination(src, dst)
  771. if actualDst != nil {
  772. return actualDst, nil
  773. }
  774. if !p.isHostNs() {
  775. if c.nsConntrack == nil {
  776. netNs, err := proc.GetNetNs(p.Pid)
  777. if err != nil {
  778. return nil, err
  779. }
  780. defer netNs.Close()
  781. c.nsConntrack, err = NewConntrack(netNs)
  782. if err != nil {
  783. return nil, err
  784. }
  785. }
  786. return c.nsConntrack.GetActualDestination(src, dst), nil
  787. }
  788. return nil, nil
  789. }
  790. func (c *Container) onConnectionClose(e ebpftracer.Event) {
  791. c.lock.Lock()
  792. conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  793. c.lock.Unlock()
  794. if conn != nil {
  795. if conn.Closed.IsZero() {
  796. if e.TrafficStats != nil {
  797. c.lock.Lock()
  798. c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived, e.FirstReadTime, e.FirstWriteTime, e.NewReadTime)
  799. c.lock.Unlock()
  800. }
  801. conn.Closed = time.Now()
  802. }
  803. }
  804. }
  805. func (c *Container) onAcceptClose(e ebpftracer.Event) {
  806. c.lock.Lock()
  807. conn := c.acceptsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
  808. c.lock.Unlock()
  809. if conn != nil {
  810. if conn.Closed.IsZero() {
  811. if e.TrafficStats != nil {
  812. c.lock.Lock()
  813. c.updateAcceptTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
  814. c.lock.Unlock()
  815. }
  816. conn.Closed = time.Now()
  817. }
  818. }
  819. }
  820. func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
  821. if u == nil {
  822. return
  823. }
  824. c.lock.Lock()
  825. defer c.lock.Unlock()
  826. c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived, 0, 0, 0)
  827. }
  828. func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received, firstreadtime, firstwritetime, newreadtime uint64) {
  829. if ac == nil {
  830. return
  831. }
  832. key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
  833. stats := c.connectsSuccessful[key]
  834. if stats == nil {
  835. stats = &ConnectionStats{}
  836. c.connectsSuccessful[key] = stats
  837. }
  838. if sent > ac.BytesSent {
  839. stats.BytesSent += sent - ac.BytesSent
  840. stats.PerBytesSent = sent - ac.BytesSent
  841. ac.PerBytesSent = sent - ac.BytesSent
  842. }
  843. if received > ac.BytesReceived {
  844. stats.BytesReceived += received - ac.BytesReceived
  845. stats.PerBytesReceived = received - ac.BytesReceived
  846. ac.PerBytesReceived = received - ac.BytesReceived
  847. }
  848. if firstreadtime != 0 && firstwritetime != 0 && newreadtime != 0 {
  849. stats.FirstReadTime = firstreadtime
  850. stats.FirstWriteTime = firstwritetime
  851. stats.NewReadTime = newreadtime
  852. ac.FirstReadTime = firstreadtime
  853. ac.FirstWriteTime = firstwritetime
  854. ac.NewReadTime = newreadtime
  855. }
  856. ac.BytesSent = sent
  857. ac.BytesReceived = received
  858. }
  859. func (c *Container) updateAcceptTrafficStats(ac *ActiveAccept, sent, received uint64) {
  860. if ac == nil {
  861. return
  862. }
  863. //klog.Infoln("TCP onConnectionClose5", ac.BytesSent, ac.BytesReceived, ac)
  864. key := AddrPair{src: ac.Src, dst: ac.Dest}
  865. stats := c.acceptsSuccessful[key]
  866. if stats == nil {
  867. stats = &AcceptStats{}
  868. c.acceptsSuccessful[key] = stats
  869. }
  870. if sent > ac.BytesSent {
  871. stats.BytesSent += sent - ac.BytesSent
  872. }
  873. if received > ac.BytesReceived {
  874. stats.BytesReceived += received - ac.BytesReceived
  875. }
  876. ac.BytesSent = sent
  877. ac.BytesReceived = received
  878. }
  879. func (c *Container) onDNSRequest(r *l7.RequestData) (map[netaddr.IP]string, string, string, uint32, []netaddr.IP) {
  880. status := r.Status.DNS()
  881. if status == "" {
  882. return nil, "", "", 0, nil
  883. }
  884. t, fqdn, ttl, ips := l7.ParseDns(r.Payload)
  885. if t == "" {
  886. return nil, "", "", 0, nil
  887. }
  888. if c.dnsStats.Requests == nil {
  889. dnsReq := L7Requests[l7.ProtocolDNS]
  890. c.dnsStats.Requests = prometheus.NewCounterVec(
  891. prometheus.CounterOpts{Name: dnsReq.Name, Help: dnsReq.Help},
  892. []string{"request_type", "domain", "status"},
  893. )
  894. }
  895. if m, _ := c.dnsStats.Requests.GetMetricWithLabelValues(t, fqdn, status); m != nil {
  896. m.Inc()
  897. }
  898. if r.Duration != 0 {
  899. if c.dnsStats.Latency == nil {
  900. dnsLatency := L7Latency[l7.ProtocolDNS]
  901. c.dnsStats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: dnsLatency.Name, Help: dnsLatency.Help})
  902. }
  903. c.dnsStats.Latency.Observe(r.Duration.Seconds())
  904. }
  905. ip2fqdn := map[netaddr.IP]string{}
  906. if fqdn != "" {
  907. for _, ip := range ips {
  908. ip2fqdn[ip] = fqdn
  909. }
  910. }
  911. return ip2fqdn, t, fqdn, ttl, ips
  912. }
  913. func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  914. c.lock.Lock()
  915. defer c.lock.Unlock()
  916. if r.Protocol == l7.ProtocolDNS {
  917. //return c.onDNSRequest(r)
  918. }
  919. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  920. if conn == nil {
  921. return nil
  922. }
  923. if timestamp != 0 && conn.Timestamp != timestamp {
  924. return nil
  925. }
  926. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  927. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  928. switch r.Protocol {
  929. case l7.ProtocolHTTP:
  930. stats.observe(r.Status.Http(), "", r.Duration)
  931. method, path := l7.ParseHttp(r.Payload)
  932. trace.HttpRequest(method, path, r.Status, r.Duration)
  933. case l7.ProtocolHTTP2:
  934. if conn.http2Parser == nil {
  935. conn.http2Parser = l7.NewHttp2Parser()
  936. }
  937. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  938. for _, req := range requests {
  939. stats.observe(req.Status.Http(), "", req.Duration)
  940. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  941. }
  942. case l7.ProtocolPostgres:
  943. if r.Method != l7.MethodStatementClose {
  944. stats.observe(r.Status.String(), "", r.Duration)
  945. }
  946. if conn.postgresParser == nil {
  947. conn.postgresParser = l7.NewPostgresParser()
  948. }
  949. query := conn.postgresParser.Parse(r.Payload)
  950. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  951. case l7.ProtocolMysql:
  952. if r.Method != l7.MethodStatementClose {
  953. stats.observe(r.Status.String(), "", r.Duration)
  954. }
  955. if conn.mysqlParser == nil {
  956. conn.mysqlParser = l7.NewMysqlParser()
  957. }
  958. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  959. trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  960. case l7.ProtocolMemcached:
  961. stats.observe(r.Status.String(), "", r.Duration)
  962. cmd, items := l7.ParseMemcached(r.Payload)
  963. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  964. case l7.ProtocolRedis:
  965. stats.observe(r.Status.String(), "", r.Duration)
  966. cmd, args := l7.ParseRedis(r.Payload)
  967. trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  968. case l7.ProtocolMongo:
  969. stats.observe(r.Status.String(), "", r.Duration)
  970. query := l7.ParseMongo(r.Payload)
  971. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  972. case l7.ProtocolKafka, l7.ProtocolCassandra:
  973. stats.observe(r.Status.String(), "", r.Duration)
  974. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  975. stats.observe(r.Status.String(), r.Method.String(), 0)
  976. case l7.ProtocolDubbo2:
  977. stats.observe(r.Status.String(), "", r.Duration)
  978. }
  979. return nil
  980. }
  981. func (c *Container) onRetransmission(srcDst AddrPair) bool {
  982. c.lock.Lock()
  983. defer c.lock.Unlock()
  984. conn, ok := c.connectionsActive[srcDst]
  985. if !ok {
  986. return false
  987. }
  988. key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
  989. stats := c.connectsSuccessful[key]
  990. if stats == nil {
  991. stats = &ConnectionStats{}
  992. c.connectsSuccessful[key] = stats
  993. }
  994. stats.Retransmissions++
  995. conn.Retransmissions++
  996. return true
  997. }
  998. func (c *Container) updateDelays() {
  999. c.delaysLock.Lock()
  1000. defer c.delaysLock.Unlock()
  1001. for pid := range c.processes {
  1002. stats, err := TaskstatsTGID(pid)
  1003. if err != nil {
  1004. continue
  1005. }
  1006. d := c.delaysByPid[pid]
  1007. c.delays.cpu += stats.CPUDelay - d.cpu
  1008. c.delays.disk += stats.BlockIODelay - d.disk
  1009. d.cpu = stats.CPUDelay
  1010. d.disk = stats.BlockIODelay
  1011. c.delaysByPid[pid] = d
  1012. }
  1013. }
  1014. func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
  1015. if len(c.mounts) == 0 {
  1016. return nil
  1017. }
  1018. res := map[string]map[string]*proc.FSStat{}
  1019. for _, mi := range c.mounts {
  1020. var stat *proc.FSStat
  1021. for pid := range c.processes {
  1022. s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
  1023. if err == nil {
  1024. stat = &s
  1025. break
  1026. }
  1027. }
  1028. if stat == nil {
  1029. continue
  1030. }
  1031. if _, ok := res[mi.MajorMinor]; !ok {
  1032. res[mi.MajorMinor] = map[string]*proc.FSStat{}
  1033. }
  1034. res[mi.MajorMinor][mi.MountPoint] = stat
  1035. }
  1036. return res
  1037. }
  1038. func (c *Container) getListens() map[netaddr.IPPort]int {
  1039. res := map[netaddr.IPPort]int{}
  1040. for addr, byPid := range c.listens {
  1041. open := 0
  1042. isHostNs := false
  1043. ips := map[netaddr.IP]bool{}
  1044. for pid, details := range byPid {
  1045. p := c.processes[pid]
  1046. if p == nil {
  1047. continue
  1048. }
  1049. if p.isHostNs() {
  1050. isHostNs = true
  1051. }
  1052. if details.ClosedAt.IsZero() {
  1053. open = 1
  1054. }
  1055. for _, ip := range details.NsIPs {
  1056. ips[ip] = true
  1057. }
  1058. }
  1059. if !addr.IP().IsUnspecified() {
  1060. ips = map[netaddr.IP]bool{addr.IP(): true}
  1061. }
  1062. for ip := range ips {
  1063. if ip.IsLoopback() && !isHostNs {
  1064. continue
  1065. }
  1066. res[netaddr.IPPortFrom(ip, addr.Port())] = open
  1067. }
  1068. }
  1069. return res
  1070. }
  1071. func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
  1072. if len(c.metadata.hostListens) == 0 {
  1073. return nil
  1074. }
  1075. hasUnspecified := false
  1076. for _, addrs := range c.metadata.hostListens {
  1077. for _, addr := range addrs {
  1078. if addr.IP().IsUnspecified() {
  1079. hasUnspecified = true
  1080. break
  1081. }
  1082. }
  1083. }
  1084. var hostIps []netaddr.IP
  1085. if hasUnspecified {
  1086. if ns, err := proc.GetHostNetNs(); err != nil {
  1087. klog.Warningln(err)
  1088. } else {
  1089. ips, err := proc.GetNsIps(ns)
  1090. _ = ns.Close()
  1091. if err != nil {
  1092. klog.Warningln(err)
  1093. } else {
  1094. hostIps = ips
  1095. }
  1096. }
  1097. }
  1098. res := map[string]map[netaddr.IPPort]struct{}{}
  1099. for proxy, addrs := range c.metadata.hostListens {
  1100. res[proxy] = map[netaddr.IPPort]struct{}{}
  1101. for _, addr := range addrs {
  1102. if addr.IP().IsUnspecified() {
  1103. for _, ip := range hostIps {
  1104. if addr.IP().Is4() && ip.Is4() || addr.IP().Is6() && ip.Is6() {
  1105. res[proxy][netaddr.IPPortFrom(ip, addr.Port())] = struct{}{}
  1106. }
  1107. }
  1108. } else {
  1109. res[proxy][addr] = struct{}{}
  1110. }
  1111. }
  1112. }
  1113. return res
  1114. }
  1115. func (c *Container) ping() map[netaddr.IP]float64 {
  1116. netNs := netns.None()
  1117. for pid := range c.processes {
  1118. if pid == agentPid {
  1119. netNs = selfNetNs
  1120. break
  1121. }
  1122. ns, err := proc.GetNetNs(pid)
  1123. if err != nil {
  1124. if !common.IsNotExist(err) {
  1125. klog.Warningln(err)
  1126. }
  1127. continue
  1128. }
  1129. netNs = ns
  1130. defer netNs.Close()
  1131. break
  1132. }
  1133. if !netNs.IsOpen() {
  1134. return nil
  1135. }
  1136. ips := map[netaddr.IP]struct{}{}
  1137. for d := range c.connectsSuccessful {
  1138. ips[d.dst.IP()] = struct{}{}
  1139. }
  1140. for dst := range c.connectsFailed {
  1141. ips[dst.src.IP()] = struct{}{}
  1142. }
  1143. if len(ips) == 0 {
  1144. return nil
  1145. }
  1146. targets := make([]netaddr.IP, 0, len(ips))
  1147. for ip := range ips {
  1148. if ip.IsLoopback() {
  1149. continue
  1150. }
  1151. if !ip.Is4() { // pinger doesn't support IPv6 yet
  1152. continue
  1153. }
  1154. targets = append(targets, ip)
  1155. }
  1156. rtt, err := pinger.Ping(netNs, selfNetNs, targets, pingTimeout)
  1157. if err != nil {
  1158. klog.Warningln(err)
  1159. return nil
  1160. }
  1161. return rtt
  1162. }
  1163. func (c *Container) runLogParser(logPath string) {
  1164. if *flags.DisableLogParsing {
  1165. return
  1166. }
  1167. containerId := string(c.id)
  1168. if logPath != "" {
  1169. if c.logParsers[logPath] != nil {
  1170. return
  1171. }
  1172. ch := make(chan logparser.LogEntry)
  1173. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1174. reader, err := logs.NewTailReader(proc.HostPath(logPath), ch)
  1175. if err != nil {
  1176. klog.Warningln(err)
  1177. parser.Stop()
  1178. return
  1179. }
  1180. klog.Debugln("started varlog logparser", "cg", c.cgroup.Id, "log", logPath)
  1181. c.logParsers[logPath] = &LogParser{parser: parser, stop: reader.Stop}
  1182. return
  1183. }
  1184. switch c.cgroup.ContainerType {
  1185. case cgroup.ContainerTypeSystemdService:
  1186. ch := make(chan logparser.LogEntry)
  1187. if err := JournaldSubscribe(c.cgroup, ch); err != nil {
  1188. klog.Warningln(err)
  1189. return
  1190. }
  1191. parser := logparser.NewParser(ch, nil, logs.OtelLogEmitter(containerId))
  1192. stop := func() {
  1193. JournaldUnsubscribe(c.cgroup)
  1194. }
  1195. klog.Infoln("started journald logparser", "cg", c.cgroup.Id)
  1196. c.logParsers["journald"] = &LogParser{parser: parser, stop: stop}
  1197. case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeCrio:
  1198. if c.metadata.logPath == "" {
  1199. return
  1200. }
  1201. if parser := c.logParsers["stdout/stderr"]; parser != nil {
  1202. parser.Stop()
  1203. delete(c.logParsers, "stdout/stderr")
  1204. }
  1205. ch := make(chan logparser.LogEntry)
  1206. parser := logparser.NewParser(ch, c.metadata.logDecoder, logs.OtelLogEmitter(containerId))
  1207. reader, err := logs.NewTailReader(proc.HostPath(c.metadata.logPath), ch)
  1208. if err != nil {
  1209. klog.Warningln(err)
  1210. parser.Stop()
  1211. return
  1212. }
  1213. klog.Infoln("started container logparser", "cg", c.cgroup.Id)
  1214. c.logParsers["stdout/stderr"] = &LogParser{parser: parser, stop: reader.Stop}
  1215. }
  1216. }
  1217. func (c *Container) gc(now time.Time) {
  1218. // c.lock.Lock()
  1219. // defer c.lock.Unlock()
  1220. established := map[AddrPair]struct{}{}
  1221. establishedDst := map[netaddr.IPPort]struct{}{}
  1222. listens := map[netaddr.IPPort]string{}
  1223. seenNamespaces := map[string]bool{}
  1224. fdMap := map[uint64]struct{}{}
  1225. for _, p := range c.processes {
  1226. if seenNamespaces[p.NetNsId()] {
  1227. continue
  1228. }
  1229. sockets, err := proc.GetSockets(p.Pid)
  1230. if err != nil {
  1231. continue
  1232. }
  1233. fds, err := proc.ReadFds(p.Pid)
  1234. if err == nil {
  1235. for _, fd := range fds {
  1236. fdMap[fd.Fd] = struct{}{}
  1237. }
  1238. }
  1239. for _, s := range sockets {
  1240. if s.Listen {
  1241. listens[s.SAddr] = s.Inode
  1242. } else {
  1243. established[AddrPair{src: s.SAddr, dst: s.DAddr}] = struct{}{}
  1244. establishedDst[s.DAddr] = struct{}{}
  1245. }
  1246. }
  1247. seenNamespaces[p.NetNsId()] = true
  1248. }
  1249. c.revalidateListens(now, listens)
  1250. for srcDst, conn := range c.connectionsActive {
  1251. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1252. if _, ok := established[srcDst]; !ok {
  1253. delete(c.connectionsActive, srcDst)
  1254. if conn == c.connectionsByPidFd[pidFd] {
  1255. delete(c.connectionsByPidFd, pidFd)
  1256. }
  1257. continue
  1258. }
  1259. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1260. delete(c.connectionsActive, srcDst)
  1261. if conn == c.connectionsByPidFd[pidFd] {
  1262. delete(c.connectionsByPidFd, pidFd)
  1263. }
  1264. }
  1265. }
  1266. for srcDst, conn := range c.acceptsActive {
  1267. pidFd := PidFd{Pid: conn.Pid, Fd: conn.Fd}
  1268. if _, ok := established[srcDst]; !ok {
  1269. delete(c.acceptsActive, srcDst)
  1270. if conn == c.acceptsByPidFd[pidFd] {
  1271. delete(c.acceptsByPidFd, pidFd)
  1272. }
  1273. continue
  1274. }
  1275. if !conn.Closed.IsZero() && now.Sub(conn.Closed) > gcInterval {
  1276. delete(c.acceptsActive, srcDst)
  1277. if conn == c.acceptsByPidFd[pidFd] {
  1278. delete(c.acceptsByPidFd, pidFd)
  1279. }
  1280. }
  1281. }
  1282. for _, conn := range c.connectionsByPidFd {
  1283. if _, ok := fdMap[conn.Fd]; !ok {
  1284. delete(c.connectionsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1285. }
  1286. }
  1287. for _, conn := range c.acceptsByPidFd {
  1288. if _, ok := fdMap[conn.Fd]; !ok {
  1289. delete(c.acceptsByPidFd, PidFd{Pid: conn.Pid, Fd: conn.Fd})
  1290. }
  1291. }
  1292. for dst, at := range c.connectLastAttempt {
  1293. _, active := establishedDst[dst]
  1294. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1295. delete(c.connectLastAttempt, dst)
  1296. // delete(c.connectsFailed, dst)
  1297. for dfailed := range c.connectsFailed {
  1298. if dfailed.src == dst {
  1299. delete(c.connectsFailed, dfailed)
  1300. }
  1301. }
  1302. for d := range c.connectsSuccessful {
  1303. if d.src == dst {
  1304. delete(c.connectsSuccessful, d)
  1305. }
  1306. }
  1307. c.l7Stats.delete(dst)
  1308. }
  1309. }
  1310. for dst, at := range c.acceptLastAttempt {
  1311. _, active := establishedDst[dst]
  1312. if !active && !at.IsZero() && now.Sub(at) > gcInterval {
  1313. delete(c.acceptLastAttempt, dst)
  1314. for d := range c.acceptsSuccessful {
  1315. if d.src == dst {
  1316. delete(c.acceptsSuccessful, d)
  1317. }
  1318. }
  1319. c.l7Stats.delete(dst)
  1320. }
  1321. }
  1322. }
  1323. func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.IPPort]string) {
  1324. for addr, byPid := range c.listens {
  1325. if _, open := actualListens[addr]; open {
  1326. continue
  1327. }
  1328. klog.Warningln("deleting the outdated listen:", addr)
  1329. for _, details := range byPid {
  1330. if details.ClosedAt.IsZero() {
  1331. details.ClosedAt = now
  1332. }
  1333. }
  1334. }
  1335. missingListens := map[netaddr.IPPort]string{}
  1336. for addr, inode := range actualListens {
  1337. byPids, found := c.listens[addr]
  1338. if !found {
  1339. missingListens[addr] = inode
  1340. continue
  1341. }
  1342. open := false
  1343. for _, details := range byPids {
  1344. if details.ClosedAt.IsZero() {
  1345. open = true
  1346. break
  1347. }
  1348. }
  1349. if !open {
  1350. missingListens[addr] = inode
  1351. }
  1352. }
  1353. if len(missingListens) > 0 {
  1354. inodeToPid := map[string]uint32{}
  1355. for pid := range c.processes {
  1356. fds, err := proc.ReadFds(pid)
  1357. if err != nil {
  1358. klog.Warningln(err)
  1359. continue
  1360. }
  1361. for _, fd := range fds {
  1362. if fd.SocketInode != "" {
  1363. inodeToPid[fd.SocketInode] = pid
  1364. }
  1365. }
  1366. }
  1367. for addr, inode := range missingListens {
  1368. pid, found := inodeToPid[inode]
  1369. if !found {
  1370. continue
  1371. }
  1372. //klog.Warningln("missing listen found:", addr, pid)
  1373. c.onListenOpen(pid, addr, true)
  1374. }
  1375. }
  1376. for addr, pids := range c.listens {
  1377. for pid, details := range pids {
  1378. if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
  1379. delete(c.listens[addr], pid)
  1380. }
  1381. }
  1382. if len(c.listens[addr]) == 0 {
  1383. delete(c.listens, addr)
  1384. }
  1385. }
  1386. }
  1387. func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1388. klog.Infoln("[attach] attachUprobes start")
  1389. if tracer.DisableL7Tracing() {
  1390. return nil
  1391. }
  1392. codeType := c.GetCodeTypeFromCache(pid)
  1393. if codeType.IsUnknownCode() {
  1394. return nil
  1395. }
  1396. var err error
  1397. switch codeType {
  1398. case CodeTypeJava:
  1399. err = c.attachJVMUprobes(tracer, pid)
  1400. case CodeTypeJavaAot:
  1401. err = c.attachJavaAotUprobes(tracer, pid)
  1402. case CodeTypeGo:
  1403. err = c.attachTlsUprobes(tracer, pid)
  1404. case CodeTypeNetCoreAot:
  1405. err = c.attachNetCoreUprobes(tracer, pid)
  1406. }
  1407. if err != nil {
  1408. c.DetachUprobes(tracer, pid, APP_UPROBE_ERROR)
  1409. return err
  1410. }
  1411. return nil
  1412. }
  1413. func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
  1414. p := c.processes[pid]
  1415. if p == nil {
  1416. return CodeTypeUnknown
  1417. }
  1418. if p.codeType.IsWaitCheck() {
  1419. p.codeType = GetExeType(pid, c.getRootfs())
  1420. }
  1421. return p.codeType
  1422. }
  1423. func (c *Container) attachTlsUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1424. p := c.processes[pid]
  1425. if p == nil {
  1426. return nil
  1427. }
  1428. if p.openSslUprobesChecked || p.goTlsUprobesChecked {
  1429. return nil
  1430. }
  1431. if !p.openSslUprobesChecked {
  1432. p.openSslUprobesChecked = true
  1433. sslProbes, err := tracer.AttachOpenSslUprobes(pid)
  1434. if err != nil {
  1435. return err
  1436. }
  1437. p.uprobes = append(p.uprobes, sslProbes...)
  1438. }
  1439. if !p.goTlsUprobesChecked {
  1440. p.goTlsUprobesChecked = true
  1441. codeType := c.GetCodeTypeFromCache(pid)
  1442. goProbes, err := tracer.AttachGoTlsUprobes(pid, &c.AppInfo, uint16(codeType))
  1443. if err != nil {
  1444. return err
  1445. }
  1446. p.uprobes = append(p.uprobes, goProbes...)
  1447. c.l7AttachSuccess()
  1448. }
  1449. return nil
  1450. }
  1451. func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1452. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1453. return nil
  1454. }
  1455. p := c.processes[pid]
  1456. if p == nil {
  1457. return nil
  1458. }
  1459. codeType := c.GetCodeTypeFromCache(pid)
  1460. if !p.jvmAttachOnce {
  1461. p.jvmAttachOnce = true
  1462. rootfs := c.getRootfs()
  1463. // TODO java Aot
  1464. if codeType.IsJvmCode() {
  1465. // check version
  1466. libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1467. if err != nil {
  1468. klog.WithError(err).Errorf("[attach] Failed get so path")
  1469. return err
  1470. }
  1471. v, err := ebpftracer.GetJvmVersion(libjavaso)
  1472. if err != nil {
  1473. klog.WithError(err).Errorf("[attach] Failed get Java version")
  1474. return err
  1475. }
  1476. c.AppInfo.Version = v
  1477. major, minor, patch, err := ebpftracer.ParseVersion(v)
  1478. klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1479. if major != 1 || minor != 8 {
  1480. klog.Errorf("[attach] Unsupported Java version.")
  1481. return fmt.Errorf("[attach] Unsupported Java version")
  1482. }
  1483. }
  1484. /*libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, rootfs)
  1485. if err != nil {
  1486. klog.Error(err)
  1487. return err
  1488. }
  1489. p.uprobes = append(p.uprobes, libNioProbes...)*/
  1490. libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, rootfs)
  1491. if err != nil {
  1492. klog.Error(err)
  1493. return err
  1494. }
  1495. p.uprobes = append(p.uprobes, libNetProbes...)
  1496. //p.jvmUprobesChecked = true
  1497. c.l7AttachSuccess()
  1498. err = tracer.InitKProcInfo(pid, &c.AppInfo)
  1499. if err != nil {
  1500. klog.Errorf("[attach] InitKProcInfo failed pid:[%d] ;%s.", err.Error(), pid)
  1501. } else {
  1502. klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid)
  1503. }
  1504. } else {
  1505. klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess())
  1506. }
  1507. return nil
  1508. }
  1509. func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1510. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1511. return nil
  1512. }
  1513. p := c.processes[pid]
  1514. if p == nil {
  1515. return nil
  1516. }
  1517. codeType := c.GetCodeTypeFromCache(pid)
  1518. if !p.jvmAttachOnce {
  1519. p.jvmAttachOnce = true
  1520. rootfs := c.getRootfs()
  1521. // // TODO java Aot
  1522. // if codeType.IsJavaAotCode() {
  1523. // // check version
  1524. // libjavaso, err := utils.GetSoPath(pid, "libjava.so", rootfs)
  1525. // if err != nil {
  1526. // klog.WithError(err).Errorf("[attach] Failed get so path")
  1527. // return err
  1528. // }
  1529. // v, err := ebpftracer.GetJvmVersion(libjavaso)
  1530. // if err != nil {
  1531. // klog.WithError(err).Errorf("[attach] Failed get Java version")
  1532. // return err
  1533. // }
  1534. // c.AppInfo.Version = v
  1535. // major, minor, patch, err := ebpftracer.ParseVersion(v)
  1536. // klog.Infof("[attach] version: %s (Major: %d, Minor: %d, Patch: %d)", v, major, minor, patch)
  1537. // if major != 1 || minor != 8 {
  1538. // return fmt.Errorf("[attach] Unsupported Java version")
  1539. // }
  1540. // }
  1541. libNioProbes, err := tracer.AttachJavaAotNioReadUprobes(pid, codeType, rootfs)
  1542. if err != nil {
  1543. klog.Error(err)
  1544. return err
  1545. }
  1546. p.uprobes = append(p.uprobes, libNioProbes...)
  1547. libNetProbes, err := tracer.AttachJavaAotNetWriteUprobes(pid, rootfs)
  1548. if err != nil {
  1549. klog.Error(err)
  1550. return err
  1551. }
  1552. p.uprobes = append(p.uprobes, libNetProbes...)
  1553. //p.jvmUprobesChecked = true
  1554. c.l7AttachSuccess()
  1555. tracer.InitKProcInfo(pid, &c.AppInfo)
  1556. } else {
  1557. klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
  1558. }
  1559. return nil
  1560. }
  1561. func (c *Container) errorClose(pid uint32, closeType int) {
  1562. p := c.processes[pid]
  1563. if p != nil {
  1564. p.DynamicClose(closeType)
  1565. }
  1566. }
  1567. func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32) error {
  1568. if common.IsOpenFilter() && !common.IsFilterPid(pid) {
  1569. return nil
  1570. }
  1571. p := c.processes[pid]
  1572. if p == nil {
  1573. return nil
  1574. }
  1575. if !p.jvmAttachOnce {
  1576. p.jvmAttachOnce = true
  1577. //codeType := c.GetCodeTypeFromCache(pid)
  1578. tracer.InitKProcInfo(pid, &c.AppInfo)
  1579. p.uprobes = append(p.uprobes, tracer.AttachNetCoreNetThreadUprobes(pid)...)
  1580. readProbes, err := tracer.AttachNetCoreNetReadUprobes(pid)
  1581. if err != nil {
  1582. klog.Error(err)
  1583. return err
  1584. }
  1585. p.uprobes = append(p.uprobes, readProbes...)
  1586. WriteProbes, err := tracer.AttachNetCoreNetWriteUprobes(pid)
  1587. if err != nil {
  1588. klog.Error(err)
  1589. return err
  1590. }
  1591. p.uprobes = append(p.uprobes, WriteProbes...)
  1592. c.l7AttachSuccess()
  1593. }
  1594. return nil
  1595. }
  1596. func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
  1597. info := proc.GetFdInfo(pid, fd)
  1598. if info == nil {
  1599. return
  1600. }
  1601. switch {
  1602. case info.Flags&os.O_WRONLY == 0 && info.Flags&os.O_RDWR == 0,
  1603. !strings.HasPrefix(info.Dest, "/"),
  1604. strings.HasPrefix(info.Dest, "/proc/"),
  1605. strings.HasPrefix(info.Dest, "/dev/"),
  1606. strings.HasPrefix(info.Dest, "/sys/"),
  1607. strings.HasSuffix(info.Dest, "(deleted)"):
  1608. return
  1609. }
  1610. mntId = info.MntId
  1611. if info.Flags&os.O_WRONLY != 0 && strings.HasPrefix(info.Dest, "/var/log/") &&
  1612. !strings.HasPrefix(info.Dest, "/var/log/pods/") &&
  1613. !strings.HasPrefix(info.Dest, "/var/log/containers/") &&
  1614. !strings.HasPrefix(info.Dest, "/var/log/journal/") {
  1615. logPath = info.Dest
  1616. }
  1617. return
  1618. }
// counter builds a constant Prometheus metric of type CounterValue for desc
// with the given value and label values. It uses MustNewConstMetric, which
// panics if the metric cannot be created.
func counter(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
	return prometheus.MustNewConstMetric(desc, prometheus.CounterValue, value, labelValues...)
}
// gauge builds a constant Prometheus metric of type GaugeValue for desc with
// the given value and label values. It uses MustNewConstMetric, which panics if
// the metric cannot be created.
func gauge(desc *prometheus.Desc, value float64, labelValues ...string) prometheus.Metric {
	return prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, value, labelValues...)
}