container_apm.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  28. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  29. trace, ok := c.traceMap[traceId]
  30. return trace, ok
  31. }
  32. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  33. c.traceMap[traceId] = trace
  34. }
  35. // 查询或创建trace信息
  36. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  37. trace, ok := c.getTrace(traceId)
  38. if !ok {
  39. //new trace
  40. trace = tracing.NewTraceFromEvent(string(c.id))
  41. //create TraceMap
  42. c.createTraceMap(traceId, trace)
  43. //create ParentSpan
  44. trace.CreateRootSpan(traceId)
  45. }
  46. return trace, nil
  47. }
  48. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  49. // 返回: IP地址, 端口号, 容器ID
  50. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  51. containerID := ""
  52. if c.cgroup != nil {
  53. containerID = c.cgroup.ContainerId
  54. }
  55. ipAddr := ""
  56. ifaces, err := net.Interfaces()
  57. if err == nil {
  58. for _, iface := range ifaces {
  59. if iface.Name == "eth0" {
  60. addrs, err := iface.Addrs()
  61. if err == nil {
  62. for _, addr := range addrs {
  63. var ipnet *net.IPNet
  64. switch v := addr.(type) {
  65. case *net.IPNet:
  66. ipnet = v
  67. case *net.IPAddr:
  68. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  69. }
  70. if ipnet != nil && ipnet.IP.To4() != nil {
  71. ipAddr = ipnet.IP.String()
  72. break
  73. }
  74. }
  75. }
  76. break
  77. }
  78. }
  79. }
  80. klog.Debugf("grpc server ip %s", ipAddr)
  81. // 本地端口尝试从AppInfo.Sport获取
  82. port := c.AppInfo.Sport
  83. klog.Debugf("grpc server port %d", port)
  84. return ipAddr, uint16(port), containerID
  85. }
  86. // Deprecated: InitTrace not used
  87. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  88. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  89. // ip, err := netaddr.ParseIP(hostIp)
  90. // if err != nil {
  91. // //fmt.Println("host ip error")
  92. // hostIp = "127.0.0.1"
  93. // }
  94. // addr := netaddr.IPPortFrom(ip, port)
  95. // trace := tracing.NewTrace(string(c.id), addr)
  96. // if trace == nil {
  97. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  98. // }
  99. // c.traceMap[traceId] = trace
  100. //
  101. // trace.TraceStart(method, path, r.Status, r.Duration)
  102. // return nil
  103. //}
  104. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  105. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  106. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  107. if t.AllEventReady(traceID) {
  108. t.SendEvent()
  109. klog.Debugf("SendEvent %d", traceID)
  110. //fmt.Println(t.GetSpan())
  111. //fmt.Println("===============")
  112. delete(c.traceMap, traceID)
  113. }
  114. }
  115. func (c *Container) valuableTrace(traceID uint64) bool {
  116. return traceID != 0
  117. }
  118. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  119. c.lock.Lock()
  120. defer c.lock.Unlock()
  121. if r.Protocol == l7.ProtocolDNS {
  122. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  123. if c.l7Attach && c.valuableTrace(r.TraceId) {
  124. apmTrace, err := c.getOrInitTrace(r.TraceId)
  125. if err == nil {
  126. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  127. c.SendEvent(apmTrace, r.TraceId)
  128. }
  129. }
  130. return ip2fqdn
  131. }
  132. //if !c.valuableTrace(r.TraceId) {
  133. // return nil
  134. //}
  135. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  136. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  137. if c.l7Attach && c.valuableTrace(r.TraceId) {
  138. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  139. if r.TraceStart == TRACE_STATUS {
  140. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  141. trace, err := c.getOrInitTrace(r.TraceId)
  142. if c.AppInfo.AppName != "" {
  143. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  144. }
  145. if err == nil {
  146. switch r.Protocol {
  147. case l7.ProtocolHTTP:
  148. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  149. // userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
  150. ip, _ := netaddr.ParseIP(sn)
  151. //codeType := c.GetCodeTypeFromCache(pid)
  152. container_id := ""
  153. if c.cgroup != nil {
  154. container_id = c.cgroup.ContainerId
  155. }
  156. trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  157. c.SendEvent(trace, r.TraceId)
  158. case l7.ProtocolGrpc:
  159. // gRPC
  160. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  161. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  162. c.SendEvent(trace, r.TraceId)
  163. case l7.ProtocolKafka:
  164. var sn string
  165. var sport uint16
  166. var container_id string
  167. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  168. if conn != nil {
  169. sn = conn.ActualDest.IP().String()
  170. sport = conn.ActualDest.Port()
  171. }
  172. if c.cgroup != nil {
  173. container_id = c.cgroup.ContainerId
  174. }
  175. // MQ
  176. trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
  177. c.SendEvent(trace, r.TraceId)
  178. }
  179. }
  180. return nil
  181. }
  182. if r.TraceEnd == TRACE_STATUS {
  183. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  184. trace, err := c.getOrInitTrace(r.TraceId)
  185. if err == nil {
  186. trace.TraceEndEvent(r)
  187. c.SendEvent(trace, r.TraceId)
  188. }
  189. return nil
  190. }
  191. }
  192. /**
  193. * HTTP
  194. */
  195. if r.Protocol == l7.ProtocolHTTP {
  196. if c.l7Attach && c.valuableTrace(r.TraceId) {
  197. // 检查是否启用 Elasticsearch 检测
  198. if *flags.EnableElasticsearchDetection {
  199. // 解析 User-Agent 以检测 Elasticsearch 请求
  200. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  201. // 检查是否是 Elasticsearch 请求(通过 User-Agent)
  202. isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
  203. apmTrace, err := c.getOrInitTrace(r.TraceId)
  204. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  205. if err == nil {
  206. if isElasticsearch {
  207. r.Protocol = l7.ProtocolES
  208. // Elasticsearch 请求,按照 NoSQL 方式处理
  209. query := l7.ParseElasticsearch(r.Payload)
  210. if c.AppInfo.AppName != "" {
  211. klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
  212. }
  213. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  214. if conn == nil {
  215. conn = &ActiveConnection{
  216. Dest: r.ComponentDAddr,
  217. ActualDest: r.ComponentDAddr,
  218. Timestamp: timestamp,
  219. }
  220. }
  221. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
  222. } else {
  223. // 普通 HTTP 请求
  224. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  225. }
  226. c.SendEvent(apmTrace, r.TraceId)
  227. }
  228. } else {
  229. // Elasticsearch 检测未启用,使用普通 HTTP 处理
  230. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  231. apmTrace, err := c.getOrInitTrace(r.TraceId)
  232. if err == nil {
  233. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  234. c.SendEvent(apmTrace, r.TraceId)
  235. }
  236. }
  237. }
  238. //return nil
  239. }
  240. /**
  241. * gRPC
  242. */
  243. if r.Protocol == l7.ProtocolGrpc {
  244. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  245. klog.Infoln("enter the l7.ProtocolGrpc")
  246. if c.l7Attach && c.valuableTrace(r.TraceId) {
  247. apmTrace, err := c.getOrInitTrace(r.TraceId)
  248. if err == nil {
  249. apmTrace.GrpcClientTraceQueryEvent(r)
  250. c.SendEvent(apmTrace, r.TraceId)
  251. }
  252. }
  253. }
  254. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  255. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  256. if conn == nil {
  257. conn = &ActiveConnection{
  258. Dest: r.ComponentDAddr,
  259. ActualDest: r.ComponentDAddr,
  260. Timestamp: timestamp,
  261. }
  262. //return nil
  263. }
  264. if timestamp != 0 && conn.Timestamp != timestamp {
  265. //if r.Protocol == l7.ProtocolGrpc {
  266. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  267. // klog.Infoln("enter the l7.ProtocolGrpc")
  268. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  269. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  270. // if err == nil {
  271. // apmTrace.GrpcClientTraceQueryEvent(r)
  272. // c.SendEvent(apmTrace, r.TraceId)
  273. // }
  274. // }
  275. //}
  276. return nil
  277. }
  278. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  279. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  280. switch r.Protocol {
  281. /**
  282. * HTTP
  283. */
  284. case l7.ProtocolHTTP:
  285. if c.AppInfo.AppName != "" {
  286. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  287. }
  288. stats.observe(r.Status.Http(), "", r.Duration)
  289. /**
  290. * HTTP2
  291. */
  292. case l7.ProtocolHTTP2:
  293. if conn.http2Parser == nil {
  294. conn.http2Parser = l7.NewHttp2Parser()
  295. }
  296. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  297. for _, req := range requests {
  298. stats.observe(req.Status.Http(), "", req.Duration)
  299. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  300. }
  301. /**
  302. * PostgreSQL
  303. */
  304. case l7.ProtocolPostgres:
  305. if r.Method != l7.MethodStatementClose {
  306. stats.observe(r.Status.String(), "", r.Duration)
  307. }
  308. //if conn.postgresParser == nil {
  309. // conn.postgresParser = l7.NewPostgresParser()
  310. //}
  311. //query := conn.postgresParser.Parse(r.Payload)
  312. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  313. if c.l7Attach && c.valuableTrace(r.TraceId) {
  314. if conn.postgresParser == nil {
  315. conn.postgresParser = l7.NewPostgresParser()
  316. }
  317. query := conn.postgresParser.Parse(r.Payload)
  318. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  319. if c.AppInfo.AppName != "" {
  320. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  321. }
  322. //apmTrace, ok := c.getTrace(r.TraceId)
  323. apmTrace, err := c.getOrInitTrace(r.TraceId)
  324. //fmt.Println("mysql r.TraceId:", r.TraceId)
  325. //fmt.Println("ok:", ok)
  326. //fmt.Println("traceMap:", len(c.traceMap))
  327. if err == nil {
  328. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  329. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  330. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  331. c.SendEvent(apmTrace, r.TraceId)
  332. }
  333. }
  334. /**
  335. * Mysql
  336. */
  337. case l7.ProtocolMysql:
  338. if r.Method != l7.MethodStatementClose {
  339. stats.observe(r.Status.String(), "", r.Duration)
  340. }
  341. if c.l7Attach && c.valuableTrace(r.TraceId) {
  342. if conn.mysqlParser == nil {
  343. conn.mysqlParser = l7.NewMysqlParser()
  344. }
  345. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  346. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  347. if c.AppInfo.AppName != "" {
  348. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  349. }
  350. //apmTrace, ok := c.getTrace(r.TraceId)
  351. apmTrace, err := c.getOrInitTrace(r.TraceId)
  352. //fmt.Println("mysql r.TraceId:", r.TraceId)
  353. //fmt.Println("ok:", ok)
  354. //fmt.Println("traceMap:", len(c.traceMap))
  355. if err == nil {
  356. dbSystem := semconv.DBSystemMySQL
  357. // 根据端口白名单确定协议类型
  358. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  359. if l7Type == l7.ProtocolMariaDB {
  360. dbSystem = semconv.DBSystemMariaDB
  361. } else if l7Type == l7.ProtocolTiDB {
  362. dbSystem = semconv.DBSystemTiDB
  363. }
  364. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  365. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  366. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  367. c.SendEvent(apmTrace, r.TraceId)
  368. }
  369. }
  370. /**
  371. * DM (达梦数据库)
  372. */
  373. case l7.ProtocolDM:
  374. //统计dm的query次数
  375. stats.observe(r.Status.String(), "", r.Duration)
  376. //是否发送数据
  377. if c.l7Attach && c.valuableTrace(r.TraceId) {
  378. if conn.dmParser == nil {
  379. conn.dmParser = l7.NewDmParser()
  380. }
  381. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  382. if c.AppInfo.AppName != "" {
  383. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  384. }
  385. apmTrace, err := c.getOrInitTrace(r.TraceId)
  386. if err == nil {
  387. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  388. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  389. c.SendEvent(apmTrace, r.TraceId)
  390. }
  391. }
  392. /**
  393. * Memcached
  394. */
  395. case l7.ProtocolMemcached:
  396. stats.observe(r.Status.String(), "", r.Duration)
  397. if c.l7Attach && c.valuableTrace(r.TraceId) {
  398. cmd, items := l7.ParseMemcached(r.Payload)
  399. if c.AppInfo.AppName != "" {
  400. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  401. }
  402. apmTrace, err := c.getOrInitTrace(r.TraceId)
  403. if err == nil {
  404. statement := cmd
  405. if len(items) == 1 {
  406. statement += " " + items[0]
  407. } else if len(items) > 1 {
  408. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  409. statement += " " + joined
  410. }
  411. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  412. c.SendEvent(apmTrace, r.TraceId)
  413. }
  414. }
  415. /**
  416. * Redis
  417. */
  418. case l7.ProtocolRedis:
  419. stats.observe(r.Status.String(), "", r.Duration)
  420. if c.l7Attach && c.valuableTrace(r.TraceId) {
  421. cmd, args := l7.ParseRedis(r.Payload)
  422. //fmt.Println("cmd", cmd)
  423. //fmt.Println("args", args)
  424. //apmTrace, ok := c.getTrace(r.TraceId)
  425. if c.AppInfo.AppName != "" {
  426. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  427. }
  428. apmTrace, err := c.getOrInitTrace(r.TraceId)
  429. if err == nil {
  430. statement := cmd
  431. if args != "" {
  432. statement += " " + args
  433. }
  434. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  435. c.SendEvent(apmTrace, r.TraceId)
  436. }
  437. }
  438. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  439. /**
  440. * gRPC
  441. */
  442. case l7.ProtocolGrpc:
  443. klog.Debugln("enter the l7.ProtocolGrpc")
  444. stats.observe(r.Status.String(), "", r.Duration)
  445. if c.l7Attach && c.valuableTrace(r.TraceId) {
  446. apmTrace, err := c.getOrInitTrace(r.TraceId)
  447. if err == nil {
  448. apmTrace.GrpcClientTraceQueryEvent(r)
  449. c.SendEvent(apmTrace, r.TraceId)
  450. }
  451. }
  452. /**
  453. * MongoDB
  454. */
  455. case l7.ProtocolMongo:
  456. stats.observe(r.Status.String(), "", r.Duration)
  457. if c.l7Attach && c.valuableTrace(r.TraceId) {
  458. query := l7.ParseMongo(r.Payload)
  459. if c.AppInfo.AppName != "" {
  460. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  461. }
  462. apmTrace, err := c.getOrInitTrace(r.TraceId)
  463. if err == nil {
  464. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  465. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  466. c.SendEvent(apmTrace, r.TraceId)
  467. }
  468. }
  469. /**
  470. * Cassandra
  471. */
  472. case l7.ProtocolCassandra:
  473. stats.observe(r.Status.String(), "", r.Duration)
  474. if c.l7Attach && c.valuableTrace(r.TraceId) {
  475. if conn.cassandraParser == nil {
  476. conn.cassandraParser = l7.NewCassandraParser()
  477. }
  478. var query string
  479. query = string(r.Payload)
  480. //query := conn.cassandraParser.Parse(r.Payload)
  481. if c.AppInfo.AppName != "" {
  482. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  483. }
  484. apmTrace, err := c.getOrInitTrace(r.TraceId)
  485. if err == nil {
  486. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  487. c.SendEvent(apmTrace, r.TraceId)
  488. }
  489. }
  490. /**
  491. * Kafka
  492. */
  493. case l7.ProtocolKafka:
  494. stats.observe(r.Status.String(), "", r.Duration)
  495. if c.l7Attach && c.valuableTrace(r.TraceId) {
  496. if c.AppInfo.AppName != "" {
  497. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
  498. }
  499. apmTrace, err := c.getOrInitTrace(r.TraceId)
  500. if err == nil {
  501. apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
  502. c.SendEvent(apmTrace, r.TraceId)
  503. }
  504. }
  505. /**
  506. * RabbitMQ / NATS
  507. */
  508. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  509. stats.observe(r.Status.String(), r.Method.String(), 0)
  510. if c.l7Attach && c.valuableTrace(r.TraceId) {
  511. }
  512. /**
  513. * Dubbo2
  514. */
  515. case l7.ProtocolDubbo2:
  516. stats.observe(r.Status.String(), "", r.Duration)
  517. if c.l7Attach && c.valuableTrace(r.TraceId) {
  518. }
  519. }
  520. return nil
  521. }
  522. func (c *Container) buildIDs(pid uint32) bool {
  523. c.lock.Lock()
  524. defer c.lock.Unlock()
  525. p := c.processes[pid]
  526. if p != nil {
  527. p.cmdline = string(proc.GetRealCmdline(pid))
  528. }
  529. var sns []string
  530. var sport uint16
  531. for address, val := range c.getListens() {
  532. if val == 1 {
  533. ip := address.IP()
  534. if ip.Is4() && !ip.IsLoopback() {
  535. // 获取端口号
  536. sport = address.Port()
  537. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  538. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  539. //c.AppInfo.Sn = ip.String()
  540. //c.AppInfo.Sport = int(port)
  541. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  542. //fmt.Println(port)
  543. ////os.Exit(1)
  544. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  545. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  546. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  547. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  548. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  549. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  550. //return true
  551. }
  552. }
  553. }
  554. if len(sns) > 0 {
  555. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  556. snsStr := strings.Join(sns, ",")
  557. c.AppInfo.Sn = snsStr
  558. c.AppInfo.Sport = int(sport)
  559. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  560. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  561. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  562. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  563. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  564. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  565. return true
  566. }
  567. return false
  568. }
  569. func (c *Container) ReBuildIds(pid uint32) {
  570. c.lock.Lock()
  571. defer c.lock.Unlock()
  572. var sns []string
  573. var sport uint16
  574. for address, val := range c.getListens() {
  575. if val == 1 {
  576. ip := address.IP()
  577. if ip.Is4() && !ip.IsLoopback() {
  578. // 获取端口号
  579. sport = address.Port()
  580. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  581. }
  582. }
  583. }
  584. if len(sns) > 0 {
  585. snsStr := strings.Join(sns, ",")
  586. c.AppInfo.Sn = snsStr
  587. c.AppInfo.Sport = int(sport)
  588. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  589. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  590. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  591. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  592. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  593. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  594. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  595. }
  596. }
  597. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  598. c.lock.Lock()
  599. defer c.lock.Unlock()
  600. // get the associated uprobe
  601. uprobe, err := c.GetUprobe(event, tracer)
  602. if err != nil {
  603. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  604. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  605. return
  606. }
  607. if event.TraceId <= 0 {
  608. //fmt.Println("StackProcess TraceId id 0")
  609. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  610. return
  611. }
  612. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  613. stackFun := ebpftracer.StackFunEvent{}
  614. stackFun.Uprobe = &uprobe
  615. stackFun.StackEvent = event
  616. apmTrace, ok := c.getTrace(event.TraceId)
  617. if ok {
  618. apmTrace.FunAdd(stackFun)
  619. }
  620. }
  // byteExtractString returns the leading run of printable ASCII characters
  // (codes 32..126) of nameString. The result stops at the first NUL, control
  // or non-ASCII character; when there is none, all 100 bytes are returned.
  func byteExtractString(nameString [100]byte) string {
  	raw := nameString[:]
  	// A zero rune is below 32, so it is covered by the range test.
  	unprintable := func(r rune) bool { return r < 32 || r > 126 }
  	if end := bytes.IndexFunc(raw, unprintable); end >= 0 {
  		return string(raw[:end])
  	}
  	return string(raw)
  }
  630. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  631. c.lock.Lock()
  632. defer c.lock.Unlock()
  633. // get the associated uprobe
  634. switch event.Location {
  635. case 0: // ret
  636. Funcname := ""
  637. if event.Type != uint64(CodeTypeJava) {
  638. uprobe, err := c.GetUprobe(event, tracer)
  639. if err != nil {
  640. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  641. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  642. return
  643. }
  644. Funcname = uprobe.Funcname
  645. } else {
  646. ClassName := byteExtractString(event.ClassName)
  647. MethedName := byteExtractString(event.MethedName)
  648. Funcname = ClassName + "." + MethedName
  649. }
  650. if event.TraceId <= 0 {
  651. //fmt.Println("StackProcess TraceId id 0")
  652. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  653. return
  654. }
  655. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  656. apmTrace, err := c.getOrInitTrace(event.TraceId)
  657. if err == nil {
  658. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  659. duration := event.TimeNsEnd - event.TimeNsStart
  660. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  661. c.SendEvent(apmTrace, event.TraceId)
  662. }
  663. }
  664. }
  665. // ResolveAddress returns the symbol(s) and offset of the given address.
  666. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  667. if addr == 0 {
  668. // err = errors.Wrapf(SymbolNotFoundError, "0")
  669. return
  670. }
  671. // symbols, _, err := e.Symbols()
  672. if err != nil {
  673. return
  674. }
  675. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  676. if idx == 0 {
  677. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  678. return
  679. }
  680. // why diff symbol may contains the same addr?
  681. sym := symbols[idx-1]
  682. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  683. syms = append(syms, symbols[i])
  684. }
  685. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  686. syms = append(syms, symbols[i])
  687. }
  688. return syms, uint(addr - sym.Value), nil
  689. }
  // MemoryMap is the address range of a single mapping parsed from
  // /proc/<pid>/maps; Start and End are the two hexadecimal bounds of the entry.
  type MemoryMap struct {
  	Start, End uint64
  }

  // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps and returns
  // its address range as a MemoryMap.
  //
  // An error is returned when the file cannot be opened or read, when it is
  // empty, or when the first line does not start with a "<start>-<end>" pair of
  // hexadecimal addresses.
  func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  	file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  	if err != nil {
  		return nil, err
  	}
  	defer file.Close()

  	scanner := bufio.NewScanner(file)
  	if !scanner.Scan() {
  		if err := scanner.Err(); err != nil {
  			return nil, err
  		}
  		return nil, errors.New("empty /proc/<pid>/maps")
  	}

  	// A blank first line yields no fields; indexing fields[0] would panic.
  	fields := strings.Fields(scanner.Text())
  	if len(fields) == 0 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}
  	addresses := strings.Split(fields[0], "-")
  	if len(addresses) != 2 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}
  	start, err := strconv.ParseUint(addresses[0], 16, 64)
  	if err != nil {
  		return nil, err
  	}
  	end, err := strconv.ParseUint(addresses[1], 16, 64)
  	if err != nil {
  		return nil, err
  	}
  	return &MemoryMap{Start: start, End: end}, nil
  }
  725. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  726. //fmt.Println("GetUprobe entory:")
  727. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  728. Address := event.Ip - memoryMap.Start
  729. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  730. for _, fun := range c.UprobesMap {
  731. funAddress := fun.Address + fun.AbsOffset
  732. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  733. if funAddress == Address {
  734. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  735. return fun, nil
  736. }
  737. }
  738. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  739. if err != nil {
  740. return
  741. }
  742. for _, sym := range syms {
  743. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  744. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  745. if ok {
  746. return uprobe, nil
  747. }
  748. }
  749. err = errors.New("uprobe not found")
  750. return
  751. }
  752. func (c *Container) GetAppInfo() AppInfo {
  753. return c.AppInfo
  754. }
  755. // 可注入前置
  756. func (c *Container) checkEventReady() bool {
  757. c.lock.Lock()
  758. defer c.lock.Unlock()
  759. return c.l7EventReady
  760. }
  761. func (c *Container) eventReady() {
  762. c.lock.Lock()
  763. defer c.lock.Unlock()
  764. c.l7EventReady = true
  765. }
  766. // uprobe前置
  767. func (c *Container) Isl7AttachSuccess() bool {
  768. c.lock.Lock()
  769. defer c.lock.Unlock()
  770. return c.l7Attach
  771. }
  772. func (c *Container) l7AttachSuccess() {
  773. c.lock.Lock()
  774. defer c.lock.Unlock()
  775. c.l7Attach = true
  776. }
  777. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  778. resp, err := c.GetCodeSetting(r)
  779. if err != nil {
  780. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  781. return
  782. }
  783. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  784. // 有黑白名单规则 &&
  785. // 之前有注入 先卸载再注入
  786. // 之前没注入 直接注入
  787. // 没有有黑白名单 直接卸载
  788. if c.hasStackRule(resp) {
  789. if c.stackRuleUpdate(resp) {
  790. // 重新注入
  791. err = c.DetachStack(pid, APP_UNINSTALL)
  792. if err != nil {
  793. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  794. }
  795. }
  796. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  797. c.saveWhiteStackSettingInfo(resp)
  798. err = c.AttachStack(r.tracer, pid)
  799. if err != nil {
  800. c.AppInfo.SetAppStackError()
  801. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  802. }
  803. } else {
  804. if c.noOrigRule() {
  805. return
  806. }
  807. c.saveWhiteStackSettingInfo(resp)
  808. // 关闭堆栈
  809. err = c.DetachStack(pid, APP_UNINSTALL)
  810. if err != nil {
  811. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  812. }
  813. }
  814. } else {
  815. if c.noOrigRule() {
  816. return
  817. }
  818. c.saveWhiteStackSettingInfo(resp)
  819. // 关闭堆栈
  820. err = c.DetachStack(pid, APP_UNINSTALL)
  821. if err != nil {
  822. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  823. }
  824. }
  825. }
  826. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  827. p := c.processes[pid]
  828. if p != nil && c.checkEventReady() {
  829. codeType := c.GetCodeTypeFromCache(pid)
  830. if codeType.IsUnknownCode() {
  831. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  832. return false, 0
  833. }
  834. cmdline := p.GetCmdline()
  835. if len(cmdline) == 0 {
  836. return false, 0
  837. }
  838. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  839. whiteListByCode := r.getWhiteListAll()
  840. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  841. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  842. // 当前语言的白名单规则
  843. for _, setting := range whiteListByCode {
  844. ruleVal := setting.Filters
  845. if ruleVal == "" {
  846. continue
  847. }
  848. // 判断规则
  849. if strings.Contains(cmdline, ruleVal) {
  850. //if !codeType.IsJvmCode() {
  851. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  852. // return false, 0
  853. //}
  854. c.WhiteSettingInfo.AppName = setting.AppName
  855. c.WhiteSettingInfo.Filters = setting.Filters
  856. klog.WithField("pid", pid).
  857. WithField("codeType", codeType.String()).
  858. WithField("ruleVal", ruleVal).
  859. WithField("cmdline", cmdline).
  860. //WithField("stack", setting.OpenStack).
  861. WithField("white list", utils.ToString(whiteListByCode)).
  862. Infoln("[verify] check successful.")
  863. return true, 0
  864. }
  865. }
  866. }
  867. return false, 0
  868. }
  869. // 1.卸载入口
  870. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  871. c.lock.Lock()
  872. defer c.lock.Unlock()
  873. if p := c.processes[pid]; p != nil {
  874. err := c.DetachUprobes(tracer, pid, detachType)
  875. if err != nil {
  876. klog.WithError(err).Errorln("DetachUprobes Error.")
  877. }
  878. err = c.DetachStack(pid, detachType)
  879. if err != nil {
  880. klog.WithError(err).Errorln("DetachStack Error.")
  881. }
  882. // 关闭7层监控
  883. c.l7Attach = false
  884. // 变更应用状态
  885. if err != nil {
  886. detachType = detachType.Error()
  887. }
  888. c.AppInfo.SetAppStatus(detachType)
  889. }
  890. }
  891. // 1.1卸载uprobe
  892. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  893. // close uprobe
  894. if p := c.processes[pid]; p != nil {
  895. for _, u := range p.uprobes {
  896. err := u.Close()
  897. if err != nil {
  898. return err
  899. }
  900. }
  901. p.uprobes = []link.Link{}
  902. switch detachType {
  903. case APP_UNINSTALL, APP_FUSE:
  904. codeType := c.GetCodeTypeFromCache(pid)
  905. switch codeType {
  906. case CodeTypeJava:
  907. p.jvmAttachOnce = false
  908. case CodeTypeGo:
  909. p.goTlsUprobesChecked = false
  910. p.openSslUprobesChecked = false
  911. default:
  912. }
  913. case APP_UPROBE_ERROR:
  914. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  915. default:
  916. }
  917. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  918. if err := tracer.DelKProcInfo(pid); err != nil {
  919. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  920. } else {
  921. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  922. c.AppInfo.EBPFProcInfo = nil
  923. }
  924. } else {
  925. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  926. }
  927. return nil
  928. }
  929. // 1.2卸载堆栈
  930. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  931. if p := c.processes[pid]; p != nil {
  932. var err error
  933. codeType := c.GetCodeTypeFromCache(pid)
  934. switch codeType {
  935. // 1.2.1 卸载 jvm堆栈
  936. case CodeTypeJava:
  937. err = c.detachJvmStack(pid)
  938. default:
  939. err = p.closeStackUprobes()
  940. }
  941. if err != nil {
  942. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  943. return err
  944. }
  945. p.stackAttachOnce = false
  946. } else {
  947. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  948. }
  949. return nil
  950. }
  951. // 1.2.1 卸载 jvm堆栈
  952. func (c *Container) detachJvmStack(pid uint32) error {
  953. if p := c.processes[pid]; p != nil {
  954. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  955. //}
  956. // 卸载 JavaAgent
  957. var err error
  958. if p.stackStatus.IsJattachSuccess() {
  959. // 卸载堆栈probes
  960. err = p.closeStackUprobes()
  961. if err != nil {
  962. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  963. }
  964. err = p.uninstallJavaAgent()
  965. if err != nil {
  966. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  967. }
  968. }
  969. return err
  970. }
  971. return nil
  972. }
  973. func (c *Container) getRootfs() string {
  974. if c.metadata != nil && c.metadata.rootfs != "" {
  975. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  976. }
  977. return ""
  978. }
  979. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  980. if c == nil {
  981. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  982. return
  983. }
  984. if c.AppInfo.AppName == "" {
  985. return
  986. }
  987. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  988. detail := AppStatusInfo{
  989. Pid: pid,
  990. ProcName: c.containerName,
  991. AppName: c.AppInfo.AppName,
  992. Language: c.AppInfo.CodeType.String(),
  993. LanguageVersion: c.AppInfo.Version,
  994. AppID: c.AppInfo.AppIdHash.IntVal,
  995. AgentID: c.AppInfo.AgentId,
  996. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  997. Sn: c.AppInfo.Sn,
  998. Sport: c.AppInfo.Sport,
  999. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  1000. PreStatus: c.AppInfo.PreStatus,
  1001. Status: c.AppInfo.Status,
  1002. Rule: c.WhiteSettingInfo.Filters,
  1003. Container: string(c.id),
  1004. }
  1005. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  1006. if c.AppInfo.UpdateAt != 0 {
  1007. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  1008. }
  1009. p := c.processes[pid]
  1010. if p != nil {
  1011. detail.StackStatus = p.stackStatus.String()
  1012. v := 0
  1013. if !p.versionFailed {
  1014. v = 1
  1015. }
  1016. detail.StackStatus += fmt.Sprintf("V=%d", v)
  1017. }
  1018. runtimeApps[pid] = detail
  1019. }
  1020. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  1021. if c == nil {
  1022. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  1023. return
  1024. }
  1025. var err error
  1026. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  1027. // fusing UNINSTALL
  1028. if r.isFusing && c.Isl7AttachSuccess() {
  1029. c.Detach(r.tracer, pid, APP_FUSE)
  1030. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  1031. return
  1032. }
  1033. // verify UNINSTALL
  1034. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  1035. c.Detach(r.tracer, pid, APP_UNINSTALL)
  1036. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  1037. return
  1038. }
  1039. if verifyAttachConditions {
  1040. err = c.RegisterAppInfo(r, pid)
  1041. if err != nil {
  1042. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  1043. return
  1044. }
  1045. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  1046. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  1047. if err != nil {
  1048. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  1049. return
  1050. } else {
  1051. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  1052. }
  1053. // 堆栈控制
  1054. c.ctrlStack(r, pid)
  1055. }
  1056. }