container_apm.go 36 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. "go.opentelemetry.io/otel/attribute"
  26. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  27. "inet.af/netaddr"
  28. )
  29. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  30. trace, ok := c.traceMap[traceId]
  31. return trace, ok
  32. }
  33. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  34. c.traceMap[traceId] = trace
  35. }
  36. // 查询或创建trace信息
  37. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  38. trace, ok := c.getTrace(traceId)
  39. if !ok {
  40. //new trace
  41. trace = tracing.NewTraceFromEvent(string(c.id))
  42. if trace == nil {
  43. // tracer 未初始化,返回错误
  44. return nil, fmt.Errorf("tracer is not initialized, cannot create trace")
  45. }
  46. //create TraceMap
  47. c.createTraceMap(traceId, trace)
  48. //create ParentSpan
  49. trace.CreateRootSpan(traceId)
  50. }
  51. return trace, nil
  52. }
  53. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  54. // 返回: IP地址, 端口号, 容器ID
  55. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  56. containerID := ""
  57. if c.cgroup != nil {
  58. containerID = c.cgroup.ContainerId
  59. }
  60. ipAddr := ""
  61. ifaces, err := net.Interfaces()
  62. if err == nil {
  63. for _, iface := range ifaces {
  64. if iface.Name == "eth0" {
  65. addrs, err := iface.Addrs()
  66. if err == nil {
  67. for _, addr := range addrs {
  68. var ipnet *net.IPNet
  69. switch v := addr.(type) {
  70. case *net.IPNet:
  71. ipnet = v
  72. case *net.IPAddr:
  73. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  74. }
  75. if ipnet != nil && ipnet.IP.To4() != nil {
  76. ipAddr = ipnet.IP.String()
  77. break
  78. }
  79. }
  80. }
  81. break
  82. }
  83. }
  84. }
  85. klog.Debugf("grpc server ip %s", ipAddr)
  86. // 本地端口尝试从AppInfo.Sport获取
  87. port := c.AppInfo.Sport
  88. klog.Debugf("grpc server port %d", port)
  89. return ipAddr, uint16(port), containerID
  90. }
  91. // Deprecated: InitTrace not used
  92. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  93. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  94. // ip, err := netaddr.ParseIP(hostIp)
  95. // if err != nil {
  96. // //fmt.Println("host ip error")
  97. // hostIp = "127.0.0.1"
  98. // }
  99. // addr := netaddr.IPPortFrom(ip, port)
  100. // trace := tracing.NewTrace(string(c.id), addr)
  101. // if trace == nil {
  102. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  103. // }
  104. // c.traceMap[traceId] = trace
  105. //
  106. // trace.TraceStart(method, path, r.Status, r.Duration)
  107. // return nil
  108. //}
  109. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  110. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  111. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  112. if t.AllEventReady(traceID) {
  113. t.SendEvent()
  114. klog.Debugf("SendEvent %d", traceID)
  115. //fmt.Println(t.GetSpan())
  116. //fmt.Println("===============")
  117. delete(c.traceMap, traceID)
  118. }
  119. }
  120. func (c *Container) valuableTrace(traceID uint64) bool {
  121. return traceID != 0
  122. }
  123. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  124. c.lock.Lock()
  125. defer c.lock.Unlock()
  126. if r.Protocol == l7.ProtocolDNS {
  127. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  128. if c.l7Attach && c.valuableTrace(r.TraceId) {
  129. apmTrace, err := c.getOrInitTrace(r.TraceId)
  130. if err == nil {
  131. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  132. c.SendEvent(apmTrace, r.TraceId)
  133. }
  134. }
  135. return ip2fqdn
  136. }
  137. //if !c.valuableTrace(r.TraceId) {
  138. // return nil
  139. //}
  140. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  141. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  142. if c.l7Attach && c.valuableTrace(r.TraceId) {
  143. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  144. if r.TraceStart == TRACE_STATUS {
  145. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  146. trace, err := c.getOrInitTrace(r.TraceId)
  147. if c.AppInfo.AppName != "" {
  148. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  149. }
  150. if err == nil {
  151. switch r.Protocol {
  152. case l7.ProtocolHTTP:
  153. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  154. // userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
  155. ip, _ := netaddr.ParseIP(sn)
  156. //codeType := c.GetCodeTypeFromCache(pid)
  157. container_id := ""
  158. if c.cgroup != nil {
  159. container_id = c.cgroup.ContainerId
  160. }
  161. trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  162. c.SendEvent(trace, r.TraceId)
  163. case l7.ProtocolGrpc:
  164. // gRPC
  165. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  166. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  167. c.SendEvent(trace, r.TraceId)
  168. case l7.ProtocolKafka:
  169. var sn string
  170. var sport uint16
  171. var container_id string
  172. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  173. if conn != nil {
  174. sn = conn.ActualDest.IP().String()
  175. sport = conn.ActualDest.Port()
  176. }
  177. if c.cgroup != nil {
  178. container_id = c.cgroup.ContainerId
  179. }
  180. // MQ
  181. trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
  182. c.SendEvent(trace, r.TraceId)
  183. }
  184. }
  185. return nil
  186. }
  187. if r.TraceEnd == TRACE_STATUS {
  188. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  189. trace, err := c.getOrInitTrace(r.TraceId)
  190. if err == nil {
  191. trace.TraceEndEvent(r)
  192. c.SendEvent(trace, r.TraceId)
  193. }
  194. return nil
  195. }
  196. }
  197. /**
  198. * HTTP
  199. */
  200. if r.Protocol == l7.ProtocolHTTP {
  201. if c.l7Attach && c.valuableTrace(r.TraceId) {
  202. // 检查是否启用 Elasticsearch 检测
  203. if *flags.EnableElasticsearchDetection {
  204. // 解析 User-Agent 以检测 Elasticsearch 请求
  205. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  206. // 检查是否是 Elasticsearch 请求(通过 User-Agent)
  207. isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
  208. apmTrace, err := c.getOrInitTrace(r.TraceId)
  209. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  210. if err == nil {
  211. if isElasticsearch {
  212. r.Protocol = l7.ProtocolES
  213. // Elasticsearch 请求,按照 NoSQL 方式处理
  214. query := l7.ParseElasticsearch(r.Payload)
  215. if c.AppInfo.AppName != "" {
  216. klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
  217. }
  218. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  219. if conn == nil {
  220. conn = &ActiveConnection{
  221. Dest: r.ComponentDAddr,
  222. ActualDest: r.ComponentDAddr,
  223. Timestamp: timestamp,
  224. }
  225. }
  226. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
  227. } else {
  228. // 普通 HTTP 请求
  229. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  230. }
  231. c.SendEvent(apmTrace, r.TraceId)
  232. }
  233. } else {
  234. // Elasticsearch 检测未启用,使用普通 HTTP 处理
  235. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  236. apmTrace, err := c.getOrInitTrace(r.TraceId)
  237. if err == nil {
  238. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  239. c.SendEvent(apmTrace, r.TraceId)
  240. }
  241. }
  242. }
  243. //return nil
  244. }
  245. /**
  246. * gRPC
  247. */
  248. if r.Protocol == l7.ProtocolGrpc {
  249. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  250. klog.Infoln("enter the l7.ProtocolGrpc")
  251. if c.l7Attach && c.valuableTrace(r.TraceId) {
  252. apmTrace, err := c.getOrInitTrace(r.TraceId)
  253. if err == nil {
  254. apmTrace.GrpcClientTraceQueryEvent(r)
  255. c.SendEvent(apmTrace, r.TraceId)
  256. }
  257. }
  258. }
  259. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  260. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  261. if conn == nil {
  262. conn = &ActiveConnection{
  263. Dest: r.ComponentDAddr,
  264. ActualDest: r.ComponentDAddr,
  265. Timestamp: timestamp,
  266. }
  267. //return nil
  268. }
  269. if timestamp != 0 && conn.Timestamp != timestamp {
  270. //if r.Protocol == l7.ProtocolGrpc {
  271. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  272. // klog.Infoln("enter the l7.ProtocolGrpc")
  273. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  274. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  275. // if err == nil {
  276. // apmTrace.GrpcClientTraceQueryEvent(r)
  277. // c.SendEvent(apmTrace, r.TraceId)
  278. // }
  279. // }
  280. //}
  281. return nil
  282. }
  283. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  284. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  285. switch r.Protocol {
  286. /**
  287. * HTTP
  288. */
  289. case l7.ProtocolHTTP:
  290. if c.AppInfo.AppName != "" {
  291. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  292. }
  293. stats.observe(r.Status.Http(), "", r.Duration)
  294. /**
  295. * HTTP2
  296. */
  297. case l7.ProtocolHTTP2:
  298. if conn.http2Parser == nil {
  299. conn.http2Parser = l7.NewHttp2Parser()
  300. }
  301. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  302. for _, req := range requests {
  303. stats.observe(req.Status.Http(), "", req.Duration)
  304. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  305. }
  306. /**
  307. * PostgreSQL
  308. */
  309. case l7.ProtocolPostgres:
  310. if r.Method != l7.MethodStatementClose {
  311. stats.observe(r.Status.String(), "", r.Duration)
  312. }
  313. //if conn.postgresParser == nil {
  314. // conn.postgresParser = l7.NewPostgresParser()
  315. //}
  316. //query := conn.postgresParser.Parse(r.Payload)
  317. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  318. if c.l7Attach && c.valuableTrace(r.TraceId) {
  319. if conn.postgresParser == nil {
  320. conn.postgresParser = l7.NewPostgresParser()
  321. }
  322. query := conn.postgresParser.Parse(r.Payload)
  323. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  324. if c.AppInfo.AppName != "" {
  325. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  326. }
  327. //apmTrace, ok := c.getTrace(r.TraceId)
  328. apmTrace, err := c.getOrInitTrace(r.TraceId)
  329. //fmt.Println("mysql r.TraceId:", r.TraceId)
  330. //fmt.Println("ok:", ok)
  331. //fmt.Println("traceMap:", len(c.traceMap))
  332. if err == nil {
  333. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  334. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  335. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  336. c.SendEvent(apmTrace, r.TraceId)
  337. }
  338. }
  339. /**
  340. * Mysql
  341. */
  342. case l7.ProtocolMysql:
  343. if r.Method != l7.MethodStatementClose {
  344. stats.observe(r.Status.String(), "", r.Duration)
  345. }
  346. if c.l7Attach && c.valuableTrace(r.TraceId) {
  347. if conn.mysqlParser == nil {
  348. conn.mysqlParser = l7.NewMysqlParser()
  349. }
  350. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  351. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  352. if c.AppInfo.AppName != "" {
  353. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  354. }
  355. //apmTrace, ok := c.getTrace(r.TraceId)
  356. apmTrace, err := c.getOrInitTrace(r.TraceId)
  357. //fmt.Println("mysql r.TraceId:", r.TraceId)
  358. //fmt.Println("ok:", ok)
  359. //fmt.Println("traceMap:", len(c.traceMap))
  360. if err == nil {
  361. dbSystem := semconv.DBSystemMySQL
  362. // 根据端口白名单确定协议类型
  363. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  364. if l7Type == l7.ProtocolMariaDB {
  365. dbSystem = semconv.DBSystemMariaDB
  366. } else if l7Type == l7.ProtocolTiDB {
  367. dbSystem = semconv.DBSystemTiDB
  368. }
  369. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  370. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  371. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  372. c.SendEvent(apmTrace, r.TraceId)
  373. }
  374. }
  375. /**
  376. * DM (达梦数据库)
  377. */
  378. case l7.ProtocolDM:
  379. //统计dm的query次数
  380. stats.observe(r.Status.String(), "", r.Duration)
  381. //是否发送数据
  382. if c.l7Attach && c.valuableTrace(r.TraceId) {
  383. if conn.dmParser == nil {
  384. conn.dmParser = l7.NewDmParser()
  385. }
  386. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  387. if c.AppInfo.AppName != "" {
  388. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  389. }
  390. apmTrace, err := c.getOrInitTrace(r.TraceId)
  391. if err == nil {
  392. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  393. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  394. c.SendEvent(apmTrace, r.TraceId)
  395. }
  396. }
  397. /**
  398. * Memcached
  399. */
  400. case l7.ProtocolMemcached:
  401. stats.observe(r.Status.String(), "", r.Duration)
  402. if c.l7Attach && c.valuableTrace(r.TraceId) {
  403. cmd, items := l7.ParseMemcached(r.Payload)
  404. if c.AppInfo.AppName != "" {
  405. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  406. }
  407. apmTrace, err := c.getOrInitTrace(r.TraceId)
  408. if err == nil {
  409. statement := cmd
  410. if len(items) == 1 {
  411. statement += " " + items[0]
  412. } else if len(items) > 1 {
  413. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  414. statement += " " + joined
  415. }
  416. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  417. c.SendEvent(apmTrace, r.TraceId)
  418. }
  419. }
  420. /**
  421. * Redis
  422. */
  423. case l7.ProtocolRedis:
  424. stats.observe(r.Status.String(), "", r.Duration)
  425. if c.l7Attach && c.valuableTrace(r.TraceId) {
  426. cmd, args := l7.ParseRedis(r.Payload)
  427. //fmt.Println("cmd", cmd)
  428. //fmt.Println("args", args)
  429. //apmTrace, ok := c.getTrace(r.TraceId)
  430. if c.AppInfo.AppName != "" {
  431. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  432. }
  433. apmTrace, err := c.getOrInitTrace(r.TraceId)
  434. if err == nil {
  435. statement := cmd
  436. if args != "" {
  437. statement += " " + args
  438. }
  439. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  440. c.SendEvent(apmTrace, r.TraceId)
  441. }
  442. }
  443. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  444. /**
  445. * gRPC
  446. */
  447. case l7.ProtocolGrpc:
  448. klog.Debugln("enter the l7.ProtocolGrpc")
  449. stats.observe(r.Status.String(), "", r.Duration)
  450. if c.l7Attach && c.valuableTrace(r.TraceId) {
  451. apmTrace, err := c.getOrInitTrace(r.TraceId)
  452. if err == nil {
  453. apmTrace.GrpcClientTraceQueryEvent(r)
  454. c.SendEvent(apmTrace, r.TraceId)
  455. }
  456. }
  457. /**
  458. * MongoDB
  459. */
  460. case l7.ProtocolMongo:
  461. stats.observe(r.Status.String(), "", r.Duration)
  462. if c.l7Attach && c.valuableTrace(r.TraceId) {
  463. query := l7.ParseMongo(r.Payload)
  464. if c.AppInfo.AppName != "" {
  465. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  466. }
  467. apmTrace, err := c.getOrInitTrace(r.TraceId)
  468. if err == nil {
  469. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  470. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  471. c.SendEvent(apmTrace, r.TraceId)
  472. }
  473. }
  474. /**
  475. * Cassandra
  476. */
  477. case l7.ProtocolCassandra:
  478. stats.observe(r.Status.String(), "", r.Duration)
  479. if c.l7Attach && c.valuableTrace(r.TraceId) {
  480. if conn.cassandraParser == nil {
  481. conn.cassandraParser = l7.NewCassandraParser()
  482. }
  483. var query string
  484. query = string(r.Payload)
  485. //query := conn.cassandraParser.Parse(r.Payload)
  486. if c.AppInfo.AppName != "" {
  487. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  488. }
  489. apmTrace, err := c.getOrInitTrace(r.TraceId)
  490. if err == nil {
  491. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  492. c.SendEvent(apmTrace, r.TraceId)
  493. }
  494. }
  495. /**
  496. * BuntDB
  497. */
  498. case l7.ProtocolBuntDB:
  499. stats.observe(r.Status.String(), "", r.Duration)
  500. if c.l7Attach && c.valuableTrace(r.TraceId) {
  501. query := string(r.Payload)
  502. if c.AppInfo.AppName != "" {
  503. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  504. }
  505. apmTrace, err := c.getOrInitTrace(r.TraceId)
  506. if err == nil {
  507. // BuntDB 是键值数据库,使用 NoSQLTraceQueryEvent,operation 从 payload 中提取(如 "Set: key_name")
  508. operation := ""
  509. statement := query
  510. dbSystem := attribute.String("db.system", "buntdb")
  511. defaultLocalAddr := netaddr.IPPortFrom(netaddr.MustParseIP("127.0.0.1"), 0)
  512. apmTrace.NoSQLTraceQueryEvent(r.Protocol, dbSystem, operation, statement, r, defaultLocalAddr, defaultLocalAddr)
  513. c.SendEvent(apmTrace, r.TraceId)
  514. }
  515. }
  516. /**
  517. * LevelDB
  518. */
  519. case l7.ProtocolLevelDB:
  520. stats.observe(r.Status.String(), "", r.Duration)
  521. if c.l7Attach && c.valuableTrace(r.TraceId) {
  522. query := string(r.Payload)
  523. if c.AppInfo.AppName != "" {
  524. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  525. }
  526. apmTrace, err := c.getOrInitTrace(r.TraceId)
  527. if err == nil {
  528. operation := ""
  529. statement := query
  530. dbSystem := attribute.String("db.system", "leveldb")
  531. defaultLocalAddr := netaddr.IPPortFrom(netaddr.MustParseIP("127.0.0.1"), 0)
  532. apmTrace.NoSQLTraceQueryEvent(r.Protocol, dbSystem, operation, statement, r, defaultLocalAddr, defaultLocalAddr)
  533. c.SendEvent(apmTrace, r.TraceId)
  534. }
  535. }
  536. /**
  537. * Kafka
  538. */
  539. case l7.ProtocolKafka:
  540. stats.observe(r.Status.String(), "", r.Duration)
  541. if c.l7Attach && c.valuableTrace(r.TraceId) {
  542. if c.AppInfo.AppName != "" {
  543. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
  544. }
  545. apmTrace, err := c.getOrInitTrace(r.TraceId)
  546. if err == nil {
  547. apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
  548. c.SendEvent(apmTrace, r.TraceId)
  549. }
  550. }
  551. /**
  552. * RabbitMQ / NATS
  553. */
  554. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  555. stats.observe(r.Status.String(), r.Method.String(), 0)
  556. if c.l7Attach && c.valuableTrace(r.TraceId) {
  557. }
  558. /**
  559. * Dubbo2
  560. */
  561. case l7.ProtocolDubbo2:
  562. stats.observe(r.Status.String(), "", r.Duration)
  563. if c.l7Attach && c.valuableTrace(r.TraceId) {
  564. }
  565. }
  566. return nil
  567. }
  568. func (c *Container) buildIDs(pid uint32) bool {
  569. c.lock.Lock()
  570. defer c.lock.Unlock()
  571. p := c.processes[pid]
  572. if p != nil {
  573. p.cmdline = string(proc.GetRealCmdline(pid))
  574. }
  575. var sns []string
  576. var sport uint16
  577. for address, val := range c.getListens() {
  578. if val == 1 {
  579. ip := address.IP()
  580. if ip.Is4() && !ip.IsLoopback() {
  581. // 获取端口号
  582. sport = address.Port()
  583. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  584. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  585. //c.AppInfo.Sn = ip.String()
  586. //c.AppInfo.Sport = int(port)
  587. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  588. //fmt.Println(port)
  589. ////os.Exit(1)
  590. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  591. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  592. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  593. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  594. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  595. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  596. //return true
  597. }
  598. }
  599. }
  600. if len(sns) > 0 {
  601. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  602. snsStr := strings.Join(sns, ",")
  603. c.AppInfo.Sn = snsStr
  604. c.AppInfo.Sport = int(sport)
  605. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  606. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  607. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  608. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  609. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  610. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  611. return true
  612. }
  613. return false
  614. }
  615. func (c *Container) ReBuildIds(pid uint32) {
  616. c.lock.Lock()
  617. defer c.lock.Unlock()
  618. var sns []string
  619. var sport uint16
  620. for address, val := range c.getListens() {
  621. if val == 1 {
  622. ip := address.IP()
  623. if ip.Is4() && !ip.IsLoopback() {
  624. // 获取端口号
  625. sport = address.Port()
  626. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  627. }
  628. }
  629. }
  630. if len(sns) > 0 {
  631. snsStr := strings.Join(sns, ",")
  632. c.AppInfo.Sn = snsStr
  633. c.AppInfo.Sport = int(sport)
  634. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  635. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  636. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  637. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  638. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  639. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  640. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  641. }
  642. }
  643. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  644. c.lock.Lock()
  645. defer c.lock.Unlock()
  646. // get the associated uprobe
  647. uprobe, err := c.GetUprobe(event, tracer)
  648. if err != nil {
  649. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  650. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  651. return
  652. }
  653. if event.TraceId <= 0 {
  654. //fmt.Println("StackProcess TraceId id 0")
  655. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  656. return
  657. }
  658. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  659. stackFun := ebpftracer.StackFunEvent{}
  660. stackFun.Uprobe = &uprobe
  661. stackFun.StackEvent = event
  662. apmTrace, ok := c.getTrace(event.TraceId)
  663. if ok {
  664. apmTrace.FunAdd(stackFun)
  665. }
  666. }
  667. func byteExtractString(nameString [100]byte) string {
  668. n := bytes.IndexFunc(nameString[:], func(r rune) bool {
  669. return r == 0 || r < 32 || r > 126 // 截取到第一个零值或非打印字符
  670. })
  671. if n == -1 {
  672. n = len(nameString) // 没找到零值或非打印字符,使用数组长度
  673. }
  674. return string(nameString[:n])
  675. }
  676. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  677. c.lock.Lock()
  678. defer c.lock.Unlock()
  679. // get the associated uprobe
  680. switch event.Location {
  681. case 0: // ret
  682. Funcname := ""
  683. if event.Type != uint64(CodeTypeJava) {
  684. uprobe, err := c.GetUprobe(event, tracer)
  685. if err != nil {
  686. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  687. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  688. return
  689. }
  690. Funcname = uprobe.Funcname
  691. } else {
  692. ClassName := byteExtractString(event.ClassName)
  693. MethedName := byteExtractString(event.MethedName)
  694. Funcname = ClassName + "." + MethedName
  695. }
  696. if event.TraceId <= 0 {
  697. //fmt.Println("StackProcess TraceId id 0")
  698. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  699. return
  700. }
  701. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  702. apmTrace, err := c.getOrInitTrace(event.TraceId)
  703. if err == nil {
  704. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  705. duration := event.TimeNsEnd - event.TimeNsStart
  706. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  707. c.SendEvent(apmTrace, event.TraceId)
  708. }
  709. }
  710. }
  711. // ResolveAddress returns the symbol(s) and offset of the given address.
  712. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  713. if addr == 0 {
  714. // err = errors.Wrapf(SymbolNotFoundError, "0")
  715. return
  716. }
  717. // symbols, _, err := e.Symbols()
  718. if err != nil {
  719. return
  720. }
  721. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  722. if idx == 0 {
  723. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  724. return
  725. }
  726. // why diff symbol may contains the same addr?
  727. sym := symbols[idx-1]
  728. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  729. syms = append(syms, symbols[i])
  730. }
  731. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  732. syms = append(syms, symbols[i])
  733. }
  734. return syms, uint(addr - sym.Value), nil
  735. }
  736. type MemoryMap struct {
  737. Start, End uint64
  738. }
  739. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  740. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  741. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  742. if err != nil {
  743. return nil, err
  744. }
  745. defer file.Close()
  746. scanner := bufio.NewScanner(file)
  747. if scanner.Scan() {
  748. fields := strings.Fields(scanner.Text())
  749. addresses := strings.Split(fields[0], "-")
  750. if len(addresses) != 2 {
  751. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  752. }
  753. start, err := strconv.ParseUint(addresses[0], 16, 64)
  754. if err != nil {
  755. return nil, err
  756. }
  757. end, err := strconv.ParseUint(addresses[1], 16, 64)
  758. if err != nil {
  759. return nil, err
  760. }
  761. return &MemoryMap{
  762. Start: start,
  763. End: end,
  764. }, nil
  765. }
  766. if err := scanner.Err(); err != nil {
  767. return nil, err
  768. }
  769. return nil, errors.New("empty /proc/<pid>/maps")
  770. }
  771. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  772. //fmt.Println("GetUprobe entory:")
  773. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  774. Address := event.Ip - memoryMap.Start
  775. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  776. for _, fun := range c.UprobesMap {
  777. funAddress := fun.Address + fun.AbsOffset
  778. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  779. if funAddress == Address {
  780. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  781. return fun, nil
  782. }
  783. }
  784. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  785. if err != nil {
  786. return
  787. }
  788. for _, sym := range syms {
  789. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  790. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  791. if ok {
  792. return uprobe, nil
  793. }
  794. }
  795. err = errors.New("uprobe not found")
  796. return
  797. }
  798. func (c *Container) GetAppInfo() AppInfo {
  799. return c.AppInfo
  800. }
  801. // 可注入前置
  802. func (c *Container) checkEventReady() bool {
  803. c.lock.Lock()
  804. defer c.lock.Unlock()
  805. return c.l7EventReady
  806. }
  807. func (c *Container) eventReady() {
  808. c.lock.Lock()
  809. defer c.lock.Unlock()
  810. c.l7EventReady = true
  811. }
  812. // uprobe前置
  813. func (c *Container) Isl7AttachSuccess() bool {
  814. c.lock.Lock()
  815. defer c.lock.Unlock()
  816. return c.l7Attach
  817. }
  818. func (c *Container) l7AttachSuccess() {
  819. c.lock.Lock()
  820. defer c.lock.Unlock()
  821. c.l7Attach = true
  822. }
  823. func (c *Container) IsLicenseFuse() bool {
  824. c.lock.Lock()
  825. defer c.lock.Unlock()
  826. return c.licenseFuse
  827. }
  828. func (c *Container) LicenseFuse() {
  829. c.lock.Lock()
  830. defer c.lock.Unlock()
  831. c.licenseFuse = true
  832. }
  833. func (c *Container) ClearLicenseFuse() {
  834. c.lock.Lock()
  835. defer c.lock.Unlock()
  836. c.licenseFuse = false
  837. }
  838. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  839. resp, err := c.GetCodeSetting(r)
  840. if err != nil {
  841. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  842. return
  843. }
  844. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  845. // 有黑白名单规则 &&
  846. // 之前有注入 先卸载再注入
  847. // 之前没注入 直接注入
  848. // 没有有黑白名单 直接卸载
  849. if c.hasStackRule(resp) {
  850. if c.stackRuleUpdate(resp) {
  851. // 重新注入
  852. err = c.DetachStack(pid, APP_UNINSTALL)
  853. if err != nil {
  854. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  855. }
  856. }
  857. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  858. c.saveWhiteStackSettingInfo(resp)
  859. err = c.AttachStack(r.tracer, pid)
  860. if err != nil {
  861. c.AppInfo.SetAppStackError()
  862. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  863. }
  864. } else {
  865. if c.noOrigRule() {
  866. return
  867. }
  868. c.saveWhiteStackSettingInfo(resp)
  869. // 关闭堆栈
  870. err = c.DetachStack(pid, APP_UNINSTALL)
  871. if err != nil {
  872. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  873. }
  874. }
  875. } else {
  876. if c.noOrigRule() {
  877. return
  878. }
  879. c.saveWhiteStackSettingInfo(resp)
  880. // 关闭堆栈
  881. err = c.DetachStack(pid, APP_UNINSTALL)
  882. if err != nil {
  883. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  884. }
  885. }
  886. }
  887. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  888. p := c.processes[pid]
  889. if p != nil && c.checkEventReady() {
  890. codeType := c.GetCodeTypeFromCache(pid)
  891. if codeType.IsUnknownCode() {
  892. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  893. return false, 0
  894. }
  895. cmdline := p.GetCmdline()
  896. if len(cmdline) == 0 {
  897. return false, 0
  898. }
  899. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  900. whiteListByCode := r.getWhiteListAll()
  901. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  902. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  903. // 当前语言的白名单规则
  904. for _, setting := range whiteListByCode {
  905. ruleVal := setting.Filters
  906. if ruleVal == "" {
  907. continue
  908. }
  909. // 判断规则
  910. if strings.Contains(cmdline, ruleVal) {
  911. //if !codeType.IsJvmCode() {
  912. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  913. // return false, 0
  914. //}
  915. c.WhiteSettingInfo.AppName = setting.AppName
  916. c.WhiteSettingInfo.Filters = setting.Filters
  917. klog.WithField("pid", pid).
  918. WithField("codeType", codeType.String()).
  919. WithField("ruleVal", ruleVal).
  920. WithField("cmdline", cmdline).
  921. //WithField("stack", setting.OpenStack).
  922. WithField("white list", utils.ToString(whiteListByCode)).
  923. Infoln("[verify] check successful.")
  924. return true, 0
  925. }
  926. }
  927. }
  928. return false, 0
  929. }
  930. // 1.卸载入口
  931. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  932. c.lock.Lock()
  933. defer c.lock.Unlock()
  934. if p := c.processes[pid]; p != nil {
  935. err := c.DetachUprobes(tracer, pid, detachType)
  936. if err != nil {
  937. klog.WithError(err).Errorln("DetachUprobes Error.")
  938. }
  939. err = c.DetachStack(pid, detachType)
  940. if err != nil {
  941. klog.WithError(err).Errorln("DetachStack Error.")
  942. }
  943. // 关闭7层监控
  944. c.l7Attach = false
  945. // 变更应用状态
  946. if err != nil {
  947. detachType = detachType.Error()
  948. }
  949. c.AppInfo.SetAppStatus(detachType)
  950. }
  951. }
  952. // 1.1卸载uprobe
  953. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  954. // close uprobe
  955. if p := c.processes[pid]; p != nil {
  956. for _, u := range p.uprobes {
  957. err := u.Close()
  958. if err != nil {
  959. return err
  960. }
  961. }
  962. p.uprobes = []link.Link{}
  963. switch detachType {
  964. case APP_UNINSTALL, APP_FUSE, APP_LICENSE_FUSE:
  965. codeType := c.GetCodeTypeFromCache(pid)
  966. switch codeType {
  967. case CodeTypeJava:
  968. p.jvmAttachOnce = false
  969. case CodeTypeGo:
  970. p.goTlsUprobesChecked = false
  971. p.openSslUprobesChecked = false
  972. default:
  973. }
  974. case APP_UPROBE_ERROR:
  975. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  976. default:
  977. }
  978. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  979. if err := tracer.DelKProcInfo(pid); err != nil {
  980. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  981. } else {
  982. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%s", pid, detachType.String())
  983. c.AppInfo.EBPFProcInfo = nil
  984. }
  985. } else {
  986. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  987. }
  988. return nil
  989. }
  990. // 1.2卸载堆栈
  991. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  992. if p := c.processes[pid]; p != nil {
  993. var err error
  994. codeType := c.GetCodeTypeFromCache(pid)
  995. switch codeType {
  996. // 1.2.1 卸载 jvm堆栈
  997. case CodeTypeJava:
  998. err = c.detachJvmStack(pid)
  999. default:
  1000. err = p.closeStackUprobes()
  1001. }
  1002. if err != nil {
  1003. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  1004. return err
  1005. }
  1006. p.stackAttachOnce = false
  1007. } else {
  1008. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  1009. }
  1010. return nil
  1011. }
  1012. // 1.2.1 卸载 jvm堆栈
  1013. func (c *Container) detachJvmStack(pid uint32) error {
  1014. if p := c.processes[pid]; p != nil {
  1015. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  1016. //}
  1017. // 卸载 JavaAgent
  1018. var err error
  1019. if p.stackStatus.IsJattachSuccess() {
  1020. // 卸载堆栈probes
  1021. err = p.closeStackUprobes()
  1022. if err != nil {
  1023. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  1024. }
  1025. err = p.uninstallJavaAgent()
  1026. if err != nil {
  1027. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  1028. }
  1029. }
  1030. return err
  1031. }
  1032. return nil
  1033. }
  1034. func (c *Container) getRootfs() string {
  1035. if c.metadata != nil && c.metadata.rootfs != "" {
  1036. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  1037. }
  1038. return ""
  1039. }
  1040. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  1041. if c == nil {
  1042. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  1043. return
  1044. }
  1045. if c.AppInfo.AppName == "" {
  1046. return
  1047. }
  1048. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  1049. detail := AppStatusInfo{
  1050. Pid: pid,
  1051. ProcName: c.containerName,
  1052. AppName: c.AppInfo.AppName,
  1053. Language: c.AppInfo.CodeType.String(),
  1054. LanguageVersion: c.AppInfo.Version,
  1055. AppID: c.AppInfo.AppIdHash.IntVal,
  1056. AgentID: c.AppInfo.AgentId,
  1057. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  1058. Sn: c.AppInfo.Sn,
  1059. Sport: c.AppInfo.Sport,
  1060. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  1061. PreStatus: c.AppInfo.PreStatus,
  1062. Status: c.AppInfo.Status,
  1063. Rule: c.WhiteSettingInfo.Filters,
  1064. Container: string(c.id),
  1065. }
  1066. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  1067. if c.AppInfo.UpdateAt != 0 {
  1068. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  1069. }
  1070. p := c.processes[pid]
  1071. if p != nil {
  1072. detail.StackStatus = p.stackStatus.String()
  1073. v := 0
  1074. if !p.versionFailed {
  1075. v = 1
  1076. }
  1077. detail.StackStatus += fmt.Sprintf("V=%d", v)
  1078. }
  1079. runtimeApps[pid] = detail
  1080. }
  1081. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  1082. if c == nil {
  1083. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  1084. return
  1085. }
  1086. var err error
  1087. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  1088. // License fuse UNINSTALL(仅在开启 License 校验时)
  1089. if *flags.EnableLicenseCheck && c.IsLicenseFuse() {
  1090. if c.Isl7AttachSuccess() {
  1091. c.Detach(r.tracer, pid, APP_LICENSE_FUSE)
  1092. }
  1093. klog.WithField("pid", pid).Infoln("[AgentCtrl] License fusing.")
  1094. return
  1095. }
  1096. // fusing UNINSTALL
  1097. if r.isFusing && c.Isl7AttachSuccess() {
  1098. c.Detach(r.tracer, pid, APP_FUSE)
  1099. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  1100. return
  1101. }
  1102. // verify UNINSTALL
  1103. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  1104. c.Detach(r.tracer, pid, APP_UNINSTALL)
  1105. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  1106. return
  1107. }
  1108. if verifyAttachConditions {
  1109. err = c.RegisterAppInfo(r, pid)
  1110. if err != nil {
  1111. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  1112. return
  1113. }
  1114. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  1115. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  1116. if err != nil {
  1117. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  1118. return
  1119. } else {
  1120. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  1121. }
  1122. // 堆栈控制
  1123. c.ctrlStack(r, pid)
  1124. }
  1125. }