container_apm.go 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  28. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  29. trace, ok := c.traceMap[traceId]
  30. return trace, ok
  31. }
  32. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  33. c.traceMap[traceId] = trace
  34. }
  35. // 查询或创建trace信息
  36. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  37. trace, ok := c.getTrace(traceId)
  38. if !ok {
  39. //new trace
  40. trace = tracing.NewTraceFromEvent(string(c.id))
  41. if trace == nil {
  42. // tracer 未初始化,返回错误
  43. return nil, fmt.Errorf("tracer is not initialized, cannot create trace")
  44. }
  45. //create TraceMap
  46. c.createTraceMap(traceId, trace)
  47. //create ParentSpan
  48. trace.CreateRootSpan(traceId)
  49. }
  50. return trace, nil
  51. }
  52. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  53. // 返回: IP地址, 端口号, 容器ID
  54. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  55. containerID := ""
  56. if c.cgroup != nil {
  57. containerID = c.cgroup.ContainerId
  58. }
  59. ipAddr := ""
  60. ifaces, err := net.Interfaces()
  61. if err == nil {
  62. for _, iface := range ifaces {
  63. if iface.Name == "eth0" {
  64. addrs, err := iface.Addrs()
  65. if err == nil {
  66. for _, addr := range addrs {
  67. var ipnet *net.IPNet
  68. switch v := addr.(type) {
  69. case *net.IPNet:
  70. ipnet = v
  71. case *net.IPAddr:
  72. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  73. }
  74. if ipnet != nil && ipnet.IP.To4() != nil {
  75. ipAddr = ipnet.IP.String()
  76. break
  77. }
  78. }
  79. }
  80. break
  81. }
  82. }
  83. }
  84. klog.Debugf("grpc server ip %s", ipAddr)
  85. // 本地端口尝试从AppInfo.Sport获取
  86. port := c.AppInfo.Sport
  87. klog.Debugf("grpc server port %d", port)
  88. return ipAddr, uint16(port), containerID
  89. }
  90. // Deprecated: InitTrace not used
  91. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  92. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  93. // ip, err := netaddr.ParseIP(hostIp)
  94. // if err != nil {
  95. // //fmt.Println("host ip error")
  96. // hostIp = "127.0.0.1"
  97. // }
  98. // addr := netaddr.IPPortFrom(ip, port)
  99. // trace := tracing.NewTrace(string(c.id), addr)
  100. // if trace == nil {
  101. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  102. // }
  103. // c.traceMap[traceId] = trace
  104. //
  105. // trace.TraceStart(method, path, r.Status, r.Duration)
  106. // return nil
  107. //}
  108. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  109. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  110. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  111. if t.AllEventReady(traceID) {
  112. t.SendEvent()
  113. klog.Debugf("SendEvent %d", traceID)
  114. //fmt.Println(t.GetSpan())
  115. //fmt.Println("===============")
  116. delete(c.traceMap, traceID)
  117. }
  118. }
  119. func (c *Container) valuableTrace(traceID uint64) bool {
  120. return traceID != 0
  121. }
