container_apm.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  // TRACE_STATUS is the flag value carried in RequestData.TraceStart /
  // RequestData.TraceEnd that marks an event as the start / end of a trace.
  const TRACE_STATUS = 1
  31. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  32. trace, ok := c.traceMap[traceId]
  33. return trace, ok
  34. }
  35. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  36. c.traceMap[traceId] = trace
  37. }
  38. // 查询或创建trace信息
  39. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  40. trace, ok := c.getTrace(traceId)
  41. if !ok {
  42. //new trace
  43. trace = tracing.NewTraceFromEvent(string(c.id))
  44. //create TraceMap
  45. c.createTraceMap(traceId, trace)
  46. //create ParentSpan
  47. trace.CreateRootSpan(traceId)
  48. }
  49. return trace, nil
  50. }
  // getGrpcServerNetworkInfo collects the network identity reported for a gRPC
  // server span.
  //
  // Returns, in order:
  //   - the first IPv4 address of the interface named "eth0", as seen by
  //     net.Interfaces() in the agent's own process ("" if there is none);
  //   - c.AppInfo.Sport converted to uint16;
  //   - the cgroup container ID ("" when c.cgroup is nil).
  func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  	containerID := ""
  	if c.cgroup != nil {
  		containerID = c.cgroup.ContainerId
  	}

  	ipAddr := ""
  	if ifaces, err := net.Interfaces(); err == nil {
  	scan:
  		for _, iface := range ifaces {
  			if iface.Name != "eth0" {
  				continue
  			}
  			addrs, err := iface.Addrs()
  			if err != nil {
  				break
  			}
  			for _, addr := range addrs {
  				var ip net.IP
  				switch v := addr.(type) {
  				case *net.IPNet:
  					if v != nil {
  						ip = v.IP
  					}
  				case *net.IPAddr:
  					ip = v.IP
  				}
  				if ip.To4() != nil {
  					ipAddr = ip.String()
  					break scan
  				}
  			}
  			// Only one interface can be named eth0; stop after inspecting it.
  			break
  		}
  	}
  	klog.Debugf("grpc server ip %s", ipAddr)

  	// The local port is taken from AppInfo.Sport.
  	port := c.AppInfo.Sport
  	klog.Debugf("grpc server port %d", port)
  	return ipAddr, uint16(port), containerID
  }
  89. // Deprecated: InitTrace not used
  90. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  91. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  92. // ip, err := netaddr.ParseIP(hostIp)
  93. // if err != nil {
  94. // //fmt.Println("host ip error")
  95. // hostIp = "127.0.0.1"
  96. // }
  97. // addr := netaddr.IPPortFrom(ip, port)
  98. // trace := tracing.NewTrace(string(c.id), addr)
  99. // if trace == nil {
  100. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  101. // }
  102. // c.traceMap[traceId] = trace
  103. //
  104. // trace.TraceStart(method, path, r.Status, r.Duration)
  105. // return nil
  106. //}
  107. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  108. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  109. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  110. if t.AllEventReady(traceID) {
  111. t.SendEvent()
  112. klog.Debugf("SendEvent %d", traceID)
  113. //fmt.Println(t.GetSpan())
  114. //fmt.Println("===============")
  115. delete(c.traceMap, traceID)
  116. }
  117. }
  118. func (c *Container) valuableTrace(traceID uint64) bool {
  119. return traceID != 0
  120. }
  121. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  122. c.lock.Lock()
  123. defer c.lock.Unlock()
  124. if r.Protocol == l7.ProtocolDNS {
  125. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  126. if c.l7Attach && c.valuableTrace(r.TraceId) {
  127. apmTrace, err := c.getOrInitTrace(r.TraceId)
  128. if err == nil {
  129. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  130. c.SendEvent(apmTrace, r.TraceId)
  131. }
  132. }
  133. return ip2fqdn
  134. }
  135. //if !c.valuableTrace(r.TraceId) {
  136. // return nil
  137. //}
  138. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  139. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  140. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  141. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  142. if r.TraceStart == TRACE_STATUS {
  143. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  144. trace, err := c.getOrInitTrace(r.TraceId)
  145. if c.AppInfo.AppName != "" {
  146. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  147. }
  148. if err == nil {
  149. if r.TraceType == 0 {
  150. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  151. ip, _ := netaddr.ParseIP(sn)
  152. //codeType := c.GetCodeTypeFromCache(pid)
  153. container_id := ""
  154. if c.cgroup != nil {
  155. container_id = c.cgroup.ContainerId
  156. }
  157. trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  158. c.SendEvent(trace, r.TraceId)
  159. } else if r.TraceType == 1 {
  160. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  161. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  162. c.SendEvent(trace, r.TraceId)
  163. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  164. // if err == nil {
  165. // apmTrace.GrpcServerTraceQueryEvent(r, c.GetAppInfo())
  166. // c.SendEvent(apmTrace, r.TraceId)
  167. // }
  168. }
  169. }
  170. return nil
  171. }
  172. if r.TraceEnd == TRACE_STATUS {
  173. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  174. trace, err := c.getOrInitTrace(r.TraceId)
  175. if err == nil {
  176. trace.TraceEndEvent(r)
  177. c.SendEvent(trace, r.TraceId)
  178. }
  179. return nil
  180. }
  181. }
  182. /**
  183. * HTTP
  184. */
  185. if r.Protocol == l7.ProtocolHTTP {
  186. if c.l7Attach && c.valuableTrace(r.TraceId) {
  187. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  188. apmTrace, err := c.getOrInitTrace(r.TraceId)
  189. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  190. if err == nil {
  191. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  192. c.SendEvent(apmTrace, r.TraceId)
  193. }
  194. }
  195. //return nil
  196. }
  197. /**
  198. * gRPC
  199. */
  200. if r.Protocol == l7.ProtocolGrpc {
  201. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  202. klog.Infoln("enter the l7.ProtocolGrpc")
  203. if c.l7Attach && c.valuableTrace(r.TraceId) {
  204. apmTrace, err := c.getOrInitTrace(r.TraceId)
  205. if err == nil {
  206. apmTrace.GrpcClientTraceQueryEvent(r)
  207. c.SendEvent(apmTrace, r.TraceId)
  208. }
  209. }
  210. }
  211. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  212. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  213. if conn == nil {
  214. conn = &ActiveConnection{
  215. Dest: r.ComponentDAddr,
  216. ActualDest: r.ComponentDAddr,
  217. Timestamp: timestamp,
  218. }
  219. //return nil
  220. }
  221. if timestamp != 0 && conn.Timestamp != timestamp {
  222. //if r.Protocol == l7.ProtocolGrpc {
  223. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  224. // klog.Infoln("enter the l7.ProtocolGrpc")
  225. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  226. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  227. // if err == nil {
  228. // apmTrace.GrpcClientTraceQueryEvent(r)
  229. // c.SendEvent(apmTrace, r.TraceId)
  230. // }
  231. // }
  232. //}
  233. return nil
  234. }
  235. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  236. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  237. switch r.Protocol {
  238. /**
  239. * HTTP
  240. */
  241. case l7.ProtocolHTTP:
  242. if c.AppInfo.AppName != "" {
  243. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  244. }
  245. stats.observe(r.Status.Http(), "", r.Duration)
  246. /**
  247. * HTTP2
  248. */
  249. case l7.ProtocolHTTP2:
  250. if conn.http2Parser == nil {
  251. conn.http2Parser = l7.NewHttp2Parser()
  252. }
  253. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  254. for _, req := range requests {
  255. stats.observe(req.Status.Http(), "", req.Duration)
  256. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  257. }
  258. /**
  259. * PostgreSQL
  260. */
  261. case l7.ProtocolPostgres:
  262. if r.Method != l7.MethodStatementClose {
  263. stats.observe(r.Status.String(), "", r.Duration)
  264. }
  265. //if conn.postgresParser == nil {
  266. // conn.postgresParser = l7.NewPostgresParser()
  267. //}
  268. //query := conn.postgresParser.Parse(r.Payload)
  269. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  270. if c.l7Attach && c.valuableTrace(r.TraceId) {
  271. if conn.postgresParser == nil {
  272. conn.postgresParser = l7.NewPostgresParser()
  273. }
  274. query := conn.postgresParser.Parse(r.Payload)
  275. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  276. if c.AppInfo.AppName != "" {
  277. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  278. }
  279. //apmTrace, ok := c.getTrace(r.TraceId)
  280. apmTrace, err := c.getOrInitTrace(r.TraceId)
  281. //fmt.Println("mysql r.TraceId:", r.TraceId)
  282. //fmt.Println("ok:", ok)
  283. //fmt.Println("traceMap:", len(c.traceMap))
  284. if err == nil {
  285. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  286. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  287. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  288. c.SendEvent(apmTrace, r.TraceId)
  289. }
  290. }
  291. /**
  292. * Mysql
  293. */
  294. case l7.ProtocolMysql:
  295. if r.Method != l7.MethodStatementClose {
  296. stats.observe(r.Status.String(), "", r.Duration)
  297. }
  298. if c.l7Attach && c.valuableTrace(r.TraceId) {
  299. if conn.mysqlParser == nil {
  300. conn.mysqlParser = l7.NewMysqlParser()
  301. }
  302. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  303. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  304. if c.AppInfo.AppName != "" {
  305. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  306. }
  307. //apmTrace, ok := c.getTrace(r.TraceId)
  308. apmTrace, err := c.getOrInitTrace(r.TraceId)
  309. //fmt.Println("mysql r.TraceId:", r.TraceId)
  310. //fmt.Println("ok:", ok)
  311. //fmt.Println("traceMap:", len(c.traceMap))
  312. if err == nil {
  313. dbSystem := semconv.DBSystemMySQL
  314. // 根据端口白名单确定协议类型
  315. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  316. if l7Type == l7.ProtocolMariaDB {
  317. dbSystem = semconv.DBSystemMariaDB
  318. }
  319. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  320. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  321. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  322. c.SendEvent(apmTrace, r.TraceId)
  323. }
  324. }
  325. /**
  326. * DM (达梦数据库)
  327. */
  328. case l7.ProtocolDM:
  329. //统计dm的query次数
  330. stats.observe(r.Status.String(), "", r.Duration)
  331. //是否发送数据
  332. if c.l7Attach && c.valuableTrace(r.TraceId) {
  333. if conn.dmParser == nil {
  334. conn.dmParser = l7.NewDmParser()
  335. }
  336. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  337. if c.AppInfo.AppName != "" {
  338. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  339. }
  340. apmTrace, err := c.getOrInitTrace(r.TraceId)
  341. if err == nil {
  342. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  343. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  344. c.SendEvent(apmTrace, r.TraceId)
  345. }
  346. }
  347. /**
  348. * Memcached
  349. */
  350. case l7.ProtocolMemcached:
  351. stats.observe(r.Status.String(), "", r.Duration)
  352. if c.l7Attach && c.valuableTrace(r.TraceId) {
  353. cmd, items := l7.ParseMemcached(r.Payload)
  354. if c.AppInfo.AppName != "" {
  355. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  356. }
  357. apmTrace, err := c.getOrInitTrace(r.TraceId)
  358. if err == nil {
  359. statement := cmd
  360. if len(items) == 1 {
  361. statement += " " + items[0]
  362. } else if len(items) > 1 {
  363. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  364. statement += " " + joined
  365. }
  366. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  367. c.SendEvent(apmTrace, r.TraceId)
  368. }
  369. }
  370. /**
  371. * Redis
  372. */
  373. case l7.ProtocolRedis:
  374. stats.observe(r.Status.String(), "", r.Duration)
  375. if c.l7Attach && c.valuableTrace(r.TraceId) {
  376. cmd, args := l7.ParseRedis(r.Payload)
  377. //fmt.Println("cmd", cmd)
  378. //fmt.Println("args", args)
  379. //apmTrace, ok := c.getTrace(r.TraceId)
  380. if c.AppInfo.AppName != "" {
  381. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  382. }
  383. apmTrace, err := c.getOrInitTrace(r.TraceId)
  384. if err == nil {
  385. statement := cmd
  386. if args != "" {
  387. statement += " " + args
  388. }
  389. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  390. c.SendEvent(apmTrace, r.TraceId)
  391. }
  392. }
  393. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  394. /**
  395. * gRPC
  396. */
  397. case l7.ProtocolGrpc:
  398. klog.Debugln("enter the l7.ProtocolGrpc")
  399. stats.observe(r.Status.String(), "", r.Duration)
  400. if c.l7Attach && c.valuableTrace(r.TraceId) {
  401. apmTrace, err := c.getOrInitTrace(r.TraceId)
  402. if err == nil {
  403. apmTrace.GrpcClientTraceQueryEvent(r)
  404. c.SendEvent(apmTrace, r.TraceId)
  405. }
  406. }
  407. /**
  408. * MongoDB
  409. */
  410. case l7.ProtocolMongo:
  411. stats.observe(r.Status.String(), "", r.Duration)
  412. if c.l7Attach && c.valuableTrace(r.TraceId) {
  413. query := l7.ParseMongo(r.Payload)
  414. if c.AppInfo.AppName != "" {
  415. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  416. }
  417. apmTrace, err := c.getOrInitTrace(r.TraceId)
  418. if err == nil {
  419. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  420. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  421. c.SendEvent(apmTrace, r.TraceId)
  422. }
  423. }
  424. /**
  425. * Cassandra
  426. */
  427. case l7.ProtocolCassandra:
  428. stats.observe(r.Status.String(), "", r.Duration)
  429. if c.l7Attach && c.valuableTrace(r.TraceId) {
  430. if conn.cassandraParser == nil {
  431. conn.cassandraParser = l7.NewCassandraParser()
  432. }
  433. var query string
  434. query = string(r.Payload)
  435. //query := conn.cassandraParser.Parse(r.Payload)
  436. if c.AppInfo.AppName != "" {
  437. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  438. }
  439. apmTrace, err := c.getOrInitTrace(r.TraceId)
  440. if err == nil {
  441. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  442. c.SendEvent(apmTrace, r.TraceId)
  443. }
  444. }
  445. /**
  446. * Kafka
  447. */
  448. case l7.ProtocolKafka:
  449. stats.observe(r.Status.String(), "", r.Duration)
  450. if c.l7Attach && c.valuableTrace(r.TraceId) {
  451. }
  452. /**
  453. * RabbitMQ / NATS
  454. */
  455. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  456. stats.observe(r.Status.String(), r.Method.String(), 0)
  457. if c.l7Attach && c.valuableTrace(r.TraceId) {
  458. }
  459. /**
  460. * Dubbo2
  461. */
  462. case l7.ProtocolDubbo2:
  463. stats.observe(r.Status.String(), "", r.Duration)
  464. if c.l7Attach && c.valuableTrace(r.TraceId) {
  465. }
  466. }
  467. return nil
  468. }
  // buildIDs refreshes the cached command line of process pid (when the
  // process is tracked). It then derives the container's listen addresses and
  // instance ID from its IPv4, non-loopback listeners.
  //
  // Only entries of c.getListens() whose value is 1 are considered
  // (NOTE(review): presumably "currently listening" — confirm). On success:
  //   - AppInfo.Sn is the sorted, comma-separated "ip:port" list;
  //   - AppInfo.Sport is the port of the first entry;
  //   - AppInfo.InstanceIdHash is derived from "<Sn>:<Sport>".
  //
  // It returns true if at least one listener qualified. Otherwise AppInfo is
  // left unchanged and it returns false.
  func (c *Container) buildIDs(pid uint32) bool {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	if p := c.processes[pid]; p != nil {
  		p.cmdline = string(proc.GetRealCmdline(pid))
  	}

  	type listener struct {
  		sn   string
  		port uint16
  	}
  	var listeners []listener
  	for address, val := range c.getListens() {
  		if val != 1 {
  			continue
  		}
  		ip := address.IP()
  		if !ip.Is4() || ip.IsLoopback() {
  			continue
  		}
  		port := address.Port()
  		listeners = append(listeners, listener{sn: fmt.Sprintf("%s:%d", ip, port), port: port})
  	}
  	if len(listeners) == 0 {
  		return false
  	}

  	// getListens is ranged as a map, so its iteration order is random. Sort
  	// so that Sn, Sport and the derived instance ID are stable across calls,
  	// instead of depending on which listener happened to be visited last.
  	sort.Slice(listeners, func(i, j int) bool { return listeners[i].sn < listeners[j].sn })
  	sns := make([]string, len(listeners))
  	for i, l := range listeners {
  		sns[i] = l.sn
  	}
  	sport := listeners[0].port

  	c.AppInfo.Sn = strings.Join(sns, ",")
  	c.AppInfo.Sport = int(sport)
  	strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  	// The ToInt64 error is ignored here, as before.
  	c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  	c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  	return true
  }
  // ReBuildIds recomputes, from the container's IPv4, non-loopback listeners
  // (entries of c.getListens() whose value is 1):
  //   - the listen addresses and instance ID;
  //   - the agent ID;
  //   - the code type of process pid.
  //
  // AppInfo.Sn becomes the sorted, comma-separated "ip:port" list and
  // AppInfo.Sport the port of the first entry. The agent ID is derived from
  // the host IP and AppInfo.AppIdHash.IntVal. When no listener qualifies,
  // nothing is changed. Unlike buildIDs, it does not refresh the process
  // command line.
  func (c *Container) ReBuildIds(pid uint32) {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	type listener struct {
  		sn   string
  		port uint16
  	}
  	var listeners []listener
  	for address, val := range c.getListens() {
  		if val != 1 {
  			continue
  		}
  		ip := address.IP()
  		if !ip.Is4() || ip.IsLoopback() {
  			continue
  		}
  		port := address.Port()
  		listeners = append(listeners, listener{sn: fmt.Sprintf("%s:%d", ip, port), port: port})
  	}
  	if len(listeners) == 0 {
  		return
  	}

  	// getListens is ranged as a map, so its iteration order is random. Sort
  	// so that Sn, Sport and the derived IDs are stable across calls.
  	sort.Slice(listeners, func(i, j int) bool { return listeners[i].sn < listeners[j].sn })
  	sns := make([]string, len(listeners))
  	for i, l := range listeners {
  		sns[i] = l.sn
  	}
  	sport := listeners[0].port

  	c.AppInfo.Sn = strings.Join(sns, ",")
  	c.AppInfo.Sport = int(sport)
  	strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  	c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  	c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  	strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  	c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  	c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  }
  544. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  545. c.lock.Lock()
  546. defer c.lock.Unlock()
  547. // get the associated uprobe
  548. uprobe, err := c.GetUprobe(event, tracer)
  549. if err != nil {
  550. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  551. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  552. return
  553. }
  554. if event.TraceId <= 0 {
  555. //fmt.Println("StackProcess TraceId id 0")
  556. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  557. return
  558. }
  559. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  560. stackFun := ebpftracer.StackFunEvent{}
  561. stackFun.Uprobe = &uprobe
  562. stackFun.StackEvent = event
  563. apmTrace, ok := c.getTrace(event.TraceId)
  564. if ok {
  565. apmTrace.FunAdd(stackFun)
  566. }
  567. }
  // byteExtractString converts a fixed-size, NUL-padded name buffer (as filled
  // in by the eBPF side) into a Go string.
  //
  // The result is the longest prefix made only of printable ASCII (bytes
  // 32..126). It stops at the first NUL, control character, DEL or non-ASCII
  // byte. A buffer with no such byte yields all 100 bytes.
  func byteExtractString(nameString [100]byte) string {
  	raw := nameString[:]
  	isTerminator := func(r rune) bool { return r < 32 || r > 126 }
  	if cut := bytes.IndexFunc(raw, isTerminator); cut >= 0 {
  		raw = raw[:cut]
  	}
  	return string(raw)
  }
  577. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  578. c.lock.Lock()
  579. defer c.lock.Unlock()
  580. // get the associated uprobe
  581. switch event.Location {
  582. case 0: // ret
  583. Funcname := ""
  584. if event.Type != uint64(CodeTypeJava) {
  585. uprobe, err := c.GetUprobe(event, tracer)
  586. if err != nil {
  587. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  588. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  589. return
  590. }
  591. Funcname = uprobe.Funcname
  592. } else {
  593. ClassName := byteExtractString(event.ClassName)
  594. MethedName := byteExtractString(event.MethedName)
  595. Funcname = ClassName + "." + MethedName
  596. }
  597. if event.TraceId <= 0 {
  598. //fmt.Println("StackProcess TraceId id 0")
  599. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  600. return
  601. }
  602. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  603. apmTrace, err := c.getOrInitTrace(event.TraceId)
  604. if err == nil {
  605. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  606. duration := event.TimeNsEnd - event.TimeNsStart
  607. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  608. c.SendEvent(apmTrace, event.TraceId)
  609. }
  610. }
  611. }
  // ResolveAddress maps addr to the symbol(s) that start at the greatest symbol
  // address <= addr, plus the offset of addr from that start.
  //
  // symbols must be sorted ascending by Value, as sort.Search requires
  // (NOTE(review): confirm every caller passes a sorted slice). When several
  // symbols share that start address, all of them are returned, nearest-index
  // first. addr == 0, or an addr below the first symbol, yields
  // (nil, 0, nil). The error result is currently always nil. Symbol sizes are
  // not checked, so an addr past the end of the last matching symbol still
  // resolves to it.
  func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  	if addr == 0 {
  		return nil, 0, nil
  	}

  	// next is the index of the first symbol that starts after addr. Because
  	// symbols[next].Value > addr >= base, only symbols at indices below next
  	// can share base.
  	next := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  	if next == 0 {
  		return nil, 0, nil
  	}

  	base := symbols[next-1].Value
  	for i := next - 1; i >= 0 && symbols[i].Value == base; i-- {
  		syms = append(syms, symbols[i])
  	}
  	return syms, uint(addr - base), nil
  }
  // MemoryMap holds the two hexadecimal addresses of a "start-end" range, as
  // parsed by ReadFirstLineOfMapsFile from a /proc/<pid>/maps line.
  type MemoryMap struct {
  	Start uint64
  	End   uint64
  }
  // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps and parses
  // its leading "start-end" hexadecimal address range into a MemoryMap.
  //
  // pid is substituted verbatim into the path. It returns an error when:
  //   - the file cannot be opened or read;
  //   - the file is empty;
  //   - the first line does not begin with a "start-end" pair of hex numbers
  //     (including a blank first line).
  func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  	file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  	if err != nil {
  		return nil, err
  	}
  	defer file.Close()

  	scanner := bufio.NewScanner(file)
  	if !scanner.Scan() {
  		if err := scanner.Err(); err != nil {
  			return nil, err
  		}
  		return nil, errors.New("empty /proc/<pid>/maps")
  	}

  	fields := strings.Fields(scanner.Text())
  	// A blank first line has no fields; indexing fields[0] would panic.
  	if len(fields) == 0 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}
  	addresses := strings.Split(fields[0], "-")
  	if len(addresses) != 2 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}

  	start, err := strconv.ParseUint(addresses[0], 16, 64)
  	if err != nil {
  		return nil, fmt.Errorf("parsing start address %q: %w", addresses[0], err)
  	}
  	end, err := strconv.ParseUint(addresses[1], 16, 64)
  	if err != nil {
  		return nil, fmt.Errorf("parsing end address %q: %w", addresses[1], err)
  	}
  	return &MemoryMap{Start: start, End: end}, nil
  }
  // GetUprobe maps the instruction pointer of a stack event back to the uprobe
  // it was reported for. It tries two lookups in order:
  //  1. Rebase event.Ip against the start of the first mapping in
  //     /proc/<pid>/maps and compare it with Address+AbsOffset of each entry
  //     in c.UprobesMap.
  //  2. Resolve event.Ip against tracer.Symbols and look the symbol(s) up in
  //     tracer.UprobesMap.
  //
  // It returns an error if the process's maps file cannot be read (e.g. the
  // process already exited) or if neither lookup finds a uprobe.
  func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  	memoryMap, err := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  	if err != nil {
  		// Without this check memoryMap is nil and the subtraction below panics.
  		return uprobe, fmt.Errorf("reading memory map of pid %d: %w", event.Pid, err)
  	}

  	address := event.Ip - memoryMap.Start
  	for _, fun := range c.UprobesMap {
  		if fun.Address+fun.AbsOffset == address {
  			return fun, nil
  		}
  	}

  	syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  	if err != nil {
  		return uprobe, err
  	}
  	for _, sym := range syms {
  		// NOTE(review): "%s" applied to sym.Value (a uint64) renders as
  		// "%!s(uint64=N)" and is flagged by go vet. It is kept unchanged because
  		// the keys of tracer.UprobesMap are built elsewhere and may use the same
  		// format; confirm the producer's format before changing it.
  		if found, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]; ok {
  			return found, nil
  		}
  	}
  	return uprobe, errors.New("uprobe not found")
  }
  699. func (c *Container) GetAppInfo() AppInfo {
  700. return c.AppInfo
  701. }
  702. // 可注入前置
  703. func (c *Container) checkEventReady() bool {
  704. c.lock.Lock()
  705. defer c.lock.Unlock()
  706. return c.l7EventReady
  707. }
  708. func (c *Container) eventReady() {
  709. c.lock.Lock()
  710. defer c.lock.Unlock()
  711. c.l7EventReady = true
  712. }
  713. // uprobe前置
  714. func (c *Container) Isl7AttachSuccess() bool {
  715. c.lock.Lock()
  716. defer c.lock.Unlock()
  717. return c.l7Attach
  718. }
  719. func (c *Container) l7AttachSuccess() {
  720. c.lock.Lock()
  721. defer c.lock.Unlock()
  722. c.l7Attach = true
  723. }
  724. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  725. resp, err := c.GetCodeSetting(r)
  726. if err != nil {
  727. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  728. return
  729. }
  730. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  731. // 有黑白名单规则 &&
  732. // 之前有注入 先卸载再注入
  733. // 之前没注入 直接注入
  734. // 没有有黑白名单 直接卸载
  735. if c.hasStackRule(resp) {
  736. if c.stackRuleUpdate(resp) {
  737. // 重新注入
  738. err = c.DetachStack(pid, APP_UNINSTALL)
  739. if err != nil {
  740. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  741. }
  742. }
  743. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  744. c.saveWhiteStackSettingInfo(resp)
  745. err = c.AttachStack(r.tracer, pid)
  746. if err != nil {
  747. c.AppInfo.SetAppStackError()
  748. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  749. }
  750. } else {
  751. if c.noOrigRule() {
  752. return
  753. }
  754. c.saveWhiteStackSettingInfo(resp)
  755. // 关闭堆栈
  756. err = c.DetachStack(pid, APP_UNINSTALL)
  757. if err != nil {
  758. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  759. }
  760. }
  761. } else {
  762. if c.noOrigRule() {
  763. return
  764. }
  765. c.saveWhiteStackSettingInfo(resp)
  766. // 关闭堆栈
  767. err = c.DetachStack(pid, APP_UNINSTALL)
  768. if err != nil {
  769. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  770. }
  771. }
  772. }
  // verifyAttachConditions decides whether process pid should get probes
  // attached.
  //
  // All of the following must hold:
  //   - the process is tracked;
  //   - checkEventReady() is true;
  //   - its code type is known;
  //   - its command line is non-empty;
  //   - the command line contains the non-empty Filters string of some rule
  //     in r.getWhiteListAll().
  //
  // On a match, the rule's AppName and Filters are stored in
  // c.WhiteSettingInfo. The int result is always 0. c.processes is read
  // without taking c.lock; checkEventReady takes c.lock itself.
  func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  	p := c.processes[pid]
  	if p == nil || !c.checkEventReady() {
  		return false, 0
  	}

  	codeType := c.GetCodeTypeFromCache(pid)
  	if codeType.IsUnknownCode() {
  		klog.WithField("pid", pid).Debug("[verify] unknown language.")
  		return false, 0
  	}

  	cmdline := p.GetCmdline()
  	if len(cmdline) == 0 {
  		return false, 0
  	}

  	whiteList := r.getWhiteListAll()
  	for _, setting := range whiteList {
  		filter := setting.Filters
  		if filter == "" || !strings.Contains(cmdline, filter) {
  			continue
  		}
  		c.WhiteSettingInfo.AppName = setting.AppName
  		c.WhiteSettingInfo.Filters = setting.Filters
  		klog.WithField("pid", pid).
  			WithField("codeType", codeType.String()).
  			WithField("ruleVal", filter).
  			WithField("cmdline", cmdline).
  			WithField("white list", utils.ToString(whiteList)).
  			Infoln("[verify] check successful.")
  		return true, 0
  	}
  	return false, 0
  }
  816. // 1.卸载入口
  817. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  818. c.lock.Lock()
  819. defer c.lock.Unlock()
  820. if p := c.processes[pid]; p != nil {
  821. err := c.DetachUprobes(tracer, pid, detachType)
  822. if err != nil {
  823. klog.WithError(err).Errorln("DetachUprobes Error.")
  824. }
  825. err = c.DetachStack(pid, detachType)
  826. if err != nil {
  827. klog.WithError(err).Errorln("DetachStack Error.")
  828. }
  829. // 关闭7层监控
  830. c.l7Attach = false
  831. // 变更应用状态
  832. if err != nil {
  833. detachType = detachType.Error()
  834. }
  835. c.AppInfo.SetAppStatus(detachType)
  836. }
  837. }
  838. // 1.1卸载uprobe
  839. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  840. // close uprobe
  841. if p := c.processes[pid]; p != nil {
  842. for _, u := range p.uprobes {
  843. err := u.Close()
  844. if err != nil {
  845. return err
  846. }
  847. }
  848. p.uprobes = []link.Link{}
  849. switch detachType {
  850. case APP_UNINSTALL, APP_FUSE:
  851. codeType := c.GetCodeTypeFromCache(pid)
  852. switch codeType {
  853. case CodeTypeJava:
  854. p.jvmAttachOnce = false
  855. case CodeTypeGo:
  856. p.goTlsUprobesChecked = false
  857. p.openSslUprobesChecked = false
  858. default:
  859. }
  860. case APP_UPROBE_ERROR:
  861. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  862. default:
  863. }
  864. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  865. if err := tracer.DelKProcInfo(pid); err != nil {
  866. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  867. } else {
  868. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  869. c.AppInfo.EBPFProcInfo = nil
  870. }
  871. } else {
  872. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  873. }
  874. return nil
  875. }
  876. // 1.2卸载堆栈
  877. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  878. if p := c.processes[pid]; p != nil {
  879. var err error
  880. codeType := c.GetCodeTypeFromCache(pid)
  881. switch codeType {
  882. // 1.2.1 卸载 jvm堆栈
  883. case CodeTypeJava:
  884. err = c.detachJvmStack(pid)
  885. default:
  886. err = p.closeStackUprobes()
  887. }
  888. if err != nil {
  889. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  890. return err
  891. }
  892. p.stackAttachOnce = false
  893. } else {
  894. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  895. }
  896. return nil
  897. }
  898. // 1.2.1 卸载 jvm堆栈
  899. func (c *Container) detachJvmStack(pid uint32) error {
  900. if p := c.processes[pid]; p != nil {
  901. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  902. //}
  903. // 卸载 JavaAgent
  904. var err error
  905. if p.stackStatus.IsJattachSuccess() {
  906. // 卸载堆栈probes
  907. err = p.closeStackUprobes()
  908. if err != nil {
  909. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  910. }
  911. err = p.uninstallJavaAgent()
  912. if err != nil {
  913. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  914. }
  915. }
  916. return err
  917. }
  918. return nil
  919. }
  920. func (c *Container) getRootfs() string {
  921. if c.metadata != nil && c.metadata.rootfs != "" {
  922. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  923. }
  924. return ""
  925. }
  926. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  927. if c == nil {
  928. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  929. return
  930. }
  931. if c.AppInfo.AppName == "" {
  932. return
  933. }
  934. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  935. detail := AppStatusInfo{
  936. Pid: pid,
  937. ProcName: c.containerName,
  938. AppName: c.AppInfo.AppName,
  939. Language: c.AppInfo.CodeType.String(),
  940. AppID: c.AppInfo.AppIdHash.IntVal,
  941. AgentID: c.AppInfo.AgentId,
  942. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  943. Sn: c.AppInfo.Sn,
  944. Sport: c.AppInfo.Sport,
  945. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  946. PreStatus: c.AppInfo.PreStatus,
  947. Status: c.AppInfo.Status,
  948. Rule: c.WhiteSettingInfo.Filters,
  949. Container: string(c.id),
  950. }
  951. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  952. if c.AppInfo.UpdateAt != 0 {
  953. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  954. }
  955. p := c.processes[pid]
  956. if p != nil {
  957. detail.StackStatus = p.stackStatus.String()
  958. v := 0
  959. if !p.versionFailed {
  960. v = 1
  961. }
  962. detail.StackStatus += fmt.Sprintf("V=%d", v)
  963. }
  964. runtimeApps[pid] = detail
  965. }
  966. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  967. if c == nil {
  968. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  969. return
  970. }
  971. var err error
  972. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  973. // fusing UNINSTALL
  974. if r.isFusing && c.Isl7AttachSuccess() {
  975. c.Detach(r.tracer, pid, APP_FUSE)
  976. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  977. return
  978. }
  979. // verify UNINSTALL
  980. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  981. c.Detach(r.tracer, pid, APP_UNINSTALL)
  982. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  983. return
  984. }
  985. if verifyAttachConditions {
  986. err = c.RegisterAppInfo(r, pid)
  987. if err != nil {
  988. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  989. return
  990. }
  991. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  992. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  993. if err != nil {
  994. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  995. return
  996. } else {
  997. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  998. }
  999. // 堆栈控制
  1000. c.ctrlStack(r, pid)
  1001. }
  1002. }