container_apm.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. package containers
  2. import (
  3. "bufio"
  4. "debug/elf"
  5. "fmt"
  6. "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  9. "github.com/coroot/coroot-node-agent/proc"
  10. "github.com/coroot/coroot-node-agent/tracing"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. "github.com/pkg/errors"
  13. "inet.af/netaddr"
  14. "os"
  15. "sort"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  21. trace, ok := c.traceMap[traceId]
  22. return trace, ok
  23. }
  24. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  25. c.traceMap[traceId] = trace
  26. }
  27. // 查询或创建trace信息
  28. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  29. trace, ok := c.getTrace(traceId)
  30. if !ok {
  31. //new trace
  32. trace = tracing.NewTraceFromEvent(string(c.id))
  33. //create TraceMap
  34. c.createTraceMap(traceId, trace)
  35. //create ParentSpan
  36. trace.CreateRootSpan(traceId)
  37. }
  38. return trace, nil
  39. }
  40. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  41. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  42. ip, err := netaddr.ParseIP(hostIp)
  43. if err != nil {
  44. fmt.Println("host ip error")
  45. hostIp = "127.0.0.1"
  46. }
  47. addr := netaddr.IPPortFrom(ip, port)
  48. trace := tracing.NewTrace(string(c.id), addr)
  49. if trace == nil {
  50. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  51. }
  52. c.traceMap[traceId] = trace
  53. trace.TraceStart(method, path, r.Status, r.Duration)
  54. return nil
  55. }
  56. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  57. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  58. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  59. if t.AllEventReady(traceID) {
  60. t.SendEvent()
  61. fmt.Printf("----send:%d \n", traceID)
  62. //fmt.Println(t.GetSpan())
  63. //fmt.Println("===============")
  64. delete(c.traceMap, traceID)
  65. }
  66. }
  67. func (c *Container) valuableTrace(traceID uint64) bool {
  68. return traceID != 0
  69. }
  70. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  71. c.lock.Lock()
  72. defer c.lock.Unlock()
  73. if r.Protocol == l7.ProtocolDNS {
  74. return c.onDNSRequest(r)
  75. }
  76. if !c.valuableTrace(r.TraceId) {
  77. return nil
  78. }
  79. if r.Protocol == l7.ProtocolTrace {
  80. //fmt.Println("r.TraceStart:", r.TraceStart)
  81. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  82. if r.TraceStart == 1 {
  83. fmt.Println("====ProtocolTrace start====", r.TraceId)
  84. trace, err := c.getOrInitTrace(r.TraceId)
  85. if err == nil {
  86. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  87. ip, _ := netaddr.ParseIP(hostIp)
  88. trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), c.GetCodeTypeFromCache(pid))
  89. c.SendEvent(trace, r.TraceId)
  90. }
  91. return nil
  92. }
  93. if r.TraceEnd == 1 {
  94. fmt.Println("====ProtocolTrace end====", r.TraceId, r.EventCount)
  95. trace, err := c.getOrInitTrace(r.TraceId)
  96. if err == nil {
  97. trace.TraceEndEvent(r)
  98. c.SendEvent(trace, r.TraceId)
  99. }
  100. return nil
  101. }
  102. }
  103. if r.Protocol == l7.ProtocolHTTP {
  104. //stats.observe(r.Status.Http(), "", r.Duration)
  105. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  106. //trace.HttpRequest(method, path, r.Status, r.Duration)
  107. //apmTrace, ok := c.getTrace(r.TraceId)
  108. //if ok {
  109. // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  110. //}
  111. apmTrace, err := c.getOrInitTrace(r.TraceId)
  112. fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  113. if err == nil {
  114. apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
  115. c.SendEvent(apmTrace, r.TraceId)
  116. }
  117. return nil
  118. }
  119. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  120. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  121. if conn == nil {
  122. return nil
  123. }
  124. if timestamp != 0 && conn.Timestamp != timestamp {
  125. return nil
  126. }
  127. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  128. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  129. switch r.Protocol {
  130. case l7.ProtocolHTTP:
  131. //fmt.Println("l7.ProtocolHTTP", r.TraceId)
  132. ////stats.observe(r.Status.Http(), "", r.Duration)
  133. //method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  134. ////trace.HttpRequest(method, path, r.Status, r.Duration)
  135. //
  136. //apmTrace, ok := c.getTrace(r.TraceId)
  137. //if ok {
  138. // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  139. //}
  140. case l7.ProtocolHTTP2:
  141. if conn.http2Parser == nil {
  142. conn.http2Parser = l7.NewHttp2Parser()
  143. }
  144. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  145. for _, req := range requests {
  146. stats.observe(req.Status.Http(), "", req.Duration)
  147. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  148. }
  149. case l7.ProtocolPostgres:
  150. //if r.Method != l7.MethodStatementClose {
  151. // stats.observe(r.Status.String(), "", r.Duration)
  152. //}
  153. //if conn.postgresParser == nil {
  154. // conn.postgresParser = l7.NewPostgresParser()
  155. //}
  156. //query := conn.postgresParser.Parse(r.Payload)
  157. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  158. case l7.ProtocolMysql:
  159. //fmt.Println("mysql mysql")
  160. //fmt.Println(conn)
  161. if r.Method != l7.MethodStatementClose {
  162. stats.observe(r.Status.String(), "", r.Duration)
  163. }
  164. if conn.mysqlParser == nil {
  165. conn.mysqlParser = l7.NewMysqlParser()
  166. }
  167. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  168. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  169. //apmTrace, ok := c.getTrace(r.TraceId)
  170. apmTrace, err := c.getOrInitTrace(r.TraceId)
  171. //fmt.Println("mysql r.TraceId:", r.TraceId)
  172. //fmt.Println("ok:", ok)
  173. //fmt.Println("traceMap:", len(c.traceMap))
  174. if err == nil {
  175. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  176. apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  177. c.SendEvent(apmTrace, r.TraceId)
  178. }
  179. case l7.ProtocolMemcached:
  180. //stats.observe(r.Status.String(), "", r.Duration)
  181. //cmd, items := l7.ParseMemcached(r.Payload)
  182. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  183. case l7.ProtocolRedis:
  184. stats.observe(r.Status.String(), "", r.Duration)
  185. cmd, args := l7.ParseRedis(r.Payload)
  186. fmt.Println("cmd", cmd)
  187. fmt.Println("args", args)
  188. //apmTrace, ok := c.getTrace(r.TraceId)
  189. apmTrace, err := c.getOrInitTrace(r.TraceId)
  190. if err == nil {
  191. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  192. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  193. c.SendEvent(apmTrace, r.TraceId)
  194. }
  195. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  196. case l7.ProtocolMongo:
  197. //stats.observe(r.Status.String(), "", r.Duration)
  198. //query := l7.ParseMongo(r.Payload)
  199. //trace.MongoQuery(query, r.Status.Error(), r.Duration)
  200. case l7.ProtocolKafka, l7.ProtocolCassandra:
  201. //stats.observe(r.Status.String(), "", r.Duration)
  202. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  203. //stats.observe(r.Status.String(), r.Method.String(), 0)
  204. }
  205. return nil
  206. }
  207. func (c *Container) buildIDs(pid uint32) {
  208. c.lock.Lock()
  209. defer c.lock.Unlock()
  210. p := c.processes[pid]
  211. if p != nil {
  212. p.cmdline = string(proc.GetRealCmdline(pid))
  213. }
  214. for address, val := range c.getListens() {
  215. if val == 1 {
  216. ip := address.IP()
  217. if ip.Is4() && !ip.IsLoopback() {
  218. // 获取端口号
  219. port := address.Port()
  220. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  221. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip, port))
  222. c.instanceID.IntVal, _ = strInstanceID.ToInt64()
  223. c.instanceID.HashtVal = strInstanceID.ToHashByte()
  224. c.AppInfo.InstanceId = c.instanceID.IntVal
  225. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  226. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  227. break
  228. }
  229. }
  230. }
  231. }
  232. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  233. c.lock.Lock()
  234. defer c.lock.Unlock()
  235. // get the associated uprobe
  236. uprobe, err := c.GetUprobe(event, tracer)
  237. if err != nil {
  238. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  239. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  240. return
  241. }
  242. if event.TraceId <= 0 {
  243. fmt.Println("StackProcess TraceId id 0")
  244. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  245. return
  246. }
  247. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  248. stackFun := ebpftracer.StackFunEvent{}
  249. stackFun.Uprobe = &uprobe
  250. stackFun.StackEvent = event
  251. apmTrace, ok := c.getTrace(event.TraceId)
  252. if ok {
  253. apmTrace.FunAdd(stackFun)
  254. }
  255. }
  256. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  257. c.lock.Lock()
  258. defer c.lock.Unlock()
  259. // get the associated uprobe
  260. switch event.Location {
  261. case 0: // ret
  262. uprobe, err := c.GetUprobe(event, tracer)
  263. if err != nil {
  264. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  265. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  266. return
  267. }
  268. if event.TraceId <= 0 {
  269. fmt.Println("StackProcess TraceId id 0")
  270. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  271. return
  272. }
  273. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  274. apmTrace, err := c.getOrInitTrace(event.TraceId)
  275. if err == nil {
  276. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  277. duration := event.TimeNsEnd - event.TimeNsStart
  278. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  279. c.SendEvent(apmTrace, event.TraceId)
  280. }
  281. }
  282. }
  283. // ResolveAddress returns the symbol(s) and offset of the given address.
  284. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  285. if addr == 0 {
  286. // err = errors.Wrapf(SymbolNotFoundError, "0")
  287. return
  288. }
  289. // symbols, _, err := e.Symbols()
  290. if err != nil {
  291. return
  292. }
  293. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  294. if idx == 0 {
  295. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  296. return
  297. }
  298. // why diff symbol may contains the same addr?
  299. sym := symbols[idx-1]
  300. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  301. syms = append(syms, symbols[i])
  302. }
  303. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  304. syms = append(syms, symbols[i])
  305. }
  306. return syms, uint(addr - sym.Value), nil
  307. }
  308. type MemoryMap struct {
  309. Start, End uint64
  310. }
  311. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  312. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  313. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  314. if err != nil {
  315. return nil, err
  316. }
  317. defer file.Close()
  318. scanner := bufio.NewScanner(file)
  319. if scanner.Scan() {
  320. fields := strings.Fields(scanner.Text())
  321. addresses := strings.Split(fields[0], "-")
  322. if len(addresses) != 2 {
  323. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  324. }
  325. start, err := strconv.ParseUint(addresses[0], 16, 64)
  326. if err != nil {
  327. return nil, err
  328. }
  329. end, err := strconv.ParseUint(addresses[1], 16, 64)
  330. if err != nil {
  331. return nil, err
  332. }
  333. return &MemoryMap{
  334. Start: start,
  335. End: end,
  336. }, nil
  337. }
  338. if err := scanner.Err(); err != nil {
  339. return nil, err
  340. }
  341. return nil, errors.New("empty /proc/<pid>/maps")
  342. }
  343. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  344. //fmt.Println("GetUprobe entory:")
  345. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  346. Address := event.Ip - memoryMap.Start
  347. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  348. for _, fun := range c.UprobesMap {
  349. funAddress := fun.Address + fun.AbsOffset
  350. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  351. if funAddress == Address {
  352. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  353. return fun, nil
  354. }
  355. }
  356. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  357. if err != nil {
  358. return
  359. }
  360. for _, sym := range syms {
  361. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  362. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  363. if ok {
  364. return uprobe, nil
  365. }
  366. }
  367. err = errors.New("uprobe not found")
  368. return
  369. }
  370. func (c *Container) eventReady() {
  371. c.lock.Lock()
  372. defer c.lock.Unlock()
  373. c.l7EventReady = true
  374. }
  375. func (c *Container) checkEventReady() bool {
  376. c.lock.Lock()
  377. defer c.lock.Unlock()
  378. return c.l7EventReady
  379. }
  380. func (c *Container) checkL7AttachReady() bool {
  381. c.lock.Lock()
  382. defer c.lock.Unlock()
  383. return c.l7Attach
  384. }
  385. func (c *Container) l7AttachSuccess() {
  386. c.l7Attach = true
  387. }
  388. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
  389. p := c.processes[pid]
  390. if p != nil && c.checkEventReady() {
  391. codeType := c.GetCodeTypeFromCache(pid)
  392. cmdline := p.GetCmdline()
  393. fmt.Printf("checkEventReady %v [%d] cmdline='%s' len(cmdline)[%d]\n", c.checkEventReady(), pid, cmdline, len(cmdline))
  394. if len(cmdline) == 0 {
  395. return false
  396. }
  397. fmt.Println("r.getWhiteListByCodeType(codeType):")
  398. fmt.Println(r.getWhiteListByCodeType(codeType))
  399. // 当前语言的白名单规则
  400. for _, setting := range r.getWhiteListByCodeType(codeType) {
  401. ruleVal := setting.Filters
  402. if ruleVal == "" {
  403. continue
  404. }
  405. fmt.Println("strings.Contains(cmdline, ruleVal)")
  406. fmt.Println(cmdline)
  407. fmt.Println(ruleVal)
  408. // 判断规则
  409. if strings.Contains(cmdline, ruleVal) {
  410. fmt.Println("------------", pid, ruleVal)
  411. c.WhiteSettingInfo = setting
  412. return true
  413. }
  414. }
  415. }
  416. return false
  417. }
  418. func (c *Container) RegisterAppInfo(pid uint32) error {
  419. if c.WhiteSettingInfo.AppName == "" {
  420. return fmt.Errorf("AppName is empty")
  421. }
  422. var err error
  423. originAppName := c.AppInfo.AppName
  424. if originAppName != c.WhiteSettingInfo.AppName {
  425. c.AppInfo.AppName = c.WhiteSettingInfo.AppName
  426. c.AppInfo.AppId, err = utils.BuildInt64ID(c.AppInfo.AppName).ToInt64()
  427. // 注册
  428. fmt.Println("注册")
  429. }
  430. // ip:port + process_name + exe路径生成
  431. //c.instanceID.IntVal + proc.GetExe(pid)
  432. //c.AppInfo.AgentId =
  433. return err
  434. }
  435. func (c *Container) detachUprobes(pid uint32) {
  436. c.lock.Lock()
  437. defer c.lock.Unlock()
  438. // close uprobe
  439. if p := c.processes[pid]; p != nil {
  440. if len(p.uprobes) > 0 {
  441. fmt.Println("卸载")
  442. p.DynamicClose()
  443. c.l7Attach = false
  444. }
  445. }
  446. }