container_apm.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  28. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  29. trace, ok := c.traceMap[traceId]
  30. return trace, ok
  31. }
  32. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  33. c.traceMap[traceId] = trace
  34. }
  35. // 查询或创建trace信息
  36. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  37. trace, ok := c.getTrace(traceId)
  38. if !ok {
  39. //new trace
  40. trace = tracing.NewTraceFromEvent(string(c.id))
  41. //create TraceMap
  42. c.createTraceMap(traceId, trace)
  43. //create ParentSpan
  44. trace.CreateRootSpan(traceId)
  45. }
  46. return trace, nil
  47. }
  48. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  49. // 返回: IP地址, 端口号, 容器ID
  50. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  51. containerID := ""
  52. if c.cgroup != nil {
  53. containerID = c.cgroup.ContainerId
  54. }
  55. ipAddr := ""
  56. ifaces, err := net.Interfaces()
  57. if err == nil {
  58. for _, iface := range ifaces {
  59. if iface.Name == "eth0" {
  60. addrs, err := iface.Addrs()
  61. if err == nil {
  62. for _, addr := range addrs {
  63. var ipnet *net.IPNet
  64. switch v := addr.(type) {
  65. case *net.IPNet:
  66. ipnet = v
  67. case *net.IPAddr:
  68. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  69. }
  70. if ipnet != nil && ipnet.IP.To4() != nil {
  71. ipAddr = ipnet.IP.String()
  72. break
  73. }
  74. }
  75. }
  76. break
  77. }
  78. }
  79. }
  80. klog.Debugf("grpc server ip %s", ipAddr)
  81. // 本地端口尝试从AppInfo.Sport获取
  82. port := c.AppInfo.Sport
  83. klog.Debugf("grpc server port %d", port)
  84. return ipAddr, uint16(port), containerID
  85. }
  86. // Deprecated: InitTrace not used
  87. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  88. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  89. // ip, err := netaddr.ParseIP(hostIp)
  90. // if err != nil {
  91. // //fmt.Println("host ip error")
  92. // hostIp = "127.0.0.1"
  93. // }
  94. // addr := netaddr.IPPortFrom(ip, port)
  95. // trace := tracing.NewTrace(string(c.id), addr)
  96. // if trace == nil {
  97. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  98. // }
  99. // c.traceMap[traceId] = trace
  100. //
  101. // trace.TraceStart(method, path, r.Status, r.Duration)
  102. // return nil
  103. //}
  104. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  105. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  106. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  107. if t.AllEventReady(traceID) {
  108. t.SendEvent()
  109. klog.Debugf("SendEvent %d", traceID)
  110. //fmt.Println(t.GetSpan())
  111. //fmt.Println("===============")
  112. delete(c.traceMap, traceID)
  113. }
  114. }
  115. func (c *Container) valuableTrace(traceID uint64) bool {
  116. return traceID != 0
  117. }
  118. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  119. c.lock.Lock()
  120. defer c.lock.Unlock()
  121. if r.Protocol == l7.ProtocolDNS {
  122. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  123. if c.l7Attach && c.valuableTrace(r.TraceId) {
  124. apmTrace, err := c.getOrInitTrace(r.TraceId)
  125. if err == nil {
  126. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  127. c.SendEvent(apmTrace, r.TraceId)
  128. }
  129. }
  130. return ip2fqdn
  131. }
  132. //if !c.valuableTrace(r.TraceId) {
  133. // return nil
  134. //}
  135. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  136. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  137. if c.l7Attach && c.valuableTrace(r.TraceId) {
  138. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  139. if r.TraceStart == TRACE_STATUS {
  140. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  141. trace, err := c.getOrInitTrace(r.TraceId)
  142. if c.AppInfo.AppName != "" {
  143. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  144. }
  145. if err == nil {
  146. switch r.Protocol {
  147. case l7.ProtocolHTTP:
  148. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  149. // userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
  150. ip, _ := netaddr.ParseIP(sn)
  151. //codeType := c.GetCodeTypeFromCache(pid)
  152. container_id := ""
  153. if c.cgroup != nil {
  154. container_id = c.cgroup.ContainerId
  155. }
  156. trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  157. c.SendEvent(trace, r.TraceId)
  158. case l7.ProtocolGrpc:
  159. // gRPC
  160. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  161. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  162. c.SendEvent(trace, r.TraceId)
  163. case l7.ProtocolKafka:
  164. var sn string
  165. var sport uint16
  166. var container_id string
  167. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  168. if conn != nil {
  169. sn = conn.ActualDest.IP().String()
  170. sport = conn.ActualDest.Port()
  171. }
  172. if c.cgroup != nil {
  173. container_id = c.cgroup.ContainerId
  174. }
  175. // MQ
  176. trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
  177. c.SendEvent(trace, r.TraceId)
  178. }
  179. }
  180. return nil
  181. }
  182. if r.TraceEnd == TRACE_STATUS {
  183. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  184. trace, err := c.getOrInitTrace(r.TraceId)
  185. if err == nil {
  186. trace.TraceEndEvent(r)
  187. c.SendEvent(trace, r.TraceId)
  188. }
  189. return nil
  190. }
  191. }
  192. /**
  193. * HTTP
  194. */
  195. if r.Protocol == l7.ProtocolHTTP {
  196. if c.l7Attach && c.valuableTrace(r.TraceId) {
  197. // 检查是否启用 Elasticsearch 检测
  198. if *flags.EnableElasticsearchDetection {
  199. // 解析 User-Agent 以检测 Elasticsearch 请求
  200. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  201. // 检查是否是 Elasticsearch 请求(通过 User-Agent)
  202. isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
  203. apmTrace, err := c.getOrInitTrace(r.TraceId)
  204. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  205. if err == nil {
  206. if isElasticsearch {
  207. r.Protocol = l7.ProtocolES
  208. // Elasticsearch 请求,按照 NoSQL 方式处理
  209. query := l7.ParseElasticsearch(r.Payload)
  210. if c.AppInfo.AppName != "" {
  211. klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
  212. }
  213. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  214. if conn == nil {
  215. conn = &ActiveConnection{
  216. Dest: r.ComponentDAddr,
  217. ActualDest: r.ComponentDAddr,
  218. Timestamp: timestamp,
  219. }
  220. }
  221. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
  222. } else {
  223. // 普通 HTTP 请求
  224. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  225. }
  226. c.SendEvent(apmTrace, r.TraceId)
  227. }
  228. } else {
  229. // Elasticsearch 检测未启用,使用普通 HTTP 处理
  230. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  231. apmTrace, err := c.getOrInitTrace(r.TraceId)
  232. if err == nil {
  233. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  234. c.SendEvent(apmTrace, r.TraceId)
  235. }
  236. }
  237. }
  238. //return nil
  239. }
  240. /**
  241. * gRPC
  242. */
  243. if r.Protocol == l7.ProtocolGrpc {
  244. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  245. klog.Infoln("enter the l7.ProtocolGrpc")
  246. if c.l7Attach && c.valuableTrace(r.TraceId) {
  247. apmTrace, err := c.getOrInitTrace(r.TraceId)
  248. if err == nil {
  249. apmTrace.GrpcClientTraceQueryEvent(r)
  250. c.SendEvent(apmTrace, r.TraceId)
  251. }
  252. }
  253. }
  254. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  255. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  256. if conn == nil {
  257. conn = &ActiveConnection{
  258. Dest: r.ComponentDAddr,
  259. ActualDest: r.ComponentDAddr,
  260. Timestamp: timestamp,
  261. }
  262. //return nil
  263. }
  264. if timestamp != 0 && conn.Timestamp != timestamp {
  265. //if r.Protocol == l7.ProtocolGrpc {
  266. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  267. // klog.Infoln("enter the l7.ProtocolGrpc")
  268. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  269. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  270. // if err == nil {
  271. // apmTrace.GrpcClientTraceQueryEvent(r)
  272. // c.SendEvent(apmTrace, r.TraceId)
  273. // }
  274. // }
  275. //}
  276. return nil
  277. }
  278. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  279. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  280. switch r.Protocol {
  281. /**
  282. * HTTP
  283. */
  284. case l7.ProtocolHTTP:
  285. if c.AppInfo.AppName != "" {
  286. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  287. }
  288. stats.observe(r.Status.Http(), "", r.Duration)
  289. /**
  290. * HTTP2
  291. */
  292. case l7.ProtocolHTTP2:
  293. if conn.http2Parser == nil {
  294. conn.http2Parser = l7.NewHttp2Parser()
  295. }
  296. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  297. for _, req := range requests {
  298. stats.observe(req.Status.Http(), "", req.Duration)
  299. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  300. }
  301. /**
  302. * PostgreSQL
  303. */
  304. case l7.ProtocolPostgres:
  305. if r.Method != l7.MethodStatementClose {
  306. stats.observe(r.Status.String(), "", r.Duration)
  307. }
  308. //if conn.postgresParser == nil {
  309. // conn.postgresParser = l7.NewPostgresParser()
  310. //}
  311. //query := conn.postgresParser.Parse(r.Payload)
  312. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  313. if c.l7Attach && c.valuableTrace(r.TraceId) {
  314. if conn.postgresParser == nil {
  315. conn.postgresParser = l7.NewPostgresParser()
  316. }
  317. query := conn.postgresParser.Parse(r.Payload)
  318. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  319. if c.AppInfo.AppName != "" {
  320. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  321. }
  322. //apmTrace, ok := c.getTrace(r.TraceId)
  323. apmTrace, err := c.getOrInitTrace(r.TraceId)
  324. //fmt.Println("mysql r.TraceId:", r.TraceId)
  325. //fmt.Println("ok:", ok)
  326. //fmt.Println("traceMap:", len(c.traceMap))
  327. if err == nil {
  328. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  329. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  330. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  331. c.SendEvent(apmTrace, r.TraceId)
  332. }
  333. }
  334. /**
  335. * Mysql
  336. */
  337. case l7.ProtocolMysql:
  338. if r.Method != l7.MethodStatementClose {
  339. stats.observe(r.Status.String(), "", r.Duration)
  340. }
  341. if c.l7Attach && c.valuableTrace(r.TraceId) {
  342. if conn.mysqlParser == nil {
  343. conn.mysqlParser = l7.NewMysqlParser()
  344. }
  345. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  346. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  347. if c.AppInfo.AppName != "" {
  348. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  349. }
  350. //apmTrace, ok := c.getTrace(r.TraceId)
  351. apmTrace, err := c.getOrInitTrace(r.TraceId)
  352. //fmt.Println("mysql r.TraceId:", r.TraceId)
  353. //fmt.Println("ok:", ok)
  354. //fmt.Println("traceMap:", len(c.traceMap))
  355. if err == nil {
  356. dbSystem := semconv.DBSystemMySQL
  357. // 根据端口白名单确定协议类型
  358. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  359. if l7Type == l7.ProtocolMariaDB {
  360. dbSystem = semconv.DBSystemMariaDB
  361. }
  362. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  363. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  364. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  365. c.SendEvent(apmTrace, r.TraceId)
  366. }
  367. }
  368. /**
  369. * DM (达梦数据库)
  370. */
  371. case l7.ProtocolDM:
  372. //统计dm的query次数
  373. stats.observe(r.Status.String(), "", r.Duration)
  374. //是否发送数据
  375. if c.l7Attach && c.valuableTrace(r.TraceId) {
  376. if conn.dmParser == nil {
  377. conn.dmParser = l7.NewDmParser()
  378. }
  379. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  380. if c.AppInfo.AppName != "" {
  381. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  382. }
  383. apmTrace, err := c.getOrInitTrace(r.TraceId)
  384. if err == nil {
  385. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  386. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  387. c.SendEvent(apmTrace, r.TraceId)
  388. }
  389. }
  390. /**
  391. * Memcached
  392. */
  393. case l7.ProtocolMemcached:
  394. stats.observe(r.Status.String(), "", r.Duration)
  395. if c.l7Attach && c.valuableTrace(r.TraceId) {
  396. cmd, items := l7.ParseMemcached(r.Payload)
  397. if c.AppInfo.AppName != "" {
  398. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  399. }
  400. apmTrace, err := c.getOrInitTrace(r.TraceId)
  401. if err == nil {
  402. statement := cmd
  403. if len(items) == 1 {
  404. statement += " " + items[0]
  405. } else if len(items) > 1 {
  406. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  407. statement += " " + joined
  408. }
  409. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  410. c.SendEvent(apmTrace, r.TraceId)
  411. }
  412. }
  413. /**
  414. * Redis
  415. */
  416. case l7.ProtocolRedis:
  417. stats.observe(r.Status.String(), "", r.Duration)
  418. if c.l7Attach && c.valuableTrace(r.TraceId) {
  419. cmd, args := l7.ParseRedis(r.Payload)
  420. //fmt.Println("cmd", cmd)
  421. //fmt.Println("args", args)
  422. //apmTrace, ok := c.getTrace(r.TraceId)
  423. if c.AppInfo.AppName != "" {
  424. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  425. }
  426. apmTrace, err := c.getOrInitTrace(r.TraceId)
  427. if err == nil {
  428. statement := cmd
  429. if args != "" {
  430. statement += " " + args
  431. }
  432. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  433. c.SendEvent(apmTrace, r.TraceId)
  434. }
  435. }
  436. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  437. /**
  438. * gRPC
  439. */
  440. case l7.ProtocolGrpc:
  441. klog.Debugln("enter the l7.ProtocolGrpc")
  442. stats.observe(r.Status.String(), "", r.Duration)
  443. if c.l7Attach && c.valuableTrace(r.TraceId) {
  444. apmTrace, err := c.getOrInitTrace(r.TraceId)
  445. if err == nil {
  446. apmTrace.GrpcClientTraceQueryEvent(r)
  447. c.SendEvent(apmTrace, r.TraceId)
  448. }
  449. }
  450. /**
  451. * MongoDB
  452. */
  453. case l7.ProtocolMongo:
  454. stats.observe(r.Status.String(), "", r.Duration)
  455. if c.l7Attach && c.valuableTrace(r.TraceId) {
  456. query := l7.ParseMongo(r.Payload)
  457. if c.AppInfo.AppName != "" {
  458. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  459. }
  460. apmTrace, err := c.getOrInitTrace(r.TraceId)
  461. if err == nil {
  462. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  463. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  464. c.SendEvent(apmTrace, r.TraceId)
  465. }
  466. }
  467. /**
  468. * Cassandra
  469. */
  470. case l7.ProtocolCassandra:
  471. stats.observe(r.Status.String(), "", r.Duration)
  472. if c.l7Attach && c.valuableTrace(r.TraceId) {
  473. if conn.cassandraParser == nil {
  474. conn.cassandraParser = l7.NewCassandraParser()
  475. }
  476. var query string
  477. query = string(r.Payload)
  478. //query := conn.cassandraParser.Parse(r.Payload)
  479. if c.AppInfo.AppName != "" {
  480. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  481. }
  482. apmTrace, err := c.getOrInitTrace(r.TraceId)
  483. if err == nil {
  484. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  485. c.SendEvent(apmTrace, r.TraceId)
  486. }
  487. }
  488. /**
  489. * Kafka
  490. */
  491. case l7.ProtocolKafka:
  492. stats.observe(r.Status.String(), "", r.Duration)
  493. if c.l7Attach && c.valuableTrace(r.TraceId) {
  494. if c.AppInfo.AppName != "" {
  495. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
  496. }
  497. apmTrace, err := c.getOrInitTrace(r.TraceId)
  498. if err == nil {
  499. apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
  500. c.SendEvent(apmTrace, r.TraceId)
  501. }
  502. }
  503. /**
  504. * RabbitMQ / NATS
  505. */
  506. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  507. stats.observe(r.Status.String(), r.Method.String(), 0)
  508. if c.l7Attach && c.valuableTrace(r.TraceId) {
  509. }
  510. /**
  511. * Dubbo2
  512. */
  513. case l7.ProtocolDubbo2:
  514. stats.observe(r.Status.String(), "", r.Duration)
  515. if c.l7Attach && c.valuableTrace(r.TraceId) {
  516. }
  517. }
  518. return nil
  519. }
  520. func (c *Container) buildIDs(pid uint32) bool {
  521. c.lock.Lock()
  522. defer c.lock.Unlock()
  523. p := c.processes[pid]
  524. if p != nil {
  525. p.cmdline = string(proc.GetRealCmdline(pid))
  526. }
  527. var sns []string
  528. var sport uint16
  529. for address, val := range c.getListens() {
  530. if val == 1 {
  531. ip := address.IP()
  532. if ip.Is4() && !ip.IsLoopback() {
  533. // 获取端口号
  534. sport = address.Port()
  535. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  536. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  537. //c.AppInfo.Sn = ip.String()
  538. //c.AppInfo.Sport = int(port)
  539. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  540. //fmt.Println(port)
  541. ////os.Exit(1)
  542. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  543. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  544. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  545. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  546. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  547. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  548. //return true
  549. }
  550. }
  551. }
  552. if len(sns) > 0 {
  553. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  554. snsStr := strings.Join(sns, ",")
  555. c.AppInfo.Sn = snsStr
  556. c.AppInfo.Sport = int(sport)
  557. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  558. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  559. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  560. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  561. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  562. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  563. return true
  564. }
  565. return false
  566. }
  567. func (c *Container) ReBuildIds(pid uint32) {
  568. c.lock.Lock()
  569. defer c.lock.Unlock()
  570. var sns []string
  571. var sport uint16
  572. for address, val := range c.getListens() {
  573. if val == 1 {
  574. ip := address.IP()
  575. if ip.Is4() && !ip.IsLoopback() {
  576. // 获取端口号
  577. sport = address.Port()
  578. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  579. }
  580. }
  581. }
  582. if len(sns) > 0 {
  583. snsStr := strings.Join(sns, ",")
  584. c.AppInfo.Sn = snsStr
  585. c.AppInfo.Sport = int(sport)
  586. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  587. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  588. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  589. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  590. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  591. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  592. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  593. }
  594. }
  595. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  596. c.lock.Lock()
  597. defer c.lock.Unlock()
  598. // get the associated uprobe
  599. uprobe, err := c.GetUprobe(event, tracer)
  600. if err != nil {
  601. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  602. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  603. return
  604. }
  605. if event.TraceId <= 0 {
  606. //fmt.Println("StackProcess TraceId id 0")
  607. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  608. return
  609. }
  610. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  611. stackFun := ebpftracer.StackFunEvent{}
  612. stackFun.Uprobe = &uprobe
  613. stackFun.StackEvent = event
  614. apmTrace, ok := c.getTrace(event.TraceId)
  615. if ok {
  616. apmTrace.FunAdd(stackFun)
  617. }
  618. }
  619. func byteExtractString(nameString [100]byte) string {
  620. n := bytes.IndexFunc(nameString[:], func(r rune) bool {
  621. return r == 0 || r < 32 || r > 126 // 截取到第一个零值或非打印字符
  622. })
  623. if n == -1 {
  624. n = len(nameString) // 没找到零值或非打印字符,使用数组长度
  625. }
  626. return string(nameString[:n])
  627. }
  628. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  629. c.lock.Lock()
  630. defer c.lock.Unlock()
  631. // get the associated uprobe
  632. switch event.Location {
  633. case 0: // ret
  634. Funcname := ""
  635. if event.Type != uint64(CodeTypeJava) {
  636. uprobe, err := c.GetUprobe(event, tracer)
  637. if err != nil {
  638. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  639. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  640. return
  641. }
  642. Funcname = uprobe.Funcname
  643. } else {
  644. ClassName := byteExtractString(event.ClassName)
  645. MethedName := byteExtractString(event.MethedName)
  646. Funcname = ClassName + "." + MethedName
  647. }
  648. if event.TraceId <= 0 {
  649. //fmt.Println("StackProcess TraceId id 0")
  650. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  651. return
  652. }
  653. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  654. apmTrace, err := c.getOrInitTrace(event.TraceId)
  655. if err == nil {
  656. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  657. duration := event.TimeNsEnd - event.TimeNsStart
  658. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  659. c.SendEvent(apmTrace, event.TraceId)
  660. }
  661. }
  662. }
  663. // ResolveAddress returns the symbol(s) and offset of the given address.
  664. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  665. if addr == 0 {
  666. // err = errors.Wrapf(SymbolNotFoundError, "0")
  667. return
  668. }
  669. // symbols, _, err := e.Symbols()
  670. if err != nil {
  671. return
  672. }
  673. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  674. if idx == 0 {
  675. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  676. return
  677. }
  678. // why diff symbol may contains the same addr?
  679. sym := symbols[idx-1]
  680. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  681. syms = append(syms, symbols[i])
  682. }
  683. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  684. syms = append(syms, symbols[i])
  685. }
  686. return syms, uint(addr - sym.Value), nil
  687. }
  688. type MemoryMap struct {
  689. Start, End uint64
  690. }
  691. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  692. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  693. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  694. if err != nil {
  695. return nil, err
  696. }
  697. defer file.Close()
  698. scanner := bufio.NewScanner(file)
  699. if scanner.Scan() {
  700. fields := strings.Fields(scanner.Text())
  701. addresses := strings.Split(fields[0], "-")
  702. if len(addresses) != 2 {
  703. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  704. }
  705. start, err := strconv.ParseUint(addresses[0], 16, 64)
  706. if err != nil {
  707. return nil, err
  708. }
  709. end, err := strconv.ParseUint(addresses[1], 16, 64)
  710. if err != nil {
  711. return nil, err
  712. }
  713. return &MemoryMap{
  714. Start: start,
  715. End: end,
  716. }, nil
  717. }
  718. if err := scanner.Err(); err != nil {
  719. return nil, err
  720. }
  721. return nil, errors.New("empty /proc/<pid>/maps")
  722. }
  723. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  724. //fmt.Println("GetUprobe entory:")
  725. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  726. Address := event.Ip - memoryMap.Start
  727. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  728. for _, fun := range c.UprobesMap {
  729. funAddress := fun.Address + fun.AbsOffset
  730. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  731. if funAddress == Address {
  732. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  733. return fun, nil
  734. }
  735. }
  736. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  737. if err != nil {
  738. return
  739. }
  740. for _, sym := range syms {
  741. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  742. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  743. if ok {
  744. return uprobe, nil
  745. }
  746. }
  747. err = errors.New("uprobe not found")
  748. return
  749. }
  750. func (c *Container) GetAppInfo() AppInfo {
  751. return c.AppInfo
  752. }
  753. // 可注入前置
  754. func (c *Container) checkEventReady() bool {
  755. c.lock.Lock()
  756. defer c.lock.Unlock()
  757. return c.l7EventReady
  758. }
  759. func (c *Container) eventReady() {
  760. c.lock.Lock()
  761. defer c.lock.Unlock()
  762. c.l7EventReady = true
  763. }
  764. // uprobe前置
  765. func (c *Container) Isl7AttachSuccess() bool {
  766. c.lock.Lock()
  767. defer c.lock.Unlock()
  768. return c.l7Attach
  769. }
  770. func (c *Container) l7AttachSuccess() {
  771. c.lock.Lock()
  772. defer c.lock.Unlock()
  773. c.l7Attach = true
  774. }
  775. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  776. resp, err := c.GetCodeSetting(r)
  777. if err != nil {
  778. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  779. return
  780. }
  781. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  782. // 有黑白名单规则 &&
  783. // 之前有注入 先卸载再注入
  784. // 之前没注入 直接注入
  785. // 没有有黑白名单 直接卸载
  786. if c.hasStackRule(resp) {
  787. if c.stackRuleUpdate(resp) {
  788. // 重新注入
  789. err = c.DetachStack(pid, APP_UNINSTALL)
  790. if err != nil {
  791. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  792. }
  793. }
  794. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  795. c.saveWhiteStackSettingInfo(resp)
  796. err = c.AttachStack(r.tracer, pid)
  797. if err != nil {
  798. c.AppInfo.SetAppStackError()
  799. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  800. }
  801. } else {
  802. if c.noOrigRule() {
  803. return
  804. }
  805. c.saveWhiteStackSettingInfo(resp)
  806. // 关闭堆栈
  807. err = c.DetachStack(pid, APP_UNINSTALL)
  808. if err != nil {
  809. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  810. }
  811. }
  812. } else {
  813. if c.noOrigRule() {
  814. return
  815. }
  816. c.saveWhiteStackSettingInfo(resp)
  817. // 关闭堆栈
  818. err = c.DetachStack(pid, APP_UNINSTALL)
  819. if err != nil {
  820. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  821. }
  822. }
  823. }
  824. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  825. p := c.processes[pid]
  826. if p != nil && c.checkEventReady() {
  827. codeType := c.GetCodeTypeFromCache(pid)
  828. if codeType.IsUnknownCode() {
  829. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  830. return false, 0
  831. }
  832. cmdline := p.GetCmdline()
  833. if len(cmdline) == 0 {
  834. return false, 0
  835. }
  836. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  837. whiteListByCode := r.getWhiteListAll()
  838. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  839. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  840. // 当前语言的白名单规则
  841. for _, setting := range whiteListByCode {
  842. ruleVal := setting.Filters
  843. if ruleVal == "" {
  844. continue
  845. }
  846. // 判断规则
  847. if strings.Contains(cmdline, ruleVal) {
  848. //if !codeType.IsJvmCode() {
  849. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  850. // return false, 0
  851. //}
  852. c.WhiteSettingInfo.AppName = setting.AppName
  853. c.WhiteSettingInfo.Filters = setting.Filters
  854. klog.WithField("pid", pid).
  855. WithField("codeType", codeType.String()).
  856. WithField("ruleVal", ruleVal).
  857. WithField("cmdline", cmdline).
  858. //WithField("stack", setting.OpenStack).
  859. WithField("white list", utils.ToString(whiteListByCode)).
  860. Infoln("[verify] check successful.")
  861. return true, 0
  862. }
  863. }
  864. }
  865. return false, 0
  866. }
  867. // 1.卸载入口
  868. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  869. c.lock.Lock()
  870. defer c.lock.Unlock()
  871. if p := c.processes[pid]; p != nil {
  872. err := c.DetachUprobes(tracer, pid, detachType)
  873. if err != nil {
  874. klog.WithError(err).Errorln("DetachUprobes Error.")
  875. }
  876. err = c.DetachStack(pid, detachType)
  877. if err != nil {
  878. klog.WithError(err).Errorln("DetachStack Error.")
  879. }
  880. // 关闭7层监控
  881. c.l7Attach = false
  882. // 变更应用状态
  883. if err != nil {
  884. detachType = detachType.Error()
  885. }
  886. c.AppInfo.SetAppStatus(detachType)
  887. }
  888. }
  889. // 1.1卸载uprobe
  890. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  891. // close uprobe
  892. if p := c.processes[pid]; p != nil {
  893. for _, u := range p.uprobes {
  894. err := u.Close()
  895. if err != nil {
  896. return err
  897. }
  898. }
  899. p.uprobes = []link.Link{}
  900. switch detachType {
  901. case APP_UNINSTALL, APP_FUSE:
  902. codeType := c.GetCodeTypeFromCache(pid)
  903. switch codeType {
  904. case CodeTypeJava:
  905. p.jvmAttachOnce = false
  906. case CodeTypeGo:
  907. p.goTlsUprobesChecked = false
  908. p.openSslUprobesChecked = false
  909. default:
  910. }
  911. case APP_UPROBE_ERROR:
  912. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  913. default:
  914. }
  915. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  916. if err := tracer.DelKProcInfo(pid); err != nil {
  917. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  918. } else {
  919. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  920. c.AppInfo.EBPFProcInfo = nil
  921. }
  922. } else {
  923. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  924. }
  925. return nil
  926. }
  927. // 1.2卸载堆栈
  928. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  929. if p := c.processes[pid]; p != nil {
  930. var err error
  931. codeType := c.GetCodeTypeFromCache(pid)
  932. switch codeType {
  933. // 1.2.1 卸载 jvm堆栈
  934. case CodeTypeJava:
  935. err = c.detachJvmStack(pid)
  936. default:
  937. err = p.closeStackUprobes()
  938. }
  939. if err != nil {
  940. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  941. return err
  942. }
  943. p.stackAttachOnce = false
  944. } else {
  945. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  946. }
  947. return nil
  948. }
  949. // 1.2.1 卸载 jvm堆栈
  950. func (c *Container) detachJvmStack(pid uint32) error {
  951. if p := c.processes[pid]; p != nil {
  952. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  953. //}
  954. // 卸载 JavaAgent
  955. var err error
  956. if p.stackStatus.IsJattachSuccess() {
  957. // 卸载堆栈probes
  958. err = p.closeStackUprobes()
  959. if err != nil {
  960. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  961. }
  962. err = p.uninstallJavaAgent()
  963. if err != nil {
  964. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  965. }
  966. }
  967. return err
  968. }
  969. return nil
  970. }
  971. func (c *Container) getRootfs() string {
  972. if c.metadata != nil && c.metadata.rootfs != "" {
  973. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  974. }
  975. return ""
  976. }
  977. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  978. if c == nil {
  979. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  980. return
  981. }
  982. if c.AppInfo.AppName == "" {
  983. return
  984. }
  985. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  986. detail := AppStatusInfo{
  987. Pid: pid,
  988. ProcName: c.containerName,
  989. AppName: c.AppInfo.AppName,
  990. Language: c.AppInfo.CodeType.String(),
  991. LanguageVersion: c.AppInfo.Version,
  992. AppID: c.AppInfo.AppIdHash.IntVal,
  993. AgentID: c.AppInfo.AgentId,
  994. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  995. Sn: c.AppInfo.Sn,
  996. Sport: c.AppInfo.Sport,
  997. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  998. PreStatus: c.AppInfo.PreStatus,
  999. Status: c.AppInfo.Status,
  1000. Rule: c.WhiteSettingInfo.Filters,
  1001. Container: string(c.id),
  1002. }
  1003. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  1004. if c.AppInfo.UpdateAt != 0 {
  1005. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  1006. }
  1007. p := c.processes[pid]
  1008. if p != nil {
  1009. detail.StackStatus = p.stackStatus.String()
  1010. v := 0
  1011. if !p.versionFailed {
  1012. v = 1
  1013. }
  1014. detail.StackStatus += fmt.Sprintf("V=%d", v)
  1015. }
  1016. runtimeApps[pid] = detail
  1017. }
  1018. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  1019. if c == nil {
  1020. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  1021. return
  1022. }
  1023. var err error
  1024. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  1025. // fusing UNINSTALL
  1026. if r.isFusing && c.Isl7AttachSuccess() {
  1027. c.Detach(r.tracer, pid, APP_FUSE)
  1028. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  1029. return
  1030. }
  1031. // verify UNINSTALL
  1032. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  1033. c.Detach(r.tracer, pid, APP_UNINSTALL)
  1034. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  1035. return
  1036. }
  1037. if verifyAttachConditions {
  1038. err = c.RegisterAppInfo(r, pid)
  1039. if err != nil {
  1040. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  1041. return
  1042. }
  1043. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  1044. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  1045. if err != nil {
  1046. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  1047. return
  1048. } else {
  1049. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  1050. }
  1051. // 堆栈控制
  1052. c.ctrlStack(r, pid)
  1053. }
  1054. }