container_apm.go 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. "go.opentelemetry.io/otel/attribute"
  26. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  27. "inet.af/netaddr"
  28. )
  29. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  30. trace, ok := c.traceMap[traceId]
  31. return trace, ok
  32. }
  33. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  34. c.traceMap[traceId] = trace
  35. }
  36. // 查询或创建trace信息
  37. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  38. trace, ok := c.getTrace(traceId)
  39. if !ok {
  40. //new trace
  41. trace = tracing.NewTraceFromEvent(string(c.id))
  42. if trace == nil {
  43. // tracer 未初始化,返回错误
  44. return nil, fmt.Errorf("tracer is not initialized, cannot create trace")
  45. }
  46. //create TraceMap
  47. c.createTraceMap(traceId, trace)
  48. //create ParentSpan
  49. trace.CreateRootSpan(traceId)
  50. }
  51. return trace, nil
  52. }
  53. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  54. // 返回: IP地址, 端口号, 容器ID
  55. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  56. containerID := ""
  57. if c.cgroup != nil {
  58. containerID = c.cgroup.ContainerId
  59. }
  60. ipAddr := ""
  61. ifaces, err := net.Interfaces()
  62. if err == nil {
  63. for _, iface := range ifaces {
  64. if iface.Name == "eth0" {
  65. addrs, err := iface.Addrs()
  66. if err == nil {
  67. for _, addr := range addrs {
  68. var ipnet *net.IPNet
  69. switch v := addr.(type) {
  70. case *net.IPNet:
  71. ipnet = v
  72. case *net.IPAddr:
  73. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  74. }
  75. if ipnet != nil && ipnet.IP.To4() != nil {
  76. ipAddr = ipnet.IP.String()
  77. break
  78. }
  79. }
  80. }
  81. break
  82. }
  83. }
  84. }
  85. klog.Debugf("grpc server ip %s", ipAddr)
  86. // 本地端口尝试从AppInfo.Sport获取
  87. port := c.AppInfo.Sport
  88. klog.Debugf("grpc server port %d", port)
  89. return ipAddr, uint16(port), containerID
  90. }
  91. // Deprecated: InitTrace not used
  92. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  93. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  94. // ip, err := netaddr.ParseIP(hostIp)
  95. // if err != nil {
  96. // //fmt.Println("host ip error")
  97. // hostIp = "127.0.0.1"
  98. // }
  99. // addr := netaddr.IPPortFrom(ip, port)
  100. // trace := tracing.NewTrace(string(c.id), addr)
  101. // if trace == nil {
  102. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  103. // }
  104. // c.traceMap[traceId] = trace
  105. //
  106. // trace.TraceStart(method, path, r.Status, r.Duration)
  107. // return nil
  108. //}
  109. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  110. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  111. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  112. if t.AllEventReady(traceID) {
  113. t.SendEvent()
  114. klog.Debugf("SendEvent %d", traceID)
  115. //fmt.Println(t.GetSpan())
  116. //fmt.Println("===============")
  117. delete(c.traceMap, traceID)
  118. }
  119. }
  120. func (c *Container) valuableTrace(traceID uint64) bool {
  121. return traceID != 0
  122. }
  123. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  124. c.lock.Lock()
  125. defer c.lock.Unlock()
  126. if r.Protocol == l7.ProtocolDNS {
  127. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  128. if c.l7Attach && c.valuableTrace(r.TraceId) {
  129. apmTrace, err := c.getOrInitTrace(r.TraceId)
  130. if err == nil {
  131. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  132. c.SendEvent(apmTrace, r.TraceId)
  133. }
  134. }
  135. return ip2fqdn
  136. }
  137. //if !c.valuableTrace(r.TraceId) {
  138. // return nil
  139. //}
  140. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  141. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  142. if c.l7Attach && c.valuableTrace(r.TraceId) {
  143. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  144. if r.TraceStart == TRACE_STATUS {
  145. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  146. trace, err := c.getOrInitTrace(r.TraceId)
  147. if c.AppInfo.AppName != "" {
  148. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  149. }
  150. if err == nil {
  151. switch r.Protocol {
  152. case l7.ProtocolHTTP:
  153. // cwother
  154. if !c.AppInfo.CodeType.IsGoCode() {
  155. r.SysvcFrom = l7.ParseHttpSysvcFrom(r.Payload)
  156. }
  157. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  158. // userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
  159. ip, _ := netaddr.ParseIP(sn)
  160. //codeType := c.GetCodeTypeFromCache(pid)
  161. container_id := ""
  162. if c.cgroup != nil {
  163. container_id = c.cgroup.ContainerId
  164. }
  165. trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  166. c.SendEvent(trace, r.TraceId)
  167. case l7.ProtocolGrpc:
  168. // gRPC
  169. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  170. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  171. c.SendEvent(trace, r.TraceId)
  172. case l7.ProtocolKafka:
  173. var sn string
  174. var sport uint16
  175. var container_id string
  176. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  177. if conn != nil {
  178. sn = conn.ActualDest.IP().String()
  179. sport = conn.ActualDest.Port()
  180. }
  181. if c.cgroup != nil {
  182. container_id = c.cgroup.ContainerId
  183. }
  184. // MQ
  185. trace.MQConsumerTraceStartEvent(sn, sport, r, c.GetAppInfo(), container_id)
  186. c.SendEvent(trace, r.TraceId)
  187. }
  188. }
  189. return nil
  190. }
  191. if r.TraceEnd == TRACE_STATUS {
  192. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  193. trace, err := c.getOrInitTrace(r.TraceId)
  194. if err == nil {
  195. trace.TraceEndEvent(r)
  196. c.SendEvent(trace, r.TraceId)
  197. }
  198. return nil
  199. }
  200. }
  201. /**
  202. * HTTP
  203. */
  204. if r.Protocol == l7.ProtocolHTTP {
  205. if c.l7Attach && c.valuableTrace(r.TraceId) {
  206. // 检查是否启用 Elasticsearch 检测
  207. if *flags.EnableElasticsearchDetection {
  208. // 解析 User-Agent 以检测 Elasticsearch 请求
  209. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  210. // 检查是否是 Elasticsearch 请求(通过 User-Agent)
  211. isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
  212. apmTrace, err := c.getOrInitTrace(r.TraceId)
  213. if err == nil {
  214. if isElasticsearch {
  215. r.Protocol = l7.ProtocolES
  216. // Elasticsearch 请求,按照 NoSQL 方式处理
  217. query := l7.ParseElasticsearch(r.Payload)
  218. if c.AppInfo.AppName != "" {
  219. klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
  220. }
  221. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  222. if conn == nil {
  223. conn = &ActiveConnection{
  224. Dest: r.ComponentDAddr,
  225. ActualDest: r.ComponentDAddr,
  226. Timestamp: timestamp,
  227. }
  228. }
  229. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
  230. } else {
  231. // 普通 HTTP 请求
  232. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  233. }
  234. c.SendEvent(apmTrace, r.TraceId)
  235. }
  236. } else {
  237. // Elasticsearch 检测未启用,使用普通 HTTP 处理
  238. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  239. apmTrace, err := c.getOrInitTrace(r.TraceId)
  240. if err == nil {
  241. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  242. c.SendEvent(apmTrace, r.TraceId)
  243. }
  244. }
  245. }
  246. //return nil
  247. }
  248. /**
  249. * gRPC
  250. */
  251. if r.Protocol == l7.ProtocolGrpc {
  252. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  253. klog.Infoln("enter the l7.ProtocolGrpc")
  254. if c.l7Attach && c.valuableTrace(r.TraceId) {
  255. apmTrace, err := c.getOrInitTrace(r.TraceId)
  256. if err == nil {
  257. apmTrace.GrpcClientTraceQueryEvent(r)
  258. c.SendEvent(apmTrace, r.TraceId)
  259. }
  260. }
  261. }
  262. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  263. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  264. if conn == nil {
  265. conn = &ActiveConnection{
  266. Dest: r.ComponentDAddr,
  267. ActualDest: r.ComponentDAddr,
  268. Timestamp: timestamp,
  269. }
  270. //return nil
  271. }
  272. if timestamp != 0 && conn.Timestamp != timestamp {
  273. //if r.Protocol == l7.ProtocolGrpc {
  274. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  275. // klog.Infoln("enter the l7.ProtocolGrpc")
  276. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  277. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  278. // if err == nil {
  279. // apmTrace.GrpcClientTraceQueryEvent(r)
  280. // c.SendEvent(apmTrace, r.TraceId)
  281. // }
  282. // }
  283. //}
  284. return nil
  285. }
  286. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  287. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  288. switch r.Protocol {
  289. /**
  290. * HTTP
  291. */
  292. case l7.ProtocolHTTP:
  293. if c.AppInfo.AppName != "" {
  294. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  295. }
  296. stats.observe(r.Status.Http(), "", r.Duration)
  297. /**
  298. * HTTP2
  299. */
  300. case l7.ProtocolHTTP2:
  301. if conn.http2Parser == nil {
  302. conn.http2Parser = l7.NewHttp2Parser()
  303. }
  304. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  305. for _, req := range requests {
  306. stats.observe(req.Status.Http(), "", req.Duration)
  307. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  308. }
  309. /**
  310. * PostgreSQL
  311. */
  312. case l7.ProtocolPostgres:
  313. if r.Method != l7.MethodStatementClose {
  314. stats.observe(r.Status.String(), "", r.Duration)
  315. }
  316. //if conn.postgresParser == nil {
  317. // conn.postgresParser = l7.NewPostgresParser()
  318. //}
  319. //query := conn.postgresParser.Parse(r.Payload)
  320. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  321. if c.l7Attach && c.valuableTrace(r.TraceId) {
  322. if conn.postgresParser == nil {
  323. conn.postgresParser = l7.NewPostgresParser()
  324. }
  325. query := conn.postgresParser.Parse(r.Payload)
  326. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  327. if c.AppInfo.AppName != "" {
  328. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  329. }
  330. //apmTrace, ok := c.getTrace(r.TraceId)
  331. apmTrace, err := c.getOrInitTrace(r.TraceId)
  332. //fmt.Println("mysql r.TraceId:", r.TraceId)
  333. //fmt.Println("ok:", ok)
  334. //fmt.Println("traceMap:", len(c.traceMap))
  335. if err == nil {
  336. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  337. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  338. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  339. c.SendEvent(apmTrace, r.TraceId)
  340. }
  341. }
  342. /**
  343. * Mysql
  344. */
  345. case l7.ProtocolMysql:
  346. //fmt.Println("mysql r.TraceId:", r.TraceId)
  347. if r.Method != l7.MethodStatementClose {
  348. stats.observe(r.Status.String(), "", r.Duration)
  349. }
  350. if c.l7Attach && c.valuableTrace(r.TraceId) {
  351. if conn.mysqlParser == nil {
  352. conn.mysqlParser = l7.NewMysqlParser()
  353. }
  354. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  355. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  356. if c.AppInfo.AppName != "" {
  357. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  358. }
  359. //apmTrace, ok := c.getTrace(r.TraceId)
  360. apmTrace, err := c.getOrInitTrace(r.TraceId)
  361. //fmt.Println("mysql r.TraceId:", r.TraceId)
  362. //fmt.Println("ok:", ok)
  363. //fmt.Println("traceMap:", len(c.traceMap))
  364. if err == nil {
  365. dbSystem := semconv.DBSystemMySQL
  366. // 根据端口白名单确定协议类型
  367. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  368. if l7Type == l7.ProtocolMariaDB {
  369. dbSystem = semconv.DBSystemMariaDB
  370. } else if l7Type == l7.ProtocolTiDB {
  371. dbSystem = semconv.DBSystemTiDB
  372. }
  373. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  374. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  375. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  376. c.SendEvent(apmTrace, r.TraceId)
  377. }
  378. }
  379. /**
  380. * DM (达梦数据库)
  381. */
  382. case l7.ProtocolDM:
  383. //统计dm的query次数
  384. stats.observe(r.Status.String(), "", r.Duration)
  385. //是否发送数据
  386. if c.l7Attach && c.valuableTrace(r.TraceId) {
  387. if conn.dmParser == nil {
  388. conn.dmParser = l7.NewDmParser()
  389. }
  390. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  391. if c.AppInfo.AppName != "" {
  392. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  393. }
  394. apmTrace, err := c.getOrInitTrace(r.TraceId)
  395. if err == nil {
  396. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  397. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  398. c.SendEvent(apmTrace, r.TraceId)
  399. }
  400. }
  401. /**
  402. * Memcached
  403. */
  404. case l7.ProtocolMemcached:
  405. stats.observe(r.Status.String(), "", r.Duration)
  406. if c.l7Attach && c.valuableTrace(r.TraceId) {
  407. cmd, items := l7.ParseMemcached(r.Payload)
  408. if c.AppInfo.AppName != "" {
  409. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  410. }
  411. apmTrace, err := c.getOrInitTrace(r.TraceId)
  412. if err == nil {
  413. statement := cmd
  414. if len(items) == 1 {
  415. statement += " " + items[0]
  416. } else if len(items) > 1 {
  417. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  418. statement += " " + joined
  419. }
  420. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  421. c.SendEvent(apmTrace, r.TraceId)
  422. }
  423. }
  424. /**
  425. * Redis
  426. */
  427. case l7.ProtocolRedis:
  428. stats.observe(r.Status.String(), "", r.Duration)
  429. if c.l7Attach && c.valuableTrace(r.TraceId) {
  430. cmd, args := l7.ParseRedis(r.Payload)
  431. //fmt.Println("cmd", cmd)
  432. //fmt.Println("args", args)
  433. //apmTrace, ok := c.getTrace(r.TraceId)
  434. if c.AppInfo.AppName != "" {
  435. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  436. }
  437. apmTrace, err := c.getOrInitTrace(r.TraceId)
  438. if err == nil {
  439. statement := cmd
  440. if args != "" {
  441. statement += " " + args
  442. }
  443. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  444. c.SendEvent(apmTrace, r.TraceId)
  445. }
  446. }
  447. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  448. /**
  449. * gRPC
  450. */
  451. case l7.ProtocolGrpc:
  452. klog.Debugln("enter the l7.ProtocolGrpc")
  453. stats.observe(r.Status.String(), "", r.Duration)
  454. if c.l7Attach && c.valuableTrace(r.TraceId) {
  455. apmTrace, err := c.getOrInitTrace(r.TraceId)
  456. if err == nil {
  457. apmTrace.GrpcClientTraceQueryEvent(r)
  458. c.SendEvent(apmTrace, r.TraceId)
  459. }
  460. }
  461. /**
  462. * MongoDB
  463. */
  464. case l7.ProtocolMongo:
  465. stats.observe(r.Status.String(), "", r.Duration)
  466. if c.l7Attach && c.valuableTrace(r.TraceId) {
  467. query := l7.ParseMongo(r.Payload)
  468. if l7.IsMongoHeartbeat(query) {
  469. break
  470. }
  471. if c.AppInfo.AppName != "" {
  472. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  473. }
  474. apmTrace, err := c.getOrInitTrace(r.TraceId)
  475. if err == nil {
  476. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  477. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  478. c.SendEvent(apmTrace, r.TraceId)
  479. }
  480. }
  481. /**
  482. * Cassandra
  483. */
  484. case l7.ProtocolCassandra:
  485. stats.observe(r.Status.String(), "", r.Duration)
  486. if c.l7Attach && c.valuableTrace(r.TraceId) {
  487. if conn.cassandraParser == nil {
  488. conn.cassandraParser = l7.NewCassandraParser()
  489. }
  490. var query string
  491. query = string(r.Payload)
  492. //query := conn.cassandraParser.Parse(r.Payload)
  493. if c.AppInfo.AppName != "" {
  494. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  495. }
  496. apmTrace, err := c.getOrInitTrace(r.TraceId)
  497. if err == nil {
  498. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  499. c.SendEvent(apmTrace, r.TraceId)
  500. }
  501. }
  502. /**
  503. * BuntDB
  504. */
  505. case l7.ProtocolBuntDB:
  506. stats.observe(r.Status.String(), "", r.Duration)
  507. if c.l7Attach && c.valuableTrace(r.TraceId) {
  508. query := string(r.Payload)
  509. if c.AppInfo.AppName != "" {
  510. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  511. }
  512. apmTrace, err := c.getOrInitTrace(r.TraceId)
  513. if err == nil {
  514. // BuntDB 是键值数据库,使用 NoSQLTraceQueryEvent,operation 从 payload 中提取(如 "Set: key_name")
  515. operation := ""
  516. statement := query
  517. dbSystem := attribute.String("db.system", "buntdb")
  518. defaultLocalAddr := netaddr.IPPortFrom(netaddr.MustParseIP("127.0.0.1"), 0)
  519. apmTrace.NoSQLTraceQueryEvent(r.Protocol, dbSystem, operation, statement, r, defaultLocalAddr, defaultLocalAddr)
  520. c.SendEvent(apmTrace, r.TraceId)
  521. }
  522. }
  523. /**
  524. * LevelDB
  525. */
  526. case l7.ProtocolLevelDB:
  527. stats.observe(r.Status.String(), "", r.Duration)
  528. if c.l7Attach && c.valuableTrace(r.TraceId) {
  529. query := string(r.Payload)
  530. if c.AppInfo.AppName != "" {
  531. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  532. }
  533. apmTrace, err := c.getOrInitTrace(r.TraceId)
  534. if err == nil {
  535. operation := ""
  536. statement := query
  537. dbSystem := attribute.String("db.system", "leveldb")
  538. defaultLocalAddr := netaddr.IPPortFrom(netaddr.MustParseIP("127.0.0.1"), 0)
  539. apmTrace.NoSQLTraceQueryEvent(r.Protocol, dbSystem, operation, statement, r, defaultLocalAddr, defaultLocalAddr)
  540. c.SendEvent(apmTrace, r.TraceId)
  541. }
  542. }
  543. /**
  544. * Kafka
  545. */
  546. case l7.ProtocolKafka:
  547. stats.observe(r.Status.String(), "", r.Duration)
  548. if c.l7Attach && c.valuableTrace(r.TraceId) {
  549. if c.AppInfo.AppName != "" {
  550. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, r.DestAddrString)
  551. }
  552. apmTrace, err := c.getOrInitTrace(r.TraceId)
  553. if err == nil {
  554. apmTrace.MQTraceQueryEvent(r.Protocol, semconv.MessagingKafkaClientID("kafka"), "", "", r, conn.Src, conn.ActualDest)
  555. c.SendEvent(apmTrace, r.TraceId)
  556. }
  557. }
  558. /**
  559. * RabbitMQ / NATS
  560. */
  561. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  562. stats.observe(r.Status.String(), r.Method.String(), 0)
  563. if c.l7Attach && c.valuableTrace(r.TraceId) {
  564. }
  565. /**
  566. * Dubbo2
  567. */
  568. case l7.ProtocolDubbo2:
  569. stats.observe(r.Status.String(), "", r.Duration)
  570. if c.l7Attach && c.valuableTrace(r.TraceId) {
  571. }
  572. }
  573. return nil
  574. }
  575. func (c *Container) buildIDs(pid uint32) bool {
  576. c.lock.Lock()
  577. defer c.lock.Unlock()
  578. p := c.processes[pid]
  579. if p != nil {
  580. p.cmdline = string(proc.GetRealCmdline(pid))
  581. }
  582. var sns []string
  583. var sport uint16
  584. for address, val := range c.getListens() {
  585. if val == 1 {
  586. ip := address.IP()
  587. if ip.Is4() && !ip.IsLoopback() {
  588. // 获取端口号
  589. sport = address.Port()
  590. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  591. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  592. //c.AppInfo.Sn = ip.String()
  593. //c.AppInfo.Sport = int(port)
  594. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  595. //fmt.Println(port)
  596. ////os.Exit(1)
  597. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  598. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  599. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  600. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  601. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  602. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  603. //return true
  604. }
  605. }
  606. }
  607. if len(sns) > 0 {
  608. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  609. snsStr := strings.Join(sns, ",")
  610. c.AppInfo.Sn = snsStr
  611. c.AppInfo.Sport = int(sport)
  612. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  613. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  614. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  615. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  616. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  617. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  618. return true
  619. }
  620. return false
  621. }
  622. func (c *Container) ReBuildIds(pid uint32) {
  623. c.lock.Lock()
  624. defer c.lock.Unlock()
  625. var sns []string
  626. var sport uint16
  627. for address, val := range c.getListens() {
  628. if val == 1 {
  629. ip := address.IP()
  630. if ip.Is4() && !ip.IsLoopback() {
  631. // 获取端口号
  632. sport = address.Port()
  633. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  634. }
  635. }
  636. }
  637. if len(sns) > 0 {
  638. snsStr := strings.Join(sns, ",")
  639. c.AppInfo.Sn = snsStr
  640. c.AppInfo.Sport = int(sport)
  641. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  642. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  643. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  644. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  645. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  646. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  647. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  648. }
  649. }
  650. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  651. c.lock.Lock()
  652. defer c.lock.Unlock()
  653. // get the associated uprobe
  654. uprobe, err := c.GetUprobe(event, tracer)
  655. if err != nil {
  656. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  657. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  658. return
  659. }
  660. if event.TraceId <= 0 {
  661. //fmt.Println("StackProcess TraceId id 0")
  662. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  663. return
  664. }
  665. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  666. stackFun := ebpftracer.StackFunEvent{}
  667. stackFun.Uprobe = &uprobe
  668. stackFun.StackEvent = event
  669. apmTrace, ok := c.getTrace(event.TraceId)
  670. if ok {
  671. apmTrace.FunAdd(stackFun)
  672. }
  673. }
  // byteExtractString converts a fixed-size, NUL-padded name buffer (such as the
