container_apm.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  // TRACE_STATUS is the flag value carried by RequestData.TraceStart and
  // RequestData.TraceEnd when an l7 trace event marks the start or the end of
  // a trace (see onL7RequestApm).
  const TRACE_STATUS = 1
  31. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  32. trace, ok := c.traceMap[traceId]
  33. return trace, ok
  34. }
  35. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  36. c.traceMap[traceId] = trace
  37. }
  38. // 查询或创建trace信息
  39. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  40. trace, ok := c.getTrace(traceId)
  41. if !ok {
  42. //new trace
  43. trace = tracing.NewTraceFromEvent(string(c.id))
  44. //create TraceMap
  45. c.createTraceMap(traceId, trace)
  46. //create ParentSpan
  47. trace.CreateRootSpan(traceId)
  48. }
  49. return trace, nil
  50. }
  51. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  52. // 返回: IP地址, 端口号, 容器ID
  53. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  54. containerID := ""
  55. if c.cgroup != nil {
  56. containerID = c.cgroup.ContainerId
  57. }
  58. ipAddr := ""
  59. ifaces, err := net.Interfaces()
  60. if err == nil {
  61. for _, iface := range ifaces {
  62. if iface.Name == "eth0" {
  63. addrs, err := iface.Addrs()
  64. if err == nil {
  65. for _, addr := range addrs {
  66. var ipnet *net.IPNet
  67. switch v := addr.(type) {
  68. case *net.IPNet:
  69. ipnet = v
  70. case *net.IPAddr:
  71. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  72. }
  73. if ipnet != nil && ipnet.IP.To4() != nil {
  74. ipAddr = ipnet.IP.String()
  75. break
  76. }
  77. }
  78. }
  79. break
  80. }
  81. }
  82. }
  83. klog.Debugf("grpc server ip %s", ipAddr)
  84. // 本地端口尝试从AppInfo.Sport获取
  85. port := c.AppInfo.Sport
  86. klog.Debugf("grpc server port %d", port)
  87. return ipAddr, uint16(port), containerID
  88. }
  89. // Deprecated: InitTrace not used
  90. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  91. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  92. // ip, err := netaddr.ParseIP(hostIp)
  93. // if err != nil {
  94. // //fmt.Println("host ip error")
  95. // hostIp = "127.0.0.1"
  96. // }
  97. // addr := netaddr.IPPortFrom(ip, port)
  98. // trace := tracing.NewTrace(string(c.id), addr)
  99. // if trace == nil {
  100. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  101. // }
  102. // c.traceMap[traceId] = trace
  103. //
  104. // trace.TraceStart(method, path, r.Status, r.Duration)
  105. // return nil
  106. //}
  107. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  108. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  109. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  110. if t.AllEventReady(traceID) {
  111. t.SendEvent()
  112. klog.Debugf("SendEvent %d", traceID)
  113. //fmt.Println(t.GetSpan())
  114. //fmt.Println("===============")
  115. delete(c.traceMap, traceID)
  116. }
  117. }
  118. func (c *Container) valuableTrace(traceID uint64) bool {
  119. return traceID != 0
  120. }
  121. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  122. c.lock.Lock()
  123. defer c.lock.Unlock()
  124. if r.Protocol == l7.ProtocolDNS {
  125. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  126. if c.l7Attach && c.valuableTrace(r.TraceId) {
  127. apmTrace, err := c.getOrInitTrace(r.TraceId)
  128. if err == nil {
  129. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  130. c.SendEvent(apmTrace, r.TraceId)
  131. }
  132. }
  133. return ip2fqdn
  134. }
  135. //if !c.valuableTrace(r.TraceId) {
  136. // return nil
  137. //}
  138. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  139. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  140. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  141. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  142. if r.TraceStart == TRACE_STATUS {
  143. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  144. trace, err := c.getOrInitTrace(r.TraceId)
  145. if c.AppInfo.AppName != "" {
  146. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  147. }
  148. if err == nil {
  149. if r.TraceType == 0 {
  150. method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
  151. // userAgent 可以在这里使用,例如传递给 trace.TraceStartEvent
  152. ip, _ := netaddr.ParseIP(sn)
  153. //codeType := c.GetCodeTypeFromCache(pid)
  154. container_id := ""
  155. if c.cgroup != nil {
  156. container_id = c.cgroup.ContainerId
  157. }
  158. trace.TraceStartEvent(method, requestURI, sn, userAgent, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  159. c.SendEvent(trace, r.TraceId)
  160. } else if r.TraceType == 1 {
  161. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  162. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  163. c.SendEvent(trace, r.TraceId)
  164. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  165. // if err == nil {
  166. // apmTrace.GrpcServerTraceQueryEvent(r, c.GetAppInfo())
  167. // c.SendEvent(apmTrace, r.TraceId)
  168. // }
  169. }
  170. }
  171. return nil
  172. }
  173. if r.TraceEnd == TRACE_STATUS {
  174. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  175. trace, err := c.getOrInitTrace(r.TraceId)
  176. if err == nil {
  177. trace.TraceEndEvent(r)
  178. c.SendEvent(trace, r.TraceId)
  179. }
  180. return nil
  181. }
  182. }
  183. /**
  184. * HTTP
  185. */
  186. if r.Protocol == l7.ProtocolHTTP {
  187. if c.l7Attach && c.valuableTrace(r.TraceId) {
  188. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  189. apmTrace, err := c.getOrInitTrace(r.TraceId)
  190. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  191. if err == nil {
  192. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  193. c.SendEvent(apmTrace, r.TraceId)
  194. }
  195. }
  196. //return nil
  197. }
  198. /**
  199. * gRPC
  200. */
  201. if r.Protocol == l7.ProtocolGrpc {
  202. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  203. klog.Infoln("enter the l7.ProtocolGrpc")
  204. if c.l7Attach && c.valuableTrace(r.TraceId) {
  205. apmTrace, err := c.getOrInitTrace(r.TraceId)
  206. if err == nil {
  207. apmTrace.GrpcClientTraceQueryEvent(r)
  208. c.SendEvent(apmTrace, r.TraceId)
  209. }
  210. }
  211. }
  212. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  213. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  214. if conn == nil {
  215. conn = &ActiveConnection{
  216. Dest: r.ComponentDAddr,
  217. ActualDest: r.ComponentDAddr,
  218. Timestamp: timestamp,
  219. }
  220. //return nil
  221. }
  222. if timestamp != 0 && conn.Timestamp != timestamp {
  223. //if r.Protocol == l7.ProtocolGrpc {
  224. // klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  225. // klog.Infoln("enter the l7.ProtocolGrpc")
  226. // if c.l7Attach && c.valuableTrace(r.TraceId) {
  227. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  228. // if err == nil {
  229. // apmTrace.GrpcClientTraceQueryEvent(r)
  230. // c.SendEvent(apmTrace, r.TraceId)
  231. // }
  232. // }
  233. //}
  234. return nil
  235. }
  236. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  237. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  238. switch r.Protocol {
  239. /**
  240. * HTTP
  241. */
  242. case l7.ProtocolHTTP:
  243. if c.AppInfo.AppName != "" {
  244. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  245. }
  246. stats.observe(r.Status.Http(), "", r.Duration)
  247. /**
  248. * HTTP2
  249. */
  250. case l7.ProtocolHTTP2:
  251. if conn.http2Parser == nil {
  252. conn.http2Parser = l7.NewHttp2Parser()
  253. }
  254. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  255. for _, req := range requests {
  256. stats.observe(req.Status.Http(), "", req.Duration)
  257. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  258. }
  259. /**
  260. * PostgreSQL
  261. */
  262. case l7.ProtocolPostgres:
  263. if r.Method != l7.MethodStatementClose {
  264. stats.observe(r.Status.String(), "", r.Duration)
  265. }
  266. //if conn.postgresParser == nil {
  267. // conn.postgresParser = l7.NewPostgresParser()
  268. //}
  269. //query := conn.postgresParser.Parse(r.Payload)
  270. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  271. if c.l7Attach && c.valuableTrace(r.TraceId) {
  272. if conn.postgresParser == nil {
  273. conn.postgresParser = l7.NewPostgresParser()
  274. }
  275. query := conn.postgresParser.Parse(r.Payload)
  276. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  277. if c.AppInfo.AppName != "" {
  278. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  279. }
  280. //apmTrace, ok := c.getTrace(r.TraceId)
  281. apmTrace, err := c.getOrInitTrace(r.TraceId)
  282. //fmt.Println("mysql r.TraceId:", r.TraceId)
  283. //fmt.Println("ok:", ok)
  284. //fmt.Println("traceMap:", len(c.traceMap))
  285. if err == nil {
  286. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  287. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  288. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  289. c.SendEvent(apmTrace, r.TraceId)
  290. }
  291. }
  292. /**
  293. * Mysql
  294. */
  295. case l7.ProtocolMysql:
  296. if r.Method != l7.MethodStatementClose {
  297. stats.observe(r.Status.String(), "", r.Duration)
  298. }
  299. if c.l7Attach && c.valuableTrace(r.TraceId) {
  300. if conn.mysqlParser == nil {
  301. conn.mysqlParser = l7.NewMysqlParser()
  302. }
  303. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  304. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  305. if c.AppInfo.AppName != "" {
  306. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  307. }
  308. //apmTrace, ok := c.getTrace(r.TraceId)
  309. apmTrace, err := c.getOrInitTrace(r.TraceId)
  310. //fmt.Println("mysql r.TraceId:", r.TraceId)
  311. //fmt.Println("ok:", ok)
  312. //fmt.Println("traceMap:", len(c.traceMap))
  313. if err == nil {
  314. dbSystem := semconv.DBSystemMySQL
  315. // 根据端口白名单确定协议类型
  316. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  317. if l7Type == l7.ProtocolMariaDB {
  318. dbSystem = semconv.DBSystemMariaDB
  319. }
  320. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  321. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  322. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  323. c.SendEvent(apmTrace, r.TraceId)
  324. }
  325. }
  326. /**
  327. * DM (达梦数据库)
  328. */
  329. case l7.ProtocolDM:
  330. //统计dm的query次数
  331. stats.observe(r.Status.String(), "", r.Duration)
  332. //是否发送数据
  333. if c.l7Attach && c.valuableTrace(r.TraceId) {
  334. if conn.dmParser == nil {
  335. conn.dmParser = l7.NewDmParser()
  336. }
  337. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  338. if c.AppInfo.AppName != "" {
  339. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  340. }
  341. apmTrace, err := c.getOrInitTrace(r.TraceId)
  342. if err == nil {
  343. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  344. apmTrace.SQLTraceQueryEvent(r.Protocol, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  345. c.SendEvent(apmTrace, r.TraceId)
  346. }
  347. }
  348. /**
  349. * Memcached
  350. */
  351. case l7.ProtocolMemcached:
  352. stats.observe(r.Status.String(), "", r.Duration)
  353. if c.l7Attach && c.valuableTrace(r.TraceId) {
  354. cmd, items := l7.ParseMemcached(r.Payload)
  355. if c.AppInfo.AppName != "" {
  356. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd+" "+strings.Join(items, " "))
  357. }
  358. apmTrace, err := c.getOrInitTrace(r.TraceId)
  359. if err == nil {
  360. statement := cmd
  361. if len(items) == 1 {
  362. statement += " " + items[0]
  363. } else if len(items) > 1 {
  364. joined := fmt.Sprintf("[%s]", strings.Join(items, " "))
  365. statement += " " + joined
  366. }
  367. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMemcached, cmd, statement, r, conn.Src, conn.ActualDest)
  368. c.SendEvent(apmTrace, r.TraceId)
  369. }
  370. }
  371. /**
  372. * Redis
  373. */
  374. case l7.ProtocolRedis:
  375. stats.observe(r.Status.String(), "", r.Duration)
  376. if c.l7Attach && c.valuableTrace(r.TraceId) {
  377. cmd, args := l7.ParseRedis(r.Payload)
  378. //fmt.Println("cmd", cmd)
  379. //fmt.Println("args", args)
  380. //apmTrace, ok := c.getTrace(r.TraceId)
  381. if c.AppInfo.AppName != "" {
  382. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, cmd)
  383. }
  384. apmTrace, err := c.getOrInitTrace(r.TraceId)
  385. if err == nil {
  386. statement := cmd
  387. if args != "" {
  388. statement += " " + args
  389. }
  390. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemRedis, cmd, statement, r, conn.Src, conn.ActualDest)
  391. c.SendEvent(apmTrace, r.TraceId)
  392. }
  393. }
  394. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  395. /**
  396. * gRPC
  397. */
  398. case l7.ProtocolGrpc:
  399. klog.Debugln("enter the l7.ProtocolGrpc")
  400. stats.observe(r.Status.String(), "", r.Duration)
  401. if c.l7Attach && c.valuableTrace(r.TraceId) {
  402. apmTrace, err := c.getOrInitTrace(r.TraceId)
  403. if err == nil {
  404. apmTrace.GrpcClientTraceQueryEvent(r)
  405. c.SendEvent(apmTrace, r.TraceId)
  406. }
  407. }
  408. /**
  409. * MongoDB
  410. */
  411. case l7.ProtocolMongo:
  412. stats.observe(r.Status.String(), "", r.Duration)
  413. if c.l7Attach && c.valuableTrace(r.TraceId) {
  414. query := l7.ParseMongo(r.Payload)
  415. if c.AppInfo.AppName != "" {
  416. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  417. }
  418. apmTrace, err := c.getOrInitTrace(r.TraceId)
  419. if err == nil {
  420. // MongoDB query 格式通常是 JSON,如 {"insert":"users"} 或 {"find":"users","filter":{...}}
  421. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemMongoDB, "", query, r, conn.Src, conn.ActualDest)
  422. c.SendEvent(apmTrace, r.TraceId)
  423. }
  424. }
  425. /**
  426. * Cassandra
  427. */
  428. case l7.ProtocolCassandra:
  429. stats.observe(r.Status.String(), "", r.Duration)
  430. if c.l7Attach && c.valuableTrace(r.TraceId) {
  431. if conn.cassandraParser == nil {
  432. conn.cassandraParser = l7.NewCassandraParser()
  433. }
  434. var query string
  435. query = string(r.Payload)
  436. //query := conn.cassandraParser.Parse(r.Payload)
  437. if c.AppInfo.AppName != "" {
  438. klog.Debugf("[%s] ->>>>> %s -> %s payload:[%s]", c.AppInfo.AppName, r.Protocol.String(), conn.ActualDest, query)
  439. }
  440. apmTrace, err := c.getOrInitTrace(r.TraceId)
  441. if err == nil {
  442. apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemCassandra, "", query, r, conn.Src, conn.ActualDest)
  443. c.SendEvent(apmTrace, r.TraceId)
  444. }
  445. }
  446. /**
  447. * Kafka
  448. */
  449. case l7.ProtocolKafka:
  450. stats.observe(r.Status.String(), "", r.Duration)
  451. if c.l7Attach && c.valuableTrace(r.TraceId) {
  452. }
  453. /**
  454. * RabbitMQ / NATS
  455. */
  456. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  457. stats.observe(r.Status.String(), r.Method.String(), 0)
  458. if c.l7Attach && c.valuableTrace(r.TraceId) {
  459. }
  460. /**
  461. * Dubbo2
  462. */
  463. case l7.ProtocolDubbo2:
  464. stats.observe(r.Status.String(), "", r.Duration)
  465. if c.l7Attach && c.valuableTrace(r.TraceId) {
  466. }
  467. }
  468. return nil
  469. }
  470. func (c *Container) buildIDs(pid uint32) bool {
  471. c.lock.Lock()
  472. defer c.lock.Unlock()
  473. p := c.processes[pid]
  474. if p != nil {
  475. p.cmdline = string(proc.GetRealCmdline(pid))
  476. }
  477. var sns []string
  478. var sport uint16
  479. for address, val := range c.getListens() {
  480. if val == 1 {
  481. ip := address.IP()
  482. if ip.Is4() && !ip.IsLoopback() {
  483. // 获取端口号
  484. sport = address.Port()
  485. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  486. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  487. //c.AppInfo.Sn = ip.String()
  488. //c.AppInfo.Sport = int(port)
  489. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  490. //fmt.Println(port)
  491. ////os.Exit(1)
  492. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  493. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  494. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  495. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  496. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  497. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  498. //return true
  499. }
  500. }
  501. }
  502. if len(sns) > 0 {
  503. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  504. snsStr := strings.Join(sns, ",")
  505. c.AppInfo.Sn = snsStr
  506. c.AppInfo.Sport = int(sport)
  507. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  508. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  509. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  510. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  511. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  512. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  513. return true
  514. }
  515. return false
  516. }
  517. func (c *Container) ReBuildIds(pid uint32) {
  518. c.lock.Lock()
  519. defer c.lock.Unlock()
  520. var sns []string
  521. var sport uint16
  522. for address, val := range c.getListens() {
  523. if val == 1 {
  524. ip := address.IP()
  525. if ip.Is4() && !ip.IsLoopback() {
  526. // 获取端口号
  527. sport = address.Port()
  528. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  529. }
  530. }
  531. }
  532. if len(sns) > 0 {
  533. snsStr := strings.Join(sns, ",")
  534. c.AppInfo.Sn = snsStr
  535. c.AppInfo.Sport = int(sport)
  536. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  537. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  538. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  539. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  540. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  541. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  542. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  543. }
  544. }
  545. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  546. c.lock.Lock()
  547. defer c.lock.Unlock()
  548. // get the associated uprobe
  549. uprobe, err := c.GetUprobe(event, tracer)
  550. if err != nil {
  551. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  552. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  553. return
  554. }
  555. if event.TraceId <= 0 {
  556. //fmt.Println("StackProcess TraceId id 0")
  557. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  558. return
  559. }
  560. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  561. stackFun := ebpftracer.StackFunEvent{}
  562. stackFun.Uprobe = &uprobe
  563. stackFun.StackEvent = event
  564. apmTrace, ok := c.getTrace(event.TraceId)
  565. if ok {
  566. apmTrace.FunAdd(stackFun)
  567. }
  568. }
  // byteExtractString returns the leading printable-ASCII part of nameString.
  // Bytes are kept up to (not including) the first NUL, control character or
  // non-ASCII byte, i.e. anything outside ' '..'~'. If every byte is printable,
  // all 100 bytes are returned.
  func byteExtractString(nameString [100]byte) string {
  	raw := nameString[:]
  	isStop := func(r rune) bool { return r < ' ' || r > '~' }
  	if cut := bytes.IndexFunc(raw, isStop); cut >= 0 {
  		raw = raw[:cut]
  	}
  	return string(raw)
  }
  578. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  579. c.lock.Lock()
  580. defer c.lock.Unlock()
  581. // get the associated uprobe
  582. switch event.Location {
  583. case 0: // ret
  584. Funcname := ""
  585. if event.Type != uint64(CodeTypeJava) {
  586. uprobe, err := c.GetUprobe(event, tracer)
  587. if err != nil {
  588. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  589. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  590. return
  591. }
  592. Funcname = uprobe.Funcname
  593. } else {
  594. ClassName := byteExtractString(event.ClassName)
  595. MethedName := byteExtractString(event.MethedName)
  596. Funcname = ClassName + "." + MethedName
  597. }
  598. if event.TraceId <= 0 {
  599. //fmt.Println("StackProcess TraceId id 0")
  600. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  601. return
  602. }
  603. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  604. apmTrace, err := c.getOrInitTrace(event.TraceId)
  605. if err == nil {
  606. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  607. duration := event.TimeNsEnd - event.TimeNsStart
  608. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  609. c.SendEvent(apmTrace, event.TraceId)
  610. }
  611. }
  612. }
  613. // ResolveAddress returns the symbol(s) and offset of the given address.
  614. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  615. if addr == 0 {
  616. // err = errors.Wrapf(SymbolNotFoundError, "0")
  617. return
  618. }
  619. // symbols, _, err := e.Symbols()
  620. if err != nil {
  621. return
  622. }
  623. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  624. if idx == 0 {
  625. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  626. return
  627. }
  628. // why diff symbol may contains the same addr?
  629. sym := symbols[idx-1]
  630. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  631. syms = append(syms, symbols[i])
  632. }
  633. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  634. syms = append(syms, symbols[i])
  635. }
  636. return syms, uint(addr - sym.Value), nil
  637. }
  // MemoryMap holds the start and end addresses of one /proc/<pid>/maps entry,
  // as parsed by ReadFirstLineOfMapsFile.
  type MemoryMap struct {
  	Start uint64 // start address of the mapping
  	End   uint64 // end address of the mapping
  }
  641. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  642. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  643. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  644. if err != nil {
  645. return nil, err
  646. }
  647. defer file.Close()
  648. scanner := bufio.NewScanner(file)
  649. if scanner.Scan() {
  650. fields := strings.Fields(scanner.Text())
  651. addresses := strings.Split(fields[0], "-")
  652. if len(addresses) != 2 {
  653. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  654. }
  655. start, err := strconv.ParseUint(addresses[0], 16, 64)
  656. if err != nil {
  657. return nil, err
  658. }
  659. end, err := strconv.ParseUint(addresses[1], 16, 64)
  660. if err != nil {
  661. return nil, err
  662. }
  663. return &MemoryMap{
  664. Start: start,
  665. End: end,
  666. }, nil
  667. }
  668. if err := scanner.Err(); err != nil {
  669. return nil, err
  670. }
  671. return nil, errors.New("empty /proc/<pid>/maps")
  672. }
  673. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  674. //fmt.Println("GetUprobe entory:")
  675. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  676. Address := event.Ip - memoryMap.Start
  677. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  678. for _, fun := range c.UprobesMap {
  679. funAddress := fun.Address + fun.AbsOffset
  680. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  681. if funAddress == Address {
  682. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  683. return fun, nil
  684. }
  685. }
  686. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  687. if err != nil {
  688. return
  689. }
  690. for _, sym := range syms {
  691. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  692. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  693. if ok {
  694. return uprobe, nil
  695. }
  696. }
  697. err = errors.New("uprobe not found")
  698. return
  699. }
  700. func (c *Container) GetAppInfo() AppInfo {
  701. return c.AppInfo
  702. }
  703. // 可注入前置
  704. func (c *Container) checkEventReady() bool {
  705. c.lock.Lock()
  706. defer c.lock.Unlock()
  707. return c.l7EventReady
  708. }
  709. func (c *Container) eventReady() {
  710. c.lock.Lock()
  711. defer c.lock.Unlock()
  712. c.l7EventReady = true
  713. }
  714. // uprobe前置
  715. func (c *Container) Isl7AttachSuccess() bool {
  716. c.lock.Lock()
  717. defer c.lock.Unlock()
  718. return c.l7Attach
  719. }
  720. func (c *Container) l7AttachSuccess() {
  721. c.lock.Lock()
  722. defer c.lock.Unlock()
  723. c.l7Attach = true
  724. }
  725. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  726. resp, err := c.GetCodeSetting(r)
  727. if err != nil {
  728. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  729. return
  730. }
  731. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  732. // 有黑白名单规则 &&
  733. // 之前有注入 先卸载再注入
  734. // 之前没注入 直接注入
  735. // 没有有黑白名单 直接卸载
  736. if c.hasStackRule(resp) {
  737. if c.stackRuleUpdate(resp) {
  738. // 重新注入
  739. err = c.DetachStack(pid, APP_UNINSTALL)
  740. if err != nil {
  741. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  742. }
  743. }
  744. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  745. c.saveWhiteStackSettingInfo(resp)
  746. err = c.AttachStack(r.tracer, pid)
  747. if err != nil {
  748. c.AppInfo.SetAppStackError()
  749. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  750. }
  751. } else {
  752. if c.noOrigRule() {
  753. return
  754. }
  755. c.saveWhiteStackSettingInfo(resp)
  756. // 关闭堆栈
  757. err = c.DetachStack(pid, APP_UNINSTALL)
  758. if err != nil {
  759. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  760. }
  761. }
  762. } else {
  763. if c.noOrigRule() {
  764. return
  765. }
  766. c.saveWhiteStackSettingInfo(resp)
  767. // 关闭堆栈
  768. err = c.DetachStack(pid, APP_UNINSTALL)
  769. if err != nil {
  770. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  771. }
  772. }
  773. }
  774. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  775. p := c.processes[pid]
  776. if p != nil && c.checkEventReady() {
  777. codeType := c.GetCodeTypeFromCache(pid)
  778. if codeType.IsUnknownCode() {
  779. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  780. return false, 0
  781. }
  782. cmdline := p.GetCmdline()
  783. if len(cmdline) == 0 {
  784. return false, 0
  785. }
  786. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  787. whiteListByCode := r.getWhiteListAll()
  788. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  789. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  790. // 当前语言的白名单规则
  791. for _, setting := range whiteListByCode {
  792. ruleVal := setting.Filters
  793. if ruleVal == "" {
  794. continue
  795. }
  796. // 判断规则
  797. if strings.Contains(cmdline, ruleVal) {
  798. //if !codeType.IsJvmCode() {
  799. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  800. // return false, 0
  801. //}
  802. c.WhiteSettingInfo.AppName = setting.AppName
  803. c.WhiteSettingInfo.Filters = setting.Filters
  804. klog.WithField("pid", pid).
  805. WithField("codeType", codeType.String()).
  806. WithField("ruleVal", ruleVal).
  807. WithField("cmdline", cmdline).
  808. //WithField("stack", setting.OpenStack).
  809. WithField("white list", utils.ToString(whiteListByCode)).
  810. Infoln("[verify] check successful.")
  811. return true, 0
  812. }
  813. }
  814. }
  815. return false, 0
  816. }
  817. // 1.卸载入口
  818. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  819. c.lock.Lock()
  820. defer c.lock.Unlock()
  821. if p := c.processes[pid]; p != nil {
  822. err := c.DetachUprobes(tracer, pid, detachType)
  823. if err != nil {
  824. klog.WithError(err).Errorln("DetachUprobes Error.")
  825. }
  826. err = c.DetachStack(pid, detachType)
  827. if err != nil {
  828. klog.WithError(err).Errorln("DetachStack Error.")
  829. }
  830. // 关闭7层监控
  831. c.l7Attach = false
  832. // 变更应用状态
  833. if err != nil {
  834. detachType = detachType.Error()
  835. }
  836. c.AppInfo.SetAppStatus(detachType)
  837. }
  838. }
  839. // 1.1卸载uprobe
  840. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  841. // close uprobe
  842. if p := c.processes[pid]; p != nil {
  843. for _, u := range p.uprobes {
  844. err := u.Close()
  845. if err != nil {
  846. return err
  847. }
  848. }
  849. p.uprobes = []link.Link{}
  850. switch detachType {
  851. case APP_UNINSTALL, APP_FUSE:
  852. codeType := c.GetCodeTypeFromCache(pid)
  853. switch codeType {
  854. case CodeTypeJava:
  855. p.jvmAttachOnce = false
  856. case CodeTypeGo:
  857. p.goTlsUprobesChecked = false
  858. p.openSslUprobesChecked = false
  859. default:
  860. }
  861. case APP_UPROBE_ERROR:
  862. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  863. default:
  864. }
  865. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  866. if err := tracer.DelKProcInfo(pid); err != nil {
  867. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  868. } else {
  869. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  870. c.AppInfo.EBPFProcInfo = nil
  871. }
  872. } else {
  873. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  874. }
  875. return nil
  876. }
  877. // 1.2卸载堆栈
  878. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  879. if p := c.processes[pid]; p != nil {
  880. var err error
  881. codeType := c.GetCodeTypeFromCache(pid)
  882. switch codeType {
  883. // 1.2.1 卸载 jvm堆栈
  884. case CodeTypeJava:
  885. err = c.detachJvmStack(pid)
  886. default:
  887. err = p.closeStackUprobes()
  888. }
  889. if err != nil {
  890. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  891. return err
  892. }
  893. p.stackAttachOnce = false
  894. } else {
  895. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  896. }
  897. return nil
  898. }
  899. // 1.2.1 卸载 jvm堆栈
  900. func (c *Container) detachJvmStack(pid uint32) error {
  901. if p := c.processes[pid]; p != nil {
  902. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  903. //}
  904. // 卸载 JavaAgent
  905. var err error
  906. if p.stackStatus.IsJattachSuccess() {
  907. // 卸载堆栈probes
  908. err = p.closeStackUprobes()
  909. if err != nil {
  910. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  911. }
  912. err = p.uninstallJavaAgent()
  913. if err != nil {
  914. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  915. }
  916. }
  917. return err
  918. }
  919. return nil
  920. }
  921. func (c *Container) getRootfs() string {
  922. if c.metadata != nil && c.metadata.rootfs != "" {
  923. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  924. }
  925. return ""
  926. }
  927. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  928. if c == nil {
  929. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  930. return
  931. }
  932. if c.AppInfo.AppName == "" {
  933. return
  934. }
  935. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  936. detail := AppStatusInfo{
  937. Pid: pid,
  938. ProcName: c.containerName,
  939. AppName: c.AppInfo.AppName,
  940. Language: c.AppInfo.CodeType.String(),
  941. AppID: c.AppInfo.AppIdHash.IntVal,
  942. AgentID: c.AppInfo.AgentId,
  943. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  944. Sn: c.AppInfo.Sn,
  945. Sport: c.AppInfo.Sport,
  946. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  947. PreStatus: c.AppInfo.PreStatus,
  948. Status: c.AppInfo.Status,
  949. Rule: c.WhiteSettingInfo.Filters,
  950. Container: string(c.id),
  951. }
  952. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  953. if c.AppInfo.UpdateAt != 0 {
  954. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  955. }
  956. p := c.processes[pid]
  957. if p != nil {
  958. detail.StackStatus = p.stackStatus.String()
  959. v := 0
  960. if !p.versionFailed {
  961. v = 1
  962. }
  963. detail.StackStatus += fmt.Sprintf("V=%d", v)
  964. }
  965. runtimeApps[pid] = detail
  966. }
  967. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  968. if c == nil {
  969. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  970. return
  971. }
  972. var err error
  973. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  974. // fusing UNINSTALL
  975. if r.isFusing && c.Isl7AttachSuccess() {
  976. c.Detach(r.tracer, pid, APP_FUSE)
  977. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  978. return
  979. }
  980. // verify UNINSTALL
  981. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  982. c.Detach(r.tracer, pid, APP_UNINSTALL)
  983. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  984. return
  985. }
  986. if verifyAttachConditions {
  987. err = c.RegisterAppInfo(r, pid)
  988. if err != nil {
  989. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  990. return
  991. }
  992. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  993. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  994. if err != nil {
  995. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  996. return
  997. } else {
  998. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  999. }
  1000. // 堆栈控制
  1001. c.ctrlStack(r, pid)
  1002. }
  1003. }