// onL7RequestApm handles one L7 request event observed on (pid, fd) and turns
// it into per-destination statistics and, when tracing applies, APM trace
// events.
//
// The whole call runs with c.lock held. Trace events are only produced when
// c.l7Attach is set and r.TraceId is non-zero; every trace is obtained with
// getOrInitTrace and offered to SendEvent afterwards.
//
// Processing order:
//  1. DNS requests are delegated to onDNSRequest and return its ip->fqdn map.
//  2. Trace start / trace end markers (r.TraceStart / r.TraceEnd equal to
//     TRACE_STATUS) are recorded and return nil.
//  3. HTTP (optionally detected as Elasticsearch) and gRPC client events are
//     recorded before the connection lookup.
//  4. The connection is looked up in c.connectionsByPidFd; when missing, a
//     temporary one is built from r.ComponentDAddr.
//  5. Events whose non-zero timestamp differs from the connection's are
//     dropped.
//  6. Statistics are observed per protocol, plus protocol-specific trace
//     events.
//
// Only the DNS path returns a non-nil map; all other paths return nil.
func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
	c.lock.Lock()
	defer c.lock.Unlock()
	// DNS is handled completely here and never reaches the statistics switch.
	if r.Protocol == l7.ProtocolDNS {
		ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
		return ip2fqdn
	}
	//if !c.valuableTrace(r.TraceId) {
	//	return nil
	//}
	//klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
	// klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
	if c.l7Attach && c.valuableTrace(r.TraceId) {
		// klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
		// Trace start marker: record the inbound request that opened the trace.
		if r.TraceStart == TRACE_STATUS {
			// klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
			trace, err := c.getOrInitTrace(r.TraceId)
			if c.AppInfo.AppName != "" {
				klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
			}
			if err == nil {
				switch r.Protocol {
				case l7.ProtocolHTTP:
					method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
					// userAgent is available here, e.g. to pass to trace.TraceStartEvent.
					// NOTE(review): the ParseIP error is discarded, so ip is the
					// zero value when sn is not an IP literal — confirm that
					// TraceStartEvent tolerates that.
					ip, _ := netaddr.ParseIP(sn)
					//codeType := c.GetCodeTypeFromCache(pid)
					container_id := ""
					if c.cgroup != nil {
						container_id = c.cgroup.ContainerId
					}
					trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
					c.SendEvent(trace, r.TraceId)
				case l7.ProtocolGrpc:
					// gRPC server side.
					ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
					trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
					c.SendEvent(trace, r.TraceId)
				case l7.ProtocolKafka:
					var sn string
					var sport uint16
					var container_id string
					// The peer address is only known when the connection is tracked.
					conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
					if conn != nil {
						sn = conn.ActualDest.IP().String()
						sport = conn.ActualDest.Port()
					}
					if c.cgroup != nil {
						container_id = c.cgroup.ContainerId
					}
					// MQ consumer side.
					trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
					c.SendEvent(trace, r.TraceId)
				}
			}
			return nil
		}
		// Trace end marker.
		if r.TraceEnd == TRACE_STATUS {
			klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
			trace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				trace.TraceEndEvent(r)
				c.SendEvent(trace, r.TraceId)
			}
			return nil
		}
	}
	// HTTP: trace events are recorded here, before the connection lookup;
	// the statistics for HTTP are observed in the switch further down.
	if r.Protocol == l7.ProtocolHTTP {
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			// Check whether Elasticsearch detection is enabled.
			if *flags.EnableElasticsearchDetection {
				// Parse the User-Agent so Elasticsearch requests can be detected.
				method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
				// A request counts as Elasticsearch when its User-Agent mentions it.
				isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
				apmTrace, err := c.getOrInitTrace(r.TraceId)
				//fmt.Println("ProtocolHTTP-----", r.TraceId, err)
				if err == nil {
					if isElasticsearch {
						// The protocol is rewritten on r itself, so the
						// statistics switch below sees ProtocolES, not HTTP.
						r.Protocol = l7.ProtocolES
						// Elasticsearch request: handled like a NoSQL query.
						query := l7.ParseElasticsearch(r.Payload)
						if c.AppInfo.AppName != "" {
							klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
						}
						conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
						if conn == nil {
							// Untracked connection: fall back to the address carried by the event.
							conn = &ActiveConnection{
								Dest:       r.ComponentDAddr,
								ActualDest: r.ComponentDAddr,
								Timestamp:  timestamp,
							}
						}
						apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
					} else {
						// Plain HTTP request.
						apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
					}
					c.SendEvent(apmTrace, r.TraceId)
				}
			} else {
				// Elasticsearch detection is disabled: plain HTTP handling.
				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
				apmTrace, err := c.getOrInitTrace(r.TraceId)
				if err == nil {
					apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
					c.SendEvent(apmTrace, r.TraceId)
				}
			}
		}
		//return nil
	}
	// gRPC client: recorded before the connection lookup.
	// NOTE(review): the ProtocolGrpc case of the switch below records
	// GrpcClientTraceQueryEvent again for the same request whenever the
	// timestamp check passes — confirm the double recording is intended.
	// NOTE(review): both lines below log at Info level for every gRPC request.
	if r.Protocol == l7.ProtocolGrpc {
		klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
		klog.Infoln("enter the l7.ProtocolGrpc")
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				apmTrace.GrpcClientTraceQueryEvent(r)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	}
	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
	//fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
	if conn == nil {
		// Untracked connection: use a temporary one built from the event so
		// the request is still counted. It is not stored in connectionsByPidFd.
		conn = &ActiveConnection{
			Dest:       r.ComponentDAddr,
			ActualDest: r.ComponentDAddr,
			Timestamp:  timestamp,
		}
		//return nil
	}
	// Drop events carrying a non-zero timestamp that differs from the
	// connection's (presumably the fd now belongs to another connection —
	// TODO confirm).
	if timestamp != 0 && conn.Timestamp != timestamp {
		//if r.Protocol == l7.ProtocolGrpc {
		//	klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
		//	klog.Infoln("enter the l7.ProtocolGrpc")
		//	if c.l7Attach && c.valuableTrace(r.TraceId) {
		//		apmTrace, err := c.getOrInitTrace(r.TraceId)
		//		if err == nil {
		//			apmTrace.GrpcClientTraceQueryEvent(r)
		//			c.SendEvent(apmTrace, r.TraceId)
		//		}
		//	}
		//}
		return nil
	}
	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
	//trace := tracing.NewTrace(string(c.id), conn.ActualDest)
	switch r.Protocol {
	// HTTP: statistics only; the trace event was recorded above.
	case l7.ProtocolHTTP:
		if c.AppInfo.AppName != "" {
			klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
		}
		stats.observe(r.Status.Http(), "", r.Duration)
	// HTTP2: the per-connection parser may yield several requests per event.
	case l7.ProtocolHTTP2:
		if conn.http2Parser == nil {
			conn.http2Parser = l7.NewHttp2Parser()
		}
		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
		for _, req := range requests {
			stats.observe(req.Status.Http(), "", req.Duration)
			//trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
		}
	// PostgreSQL
	case l7.ProtocolPostgres:
		// Statement-close messages are not counted as queries.
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		//if conn.postgresParser == nil {
		//	conn.postgresParser = l7.NewPostgresParser()
		//}
		//query := conn.postgresParser.Parse(r.Payload)
		//trace.PostgresQuery(query, r.Status.Error(), r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			if conn.postgresParser == nil {
				conn.postgresParser = l7.NewPostgresParser()
			}
			query := conn.postgresParser.Parse(r.Payload)
			//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			//apmTrace, ok := c.getTrace(r.TraceId)
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			//fmt.Println("mysql r.TraceId:", r.TraceId)
			//fmt.Println("ok:", ok)
			//fmt.Println("traceMap:", len(c.traceMap))
			if err == nil {
				//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
				//apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
				apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// MySQL (also MariaDB / TiDB, told apart by destination port)
	case l7.ProtocolMysql:
		// Statement-close messages are not counted as queries.
		if r.Method != l7.MethodStatementClose {
			stats.observe(r.Status.String(), "", r.Duration)
		}
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			if conn.mysqlParser == nil {
				conn.mysqlParser = l7.NewMysqlParser()
			}
			query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
			//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			//apmTrace, ok := c.getTrace(r.TraceId)
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			//fmt.Println("mysql r.TraceId:", r.TraceId)
			//fmt.Println("ok:", ok)
			//fmt.Println("traceMap:", len(c.traceMap))
			if err == nil {
				dbSystem := semconv.DBSystemMySQL
				// Determine the protocol type from the port whitelist.
				// NOTE(review): l7Type, not r.Protocol, is what gets passed to
				// SQLTraceQueryEvent — confirm what GetProtocolByPort returns
				// for ports outside the whitelist.
				l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
				if l7Type == l7.ProtocolMariaDB {
					dbSystem = semconv.DBSystemMariaDB
				} else if l7Type == l7.ProtocolTiDB {
					dbSystem = semconv.DBSystemTiDB
				}
				//apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
				//apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
				apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// DM (Dameng database)
	case l7.ProtocolDM:
		// Count DM queries.
		stats.observe(r.Status.String(), "", r.Duration)
		// Emit trace data only when tracing applies.
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			if conn.dmParser == nil {
				conn.dmParser = l7.NewDmParser()
			}
			query := conn.dmParser.Parse(r.Payload, r.StatementId)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				//apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
				apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// Memcached
	case l7.ProtocolMemcached:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			cmd, items := l7.ParseMemcached(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				// Statement text: "cmd", "cmd item" or "cmd [item1 item2 ...]".
				statement := cmd
				if len(items) == 1 {
					statement += " " + items[0]
				} else if len(items) > 1 {
					joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
					statement += " " + joined
				}
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// Redis
	case l7.ProtocolRedis:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			cmd, args := l7.ParseRedis(r.Payload)
			//fmt.Println("cmd", cmd)
			//fmt.Println("args", args)
			//apmTrace, ok := c.getTrace(r.TraceId)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				// Statement text: the command followed by its arguments, if any.
				statement := cmd
				if args != "" {
					statement += " " + args
				}
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
		//trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
	// gRPC
	// NOTE(review): the client event was already recorded before the
	// connection lookup above; this records it a second time.
	case l7.ProtocolGrpc:
		klog.Debugln("enter the l7.ProtocolGrpc")
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				apmTrace.GrpcClientTraceQueryEvent(r)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// MongoDB
	case l7.ProtocolMongo:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			query := l7.ParseMongo(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				// MongoDB queries are usually JSON, e.g. {"insert":"users"} or {"find":"users","filter":{...}}.
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// Cassandra
	case l7.ProtocolCassandra:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			// The parser is created but, with Parse commented out below, the
			// raw payload is what gets reported as the query.
			if conn.cassandraParser == nil {
				conn.cassandraParser = l7.NewCassandraParser()
			}
			var query string
			query = string(r.Payload)
			//query := conn.cassandraParser.Parse(r.Payload)
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// Kafka (producer/client side; the consumer side is the trace-start path above)
	case l7.ProtocolKafka:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			if c.AppInfo.AppName != "" {
				klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
			}
			apmTrace, err := c.getOrInitTrace(r.TraceId)
			if err == nil {
				apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
				c.SendEvent(apmTrace, r.TraceId)
			}
		}
	// RabbitMQ / NATS: counted per method with a zero duration.
	case l7.ProtocolRabbitmq, l7.ProtocolNats:
		stats.observe(r.Status.String(), r.Method.String(), 0)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			// No trace event is produced for these protocols; the branch is empty.
		}
	// Dubbo2
	case l7.ProtocolDubbo2:
		stats.observe(r.Status.String(), "", r.Duration)
		if c.l7Attach && c.valuableTrace(r.TraceId) {
			// No trace event is produced for this protocol; the branch is empty.
		}
	}
	return nil
}
  526. func (c *Container) buildIDs(pid uint32) bool {
  527. c.lock.Lock()
  528. defer c.lock.Unlock()
  529. p := c.processes[pid]
  530. if p != nil {
  531. p.cmdline = string(proc.GetRealCmdline(pid))
  532. }
  533. var sns []string
  534. var sport uint16
  535. for address, val := range c.getListens() {
  536. if val == 1 {
  537. ip := address.IP()
  538. if ip.Is4() && !ip.IsLoopback() {
  539. // 获取端口号
  540. sport = address.Port()
  541. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  542. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  543. //c.AppInfo.Sn = ip.String()
  544. //c.AppInfo.Sport = int(port)
  545. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  546. //fmt.Println(port)
  547. ////os.Exit(1)
  548. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  549. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  550. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  551. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  552. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  553. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  554. //return true
  555. }
  556. }
  557. }
  558. if len(sns) > 0 {
  559. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  560. snsStr := strings.Join(sns, ",")
  561. c.AppInfo.Sn = snsStr
  562. c.AppInfo.Sport = int(sport)
  563. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  564. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  565. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  566. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  567. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  568. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  569. return true
  570. }
  571. return false
  572. }
  573. func (c *Container) ReBuildIds(pid uint32) {
  574. c.lock.Lock()
  575. defer c.lock.Unlock()
  576. var sns []string
  577. var sport uint16
  578. for address, val := range c.getListens() {
  579. if val == 1 {
  580. ip := address.IP()
  581. if ip.Is4() && !ip.IsLoopback() {
  582. // 获取端口号
  583. sport = address.Port()
  584. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  585. }
  586. }
  587. }
  588. if len(sns) > 0 {
  589. snsStr := strings.Join(sns, ",")
  590. c.AppInfo.Sn = snsStr
  591. c.AppInfo.Sport = int(sport)
  592. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  593. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  594. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  595. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  596. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  597. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  598. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  599. }
  600. }
  601. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  602. c.lock.Lock()
  603. defer c.lock.Unlock()
  604. // get the associated uprobe
  605. uprobe, err := c.GetUprobe(event, tracer)
  606. if err != nil {
  607. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  608. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  609. return
  610. }
  611. if event.TraceId <= 0 {
  612. //fmt.Println("StackProcess TraceId id 0")
  613. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  614. return
  615. }
  616. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  617. stackFun := ebpftracer.StackFunEvent{}
  618. stackFun.Uprobe = &uprobe
  619. stackFun.StackEvent = event
  620. apmTrace, ok := c.getTrace(event.TraceId)
  621. if ok {
  622. apmTrace.FunAdd(stackFun)
  623. }
  624. }
  625. func byteExtractString(nameString [100]byte) string {
  626. n := bytes.IndexFunc(nameString[:], func(r rune) bool {
  627. return r == 0 || r < 32 || r > 126 // 截取到第一个零值或非打印字符
  628. })
  629. if n == -1 {
  630. n = len(nameString) // 没找到零值或非打印字符,使用数组长度
  631. }
  632. return string(nameString[:n])
  633. }
  634. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  635. c.lock.Lock()
  636. defer c.lock.Unlock()
  637. // get the associated uprobe
  638. switch event.Location {
  639. case 0: // ret
  640. Funcname := ""
  641. if event.Type != uint64(CodeTypeJava) {
  642. uprobe, err := c.GetUprobe(event, tracer)
  643. if err != nil {
  644. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  645. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  646. return
  647. }
  648. Funcname = uprobe.Funcname
  649. } else {
  650. ClassName := byteExtractString(event.ClassName)
  651. MethedName := byteExtractString(event.MethedName)
  652. Funcname = ClassName + "." + MethedName
  653. }
  654. if event.TraceId <= 0 {
  655. //fmt.Println("StackProcess TraceId id 0")
  656. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  657. return
  658. }
  659. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  660. apmTrace, err := c.getOrInitTrace(event.TraceId)
  661. if err == nil {
  662. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  663. duration := event.TimeNsEnd - event.TimeNsStart
  664. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  665. c.SendEvent(apmTrace, event.TraceId)
  666. }
  667. }
  668. }
  669. // ResolveAddress returns the symbol(s) and offset of the given address.
  670. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  671. if addr == 0 {
  672. // err = errors.Wrapf(SymbolNotFoundError, "0")
  673. return
  674. }
  675. // symbols, _, err := e.Symbols()
  676. if err != nil {
  677. return
  678. }
  679. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  680. if idx == 0 {
  681. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  682. return
  683. }
  684. // why diff symbol may contains the same addr?
  685. sym := symbols[idx-1]
  686. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  687. syms = append(syms, symbols[i])
  688. }
  689. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  690. syms = append(syms, symbols[i])
  691. }
  692. return syms, uint(addr - sym.Value), nil
  693. }
  694. type MemoryMap struct {
  695. Start, End uint64
  696. }
  697. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  698. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  699. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  700. if err != nil {
  701. return nil, err
  702. }
  703. defer file.Close()
  704. scanner := bufio.NewScanner(file)
  705. if scanner.Scan() {
  706. fields := strings.Fields(scanner.Text())
  707. addresses := strings.Split(fields[0], "-")
  708. if len(addresses) != 2 {
  709. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  710. }
  711. start, err := strconv.ParseUint(addresses[0], 16, 64)
  712. if err != nil {
  713. return nil, err
  714. }
  715. end, err := strconv.ParseUint(addresses[1], 16, 64)
  716. if err != nil {
  717. return nil, err
  718. }
  719. return &MemoryMap{
  720. Start: start,
  721. End: end,
  722. }, nil
  723. }
  724. if err := scanner.Err(); err != nil {
  725. return nil, err
  726. }
  727. return nil, errors.New("empty /proc/<pid>/maps")
  728. }
  729. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  730. //fmt.Println("GetUprobe entory:")
  731. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  732. Address := event.Ip - memoryMap.Start
  733. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  734. for _, fun := range c.UprobesMap {
  735. funAddress := fun.Address + fun.AbsOffset
  736. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  737. if funAddress == Address {
  738. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  739. return fun, nil
  740. }
  741. }
  742. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  743. if err != nil {
  744. return
  745. }
  746. for _, sym := range syms {
  747. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  748. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  749. if ok {
  750. return uprobe, nil
  751. }
  752. }
  753. err = errors.New("uprobe not found")
  754. return
  755. }
  756. func (c *Container) GetAppInfo() AppInfo {
  757. return c.AppInfo
  758. }
  759. // 可注入前置
  760. func (c *Container) checkEventReady() bool {
  761. c.lock.Lock()
  762. defer c.lock.Unlock()
  763. return c.l7EventReady
  764. }
  765. func (c *Container) eventReady() {
  766. c.lock.Lock()
  767. defer c.lock.Unlock()
  768. c.l7EventReady = true
  769. }
  770. // uprobe前置
  771. func (c *Container) Isl7AttachSuccess() bool {
  772. c.lock.Lock()
  773. defer c.lock.Unlock()
  774. return c.l7Attach
  775. }
  776. func (c *Container) l7AttachSuccess() {
  777. c.lock.Lock()
  778. defer c.lock.Unlock()
  779. c.l7Attach = true
  780. }
  781. func (c *Container) IsLicenseFuse() bool {
  782. c.lock.Lock()
  783. defer c.lock.Unlock()
  784. return c.licenseFuse
  785. }
  786. func (c *Container) LicenseFuse() {
  787. c.lock.Lock()
  788. defer c.lock.Unlock()
  789. c.licenseFuse = true
  790. }
  791. func (c *Container) ClearLicenseFuse() {
  792. c.lock.Lock()
  793. defer c.lock.Unlock()
  794. c.licenseFuse = false
  795. }
  796. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  797. resp, err := c.GetCodeSetting(r)
  798. if err != nil {
  799. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  800. return
  801. }
  802. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  803. // 有黑白名单规则 &&
  804. // 之前有注入 先卸载再注入
  805. // 之前没注入 直接注入
  806. // 没有有黑白名单 直接卸载
  807. if c.hasStackRule(resp) {
  808. if c.stackRuleUpdate(resp) {
  809. // 重新注入
  810. err = c.DetachStack(pid, APP_UNINSTALL)
  811. if err != nil {
  812. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  813. }
  814. }
  815. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  816. c.saveWhiteStackSettingInfo(resp)
  817. err = c.AttachStack(r.tracer, pid)
  818. if err != nil {
  819. c.AppInfo.SetAppStackError()
  820. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  821. }
  822. } else {
  823. if c.noOrigRule() {
  824. return
  825. }
  826. c.saveWhiteStackSettingInfo(resp)
  827. // 关闭堆栈
  828. err = c.DetachStack(pid, APP_UNINSTALL)
  829. if err != nil {
  830. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  831. }
  832. }
  833. } else {
  834. if c.noOrigRule() {
  835. return
  836. }
  837. c.saveWhiteStackSettingInfo(resp)
  838. // 关闭堆栈
  839. err = c.DetachStack(pid, APP_UNINSTALL)
  840. if err != nil {
  841. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  842. }
  843. }
  844. }
  845. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  846. p := c.processes[pid]
  847. if p != nil && c.checkEventReady() {
  848. codeType := c.GetCodeTypeFromCache(pid)
  849. if codeType.IsUnknownCode() {
  850. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  851. return false, 0
  852. }
  853. cmdline := p.GetCmdline()
  854. if len(cmdline) == 0 {
  855. return false, 0
  856. }
  857. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  858. whiteListByCode := r.getWhiteListAll()
  859. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  860. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  861. // 当前语言的白名单规则
  862. for _, setting := range whiteListByCode {
  863. ruleVal := setting.Filters
  864. if ruleVal == "" {
  865. continue
  866. }
  867. // 判断规则
  868. if strings.Contains(cmdline, ruleVal) {
  869. //if !codeType.IsJvmCode() {
  870. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  871. // return false, 0
  872. //}
  873. c.WhiteSettingInfo.AppName = setting.AppName
  874. c.WhiteSettingInfo.Filters = setting.Filters
  875. klog.WithField("pid", pid).
  876. WithField("codeType", codeType.String()).
  877. WithField("ruleVal", ruleVal).
  878. WithField("cmdline", cmdline).
  879. //WithField("stack", setting.OpenStack).
  880. WithField("white list", utils.ToString(whiteListByCode)).
  881. Infoln("[verify] check successful.")
  882. return true, 0
  883. }
  884. }
  885. }
  886. return false, 0
  887. }
  888. // 1.卸载入口
  889. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  890. c.lock.Lock()
  891. defer c.lock.Unlock()
  892. if p := c.processes[pid]; p != nil {
  893. err := c.DetachUprobes(tracer, pid, detachType)
  894. if err != nil {
  895. klog.WithError(err).Errorln("DetachUprobes Error.")
  896. }
  897. err = c.DetachStack(pid, detachType)
  898. if err != nil {
  899. klog.WithError(err).Errorln("DetachStack Error.")
  900. }
  901. // 关闭7层监控
  902. c.l7Attach = false
  903. // 变更应用状态
  904. if err != nil {
  905. detachType = detachType.Error()
  906. }
  907. c.AppInfo.SetAppStatus(detachType)
  908. }
  909. }
  910. // 1.1卸载uprobe
  911. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  912. // close uprobe
  913. if p := c.processes[pid]; p != nil {
  914. for _, u := range p.uprobes {
  915. err := u.Close()
  916. if err != nil {
  917. return err
  918. }
  919. }
  920. p.uprobes = []link.Link{}
  921. switch detachType {
  922. case APP_UNINSTALL, APP_FUSE, APP_LICENSE_FUSE:
  923. codeType := c.GetCodeTypeFromCache(pid)
  924. switch codeType {
  925. case CodeTypeJava:
  926. p.jvmAttachOnce = false
  927. case CodeTypeGo:
  928. p.goTlsUprobesChecked = false
  929. p.openSslUprobesChecked = false
  930. default:
  931. }
  932. case APP_UPROBE_ERROR:
  933. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  934. default:
  935. }
  936. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  937. if err := tracer.DelKProcInfo(pid); err != nil {
  938. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  939. } else {
  940. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%s", pid, detachType.String())
  941. c.AppInfo.EBPFProcInfo = nil
  942. }
  943. } else {
  944. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  945. }
  946. return nil
  947. }
  948. // 1.2卸载堆栈
  949. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  950. if p := c.processes[pid]; p != nil {
  951. var err error
  952. codeType := c.GetCodeTypeFromCache(pid)
  953. switch codeType {
  954. // 1.2.1 卸载 jvm堆栈
  955. case CodeTypeJava:
  956. err = c.detachJvmStack(pid)
  957. default:
  958. err = p.closeStackUprobes()
  959. }
  960. if err != nil {
  961. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  962. return err
  963. }
  964. p.stackAttachOnce = false
  965. } else {
  966. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  967. }
  968. return nil
  969. }
  970. // 1.2.1 卸载 jvm堆栈
  971. func (c *Container) detachJvmStack(pid uint32) error {
  972. if p := c.processes[pid]; p != nil {
  973. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  974. //}
  975. // 卸载 JavaAgent
  976. var err error
  977. if p.stackStatus.IsJattachSuccess() {
  978. // 卸载堆栈probes
  979. err = p.closeStackUprobes()
  980. if err != nil {
  981. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  982. }
  983. err = p.uninstallJavaAgent()
  984. if err != nil {
  985. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  986. }
  987. }
  988. return err
  989. }
  990. return nil
  991. }
  992. func (c *Container) getRootfs() string {
  993. if c.metadata != nil && c.metadata.rootfs != "" {
  994. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  995. }
  996. return ""
  997. }
  998. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  999. if c == nil {
  1000. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  1001. return
  1002. }
  1003. if c.AppInfo.AppName == "" {
  1004. return
  1005. }
  1006. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  1007. detail := AppStatusInfo{
  1008. Pid: pid,
  1009. ProcName: c.containerName,
  1010. AppName: c.AppInfo.AppName,
  1011. Language: c.AppInfo.CodeType.String(),
  1012. LanguageVersion: c.AppInfo.Version,
  1013. AppID: c.AppInfo.AppIdHash.IntVal,
  1014. AgentID: c.AppInfo.AgentId,
  1015. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  1016. Sn: c.AppInfo.Sn,
  1017. Sport: c.AppInfo.Sport,
  1018. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  1019. PreStatus: c.AppInfo.PreStatus,
  1020. Status: c.AppInfo.Status,
  1021. Rule: c.WhiteSettingInfo.Filters,
  1022. Container: string(c.id),
  1023. }
  1024. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  1025. if c.AppInfo.UpdateAt != 0 {
  1026. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  1027. }
  1028. p := c.processes[pid]
  1029. if p != nil {
  1030. detail.StackStatus = p.stackStatus.String()
  1031. v := 0
  1032. if !p.versionFailed {
  1033. v = 1
  1034. }
  1035. detail.StackStatus += fmt.Sprintf("V=%d", v)
  1036. }
  1037. runtimeApps[pid] = detail
  1038. }
  1039. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  1040. if c == nil {
  1041. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  1042. return
  1043. }
  1044. var err error
  1045. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  1046. // License fuse UNINSTALL(仅在开启 License 校验时)
  1047. if *flags.EnableLicenseCheck && c.IsLicenseFuse() {
  1048. if c.Isl7AttachSuccess() {
  1049. c.Detach(r.tracer, pid, APP_LICENSE_FUSE)
  1050. }
  1051. klog.WithField("pid", pid).Infoln("[AgentCtrl] License fusing.")
  1052. return
  1053. }
  1054. // fusing UNINSTALL
  1055. if r.isFusing && c.Isl7AttachSuccess() {
  1056. c.Detach(r.tracer, pid, APP_FUSE)
  1057. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  1058. return
  1059. }
  1060. // verify UNINSTALL
  1061. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  1062. c.Detach(r.tracer, pid, APP_UNINSTALL)
  1063. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  1064. return
  1065. }
  1066. if verifyAttachConditions {
  1067. err = c.RegisterAppInfo(r, pid)
  1068. if err != nil {
  1069. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  1070. return
  1071. }
  1072. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  1073. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  1074. if err != nil {
  1075. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  1076. return
  1077. } else {
  1078. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  1079. }
  1080. // 堆栈控制
  1081. c.ctrlStack(r, pid)
  1082. }
  1083. }