// Java class/method names carried by a stack event) into a string. It keeps the
// leading run of printable ASCII (0x20-0x7E) and stops at the first NUL, control
// character or non-ASCII byte. If no such byte exists, all 100 bytes are returned.
func byteExtractString(nameString [100]byte) string {
	buf := nameString[:]
	if cut := bytes.IndexFunc(buf, func(r rune) bool { return r < ' ' || r > '~' }); cut >= 0 {
		buf = buf[:cut]
	}
	return string(buf)
}
  683. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  684. c.lock.Lock()
  685. defer c.lock.Unlock()
  686. // get the associated uprobe
  687. switch event.Location {
  688. case 0: // ret
  689. Funcname := ""
  690. if event.Type != uint64(CodeTypeJava) {
  691. uprobe, err := c.GetUprobe(event, tracer)
  692. if err != nil {
  693. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  694. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  695. return
  696. }
  697. Funcname = uprobe.Funcname
  698. } else {
  699. ClassName := byteExtractString(event.ClassName)
  700. MethedName := byteExtractString(event.MethedName)
  701. Funcname = ClassName + "." + MethedName
  702. }
  703. if event.TraceId <= 0 {
  704. //fmt.Println("StackProcess TraceId id 0")
  705. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  706. return
  707. }
  708. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  709. apmTrace, err := c.getOrInitTrace(event.TraceId)
  710. if err == nil {
  711. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  712. duration := event.TimeNsEnd - event.TimeNsStart
  713. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  714. c.SendEvent(apmTrace, event.TraceId)
  715. }
  716. }
  717. }
  718. // ResolveAddress returns the symbol(s) and offset of the given address.
  719. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  720. if addr == 0 {
  721. // err = errors.Wrapf(SymbolNotFoundError, "0")
  722. return
  723. }
  724. // symbols, _, err := e.Symbols()
  725. if err != nil {
  726. return
  727. }
  728. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  729. if idx == 0 {
  730. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  731. return
  732. }
  733. // why diff symbol may contains the same addr?
  734. sym := symbols[idx-1]
  735. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  736. syms = append(syms, symbols[i])
  737. }
  738. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  739. syms = append(syms, symbols[i])
  740. }
  741. return syms, uint(addr - sym.Value), nil
  742. }
  // MemoryMap is the address range of one line of /proc/<pid>/maps.
type MemoryMap struct {
	Start uint64 // start address of the mapping
	End   uint64 // end address of the mapping (exclusive, per proc(5))
}

// ReadFirstLineOfMapsFile parses the first line of /proc/<pid>/maps and returns
// its address range. pid is inserted verbatim into the path, so "self" also works.
// It returns an error in any of these cases:
//   - the file cannot be opened or read,
//   - the file is empty,
//   - the first line does not begin with a "<start>-<end>" pair of hex numbers.
func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
	file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
	if err != nil {
		return nil, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return nil, err
		}
		return nil, errors.New("empty /proc/<pid>/maps")
	}
	fields := strings.Fields(scanner.Text())
	if len(fields) == 0 {
		// A blank first line would otherwise panic on fields[0].
		return nil, errors.New("unexpected format in /proc/<pid>/maps")
	}
	addresses := strings.Split(fields[0], "-")
	if len(addresses) != 2 {
		return nil, errors.New("unexpected format in /proc/<pid>/maps")
	}
	start, err := strconv.ParseUint(addresses[0], 16, 64)
	if err != nil {
		return nil, err
	}
	end, err := strconv.ParseUint(addresses[1], 16, 64)
	if err != nil {
		return nil, err
	}
	return &MemoryMap{Start: start, End: end}, nil
}
  778. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  779. //fmt.Println("GetUprobe entory:")
  780. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  781. Address := event.Ip - memoryMap.Start
  782. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  783. for _, fun := range c.UprobesMap {
  784. funAddress := fun.Address + fun.AbsOffset
  785. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  786. if funAddress == Address {
  787. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  788. return fun, nil
  789. }
  790. }
  791. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  792. if err != nil {
  793. return
  794. }
  795. for _, sym := range syms {
  796. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  797. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  798. if ok {
  799. return uprobe, nil
  800. }
  801. }
  802. err = errors.New("uprobe not found")
  803. return
  804. }
  805. func (c *Container) GetAppInfo() AppInfo {
  806. return c.AppInfo
  807. }
  808. // 可注入前置
  809. func (c *Container) checkEventReady() bool {
  810. c.lock.Lock()
  811. defer c.lock.Unlock()
  812. return c.l7EventReady
  813. }
  814. func (c *Container) eventReady() {
  815. c.lock.Lock()
  816. defer c.lock.Unlock()
  817. c.l7EventReady = true
  818. }
  819. // uprobe前置
  820. func (c *Container) Isl7AttachSuccess() bool {
  821. c.lock.Lock()
  822. defer c.lock.Unlock()
  823. return c.l7Attach
  824. }
  825. func (c *Container) l7AttachSuccess() {
  826. c.lock.Lock()
  827. defer c.lock.Unlock()
  828. c.l7Attach = true
  829. }
  830. // isProcessAttached 判断指定 pid 是否曾经被 attach 过(uprobes 或各类 attachOnce 标志)。
  831. // 用于避免对从未 attach 过的进程执行多余的 UNINSTALL。
  832. func (c *Container) isProcessAttached(pid uint32) bool {
  833. p := c.processes[pid]
  834. if p == nil {
  835. return false
  836. }
  837. return len(p.uprobes) > 0 ||
  838. p.jvmAttachOnce ||
  839. p.nodejsAttachOnce ||
  840. p.pyAttachOnce ||
  841. p.nginxAttachOnce ||
  842. p.stackAttachOnce
  843. }
  844. func (c *Container) IsLicenseFuse() bool {
  845. c.lock.Lock()
  846. defer c.lock.Unlock()
  847. return c.licenseFuse
  848. }
  849. func (c *Container) LicenseFuse() {
  850. c.lock.Lock()
  851. defer c.lock.Unlock()
  852. c.licenseFuse = true
  853. }
  854. func (c *Container) ClearLicenseFuse() {
  855. c.lock.Lock()
  856. defer c.lock.Unlock()
  857. c.licenseFuse = false
  858. }
  859. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  860. resp, err := c.GetCodeSetting(r)
  861. if err != nil {
  862. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  863. return
  864. }
  865. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  866. // 有黑白名单规则 &&
  867. // 之前有注入 先卸载再注入
  868. // 之前没注入 直接注入
  869. // 没有有黑白名单 直接卸载
  870. if c.hasStackRule(resp) {
  871. if c.stackRuleUpdate(resp) {
  872. // 重新注入
  873. err = c.DetachStack(pid, APP_UNINSTALL)
  874. if err != nil {
  875. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  876. }
  877. }
  878. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  879. c.saveWhiteStackSettingInfo(resp)
  880. err = c.AttachStack(r.tracer, pid)
  881. if err != nil {
  882. c.AppInfo.SetAppStackError()
  883. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  884. }
  885. } else {
  886. if c.noOrigRule() {
  887. return
  888. }
  889. c.saveWhiteStackSettingInfo(resp)
  890. // 关闭堆栈
  891. err = c.DetachStack(pid, APP_UNINSTALL)
  892. if err != nil {
  893. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  894. }
  895. }
  896. } else {
  897. if c.noOrigRule() {
  898. return
  899. }
  900. c.saveWhiteStackSettingInfo(resp)
  901. // 关闭堆栈
  902. err = c.DetachStack(pid, APP_UNINSTALL)
  903. if err != nil {
  904. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  905. }
  906. }
  907. }
  908. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  909. p := c.processes[pid]
  910. if p != nil {
  911. // 先校验 cmdline 是否匹配白名单
  912. cmdline := p.GetCmdline()
  913. if len(cmdline) == 0 {
  914. // 进程可能在 buildIDs 之后才被发现(如 nginx reload fork 的新 worker),
  915. // 此时 cmdline 尚未填充,主动读取一次。
  916. cmdline = string(proc.GetRealCmdline(pid))
  917. if len(cmdline) > 0 {
  918. p.SetCmdline(cmdline)
  919. } else {
  920. return false, 0
  921. }
  922. }
  923. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  924. whiteListByCode := r.getWhiteListAll()
  925. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  926. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  927. // 白名单规则匹配
  928. for _, setting := range whiteListByCode {
  929. ruleVal := setting.Filters
  930. if ruleVal == "" {
  931. continue
  932. }
  933. // cmdline 匹配白名单后,再判断 codeType 是否可识别
  934. if strings.Contains(cmdline, ruleVal) {
  935. codeType := c.GetCodeTypeFromCache(pid)
  936. if codeType.IsUnknownCode() {
  937. klog.WithField("pid", pid).
  938. WithField("ruleVal", ruleVal).
  939. WithField("cmdline", cmdline).
  940. Debug("[verify] cmdline matched but language unknown, skip.")
  941. return false, 0
  942. }
  943. c.WhiteSettingInfo.AppName = setting.AppName
  944. c.WhiteSettingInfo.Filters = setting.Filters
  945. klog.WithField("pid", pid).
  946. WithField("codeType", codeType.String()).
  947. WithField("ruleVal", ruleVal).
  948. WithField("cmdline", cmdline).
  949. //WithField("stack", setting.OpenStack).
  950. WithField("white list", utils.ToString(whiteListByCode)).
  951. Infoln("[verify] check successful.")
  952. return true, 0
  953. }
  954. }
  955. }
  956. return false, 0
  957. }
  958. // 1.卸载入口
  959. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  960. c.lock.Lock()
  961. defer c.lock.Unlock()
  962. if p := c.processes[pid]; p != nil {
  963. err := c.DetachUprobes(tracer, pid, detachType)
  964. if err != nil {
  965. klog.WithError(err).Errorln("DetachUprobes Error.")
  966. }
  967. err = c.DetachStack(pid, detachType)
  968. if err != nil {
  969. klog.WithError(err).Errorln("DetachStack Error.")
  970. }
  971. // 关闭7层监控
  972. c.l7Attach = false
  973. // 变更应用状态
  974. if err != nil {
  975. detachType = detachType.Error()
  976. }
  977. c.AppInfo.SetAppStatus(detachType)
  978. }
  979. }
  980. // 1.1卸载uprobe
  981. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  982. // close uprobe
  983. if p := c.processes[pid]; p != nil {
  984. for _, u := range p.uprobes {
  985. err := u.Close()
  986. if err != nil {
  987. return err
  988. }
  989. }
  990. p.uprobes = []link.Link{}
  991. switch detachType {
  992. case APP_UNINSTALL, APP_FUSE, APP_LICENSE_FUSE:
  993. codeType := c.GetCodeTypeFromCache(pid)
  994. switch codeType {
  995. case CodeTypeJava:
  996. p.jvmAttachOnce = false
  997. case CodeTypeGo:
  998. p.goTlsUprobesChecked = false
  999. p.openSslUprobesChecked = false
  1000. default:
  1001. }
  1002. case APP_UPROBE_ERROR:
  1003. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  1004. default:
  1005. }
  1006. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  1007. if err := tracer.DelKProcInfo(pid); err != nil {
  1008. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  1009. } else {
  1010. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%s", pid, detachType.String())
  1011. c.AppInfo.EBPFProcInfo = nil
  1012. }
  1013. if p.l4HeaderCgroup != "" {
  1014. if err := tracer.ReleaseL4HeaderCgroup(p.l4HeaderCgroup); err != nil {
  1015. return fmt.Errorf("[DetachUprobes] failed to release l4-header cgroup for pid %d: %w", pid, err)
  1016. }
  1017. p.l4HeaderCgroup = ""
  1018. }
  1019. } else {
  1020. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  1021. }
  1022. return nil
  1023. }
  1024. // 1.2卸载堆栈
  1025. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  1026. if p := c.processes[pid]; p != nil {
  1027. var err error
  1028. codeType := c.GetCodeTypeFromCache(pid)
  1029. switch codeType {
  1030. // 1.2.1 卸载 jvm堆栈
  1031. case CodeTypeJava:
  1032. err = c.detachJvmStack(pid)
  1033. default:
  1034. err = p.closeStackUprobes()
  1035. }
  1036. if err != nil {
  1037. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  1038. return err
  1039. }
  1040. p.stackAttachOnce = false
  1041. } else {
  1042. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  1043. }
  1044. return nil
  1045. }
  1046. // 1.2.1 卸载 jvm堆栈
  1047. func (c *Container) detachJvmStack(pid uint32) error {
  1048. if p := c.processes[pid]; p != nil {
  1049. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  1050. //}
  1051. // 卸载 JavaAgent
  1052. var err error
  1053. if p.stackStatus.IsJattachSuccess() {
  1054. // 卸载堆栈probes
  1055. err = p.closeStackUprobes()
  1056. if err != nil {
  1057. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  1058. }
  1059. err = p.uninstallJavaAgent()
  1060. if err != nil {
  1061. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  1062. }
  1063. }
  1064. return err
  1065. }
  1066. return nil
  1067. }
  1068. func (c *Container) getRootfs() string {
  1069. if c.metadata != nil && c.metadata.rootfs != "" {
  1070. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  1071. }
  1072. return ""
  1073. }
  1074. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  1075. if c == nil {
  1076. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  1077. return
  1078. }
  1079. if !c.isProcessAttached(pid) {
  1080. return
  1081. }
  1082. if c.AppInfo.AppName == "" || !c.isProcessAttached(pid) {
  1083. return
  1084. }
  1085. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  1086. detail := AppStatusInfo{
  1087. Pid: pid,
  1088. ProcName: c.containerName,
  1089. AppName: c.AppInfo.AppName,
  1090. Language: c.AppInfo.CodeType.ServiceTypeString(),
  1091. LanguageVersion: c.AppInfo.Version,
  1092. AppID: c.AppInfo.AppIdHash.IntVal,
  1093. AgentID: c.AppInfo.AgentId,
  1094. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  1095. Sn: c.AppInfo.Sn,
  1096. Sport: c.AppInfo.Sport,
  1097. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  1098. PreStatus: c.AppInfo.PreStatus,
  1099. Status: c.AppInfo.Status,
  1100. Rule: c.WhiteSettingInfo.Filters,
  1101. Container: string(c.id),
  1102. }
  1103. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  1104. if c.AppInfo.UpdateAt != 0 {
  1105. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  1106. }
  1107. p := c.processes[pid]
  1108. if p != nil {
  1109. detail.StackStatus = p.stackStatus.String()
  1110. v := 0
  1111. if !p.versionFailed {
  1112. v = 1
  1113. }
  1114. detail.StackStatus += fmt.Sprintf("V=%d", v)
  1115. }
  1116. runtimeApps[pid] = detail
  1117. }
  1118. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  1119. if c == nil {
  1120. return
  1121. }
  1122. var err error
  1123. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  1124. // License fuse UNINSTALL(仅在开启 License 校验时)
  1125. if *flags.EnableLicenseCheck && c.IsLicenseFuse() {
  1126. if c.Isl7AttachSuccess() {
  1127. c.Detach(r.tracer, pid, APP_LICENSE_FUSE)
  1128. }
  1129. klog.WithField("pid", pid).Infoln("[AgentCtrl] License fusing.")
  1130. return
  1131. }
  1132. // fusing UNINSTALL
  1133. if r.isFusing && c.Isl7AttachSuccess() {
  1134. c.Detach(r.tracer, pid, APP_FUSE)
  1135. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  1136. return
  1137. }
  1138. // verify UNINSTALL:仅当该进程曾经被 attach 过才需要卸载,
  1139. // 避免对同容器内从未 attach 的进程(如 npm start 父进程)误触 UNINSTALL。
  1140. if !verifyAttachConditions && c.Isl7AttachSuccess() && c.isProcessAttached(pid) {
  1141. c.Detach(r.tracer, pid, APP_UNINSTALL)
  1142. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  1143. return
  1144. }
  1145. if verifyAttachConditions {
  1146. err = c.RegisterAppInfo(r, pid)
  1147. if err != nil {
  1148. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  1149. return
  1150. }
  1151. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  1152. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  1153. if err != nil {
  1154. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  1155. return
  1156. } else {
  1157. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  1158. }
  1159. // 堆栈控制
  1160. c.ctrlStack(r, pid)
  1161. }
  1162. }