container_apm.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package containers
  2. import (
  3. "bufio"
  4. "debug/elf"
  5. "fmt"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/ebpftracer"
  12. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  14. "github.com/coroot/coroot-node-agent/tracing"
  15. "github.com/coroot/coroot-node-agent/utils"
  16. "github.com/pkg/errors"
  17. "inet.af/netaddr"
  18. )
  19. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  20. trace, ok := c.traceMap[traceId]
  21. return trace, ok
  22. }
  23. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  24. c.traceMap[traceId] = trace
  25. }
  26. // 查询或创建trace信息
  27. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  28. trace, ok := c.getTrace(traceId)
  29. if !ok {
  30. //new trace
  31. trace = tracing.NewTraceFromEvent(string(c.id))
  32. //create TraceMap
  33. c.createTraceMap(traceId, trace)
  34. //create ParentSpan
  35. trace.CreateRootSpan(traceId)
  36. }
  37. return trace, nil
  38. }
  39. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  40. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  41. ip, err := netaddr.ParseIP(hostIp)
  42. if err != nil {
  43. return fmt.Errorf("host ip error")
  44. }
  45. addr := netaddr.IPPortFrom(ip, port)
  46. trace := tracing.NewTrace(string(c.id), addr)
  47. if trace == nil {
  48. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  49. }
  50. c.traceMap[traceId] = trace
  51. trace.TraceStart(method, path, r.Status, r.Duration)
  52. return nil
  53. }
  54. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  55. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  56. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  57. if t.AllEventReady(traceID) {
  58. t.SendEvent()
  59. fmt.Printf("----send:%d \n", traceID)
  60. //fmt.Println(t.GetSpan())
  61. //fmt.Println("===============")
  62. delete(c.traceMap, traceID)
  63. }
  64. }
  65. func (c *Container) valuableTrace(traceID uint64) bool {
  66. return traceID != 0
  67. }
  68. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  69. c.lock.Lock()
  70. defer c.lock.Unlock()
  71. if r.Protocol == l7.ProtocolDNS {
  72. return c.onDNSRequest(r)
  73. }
  74. if !c.valuableTrace(r.TraceId) {
  75. return nil
  76. }
  77. if r.Protocol == l7.ProtocolTrace {
  78. //fmt.Println("r.TraceStart:", r.TraceStart)
  79. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  80. if r.TraceStart == 1 {
  81. fmt.Println("====ProtocolTrace start====", r.TraceId)
  82. trace, err := c.getOrInitTrace(r.TraceId)
  83. if err == nil {
  84. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  85. ip, _ := netaddr.ParseIP(hostIp)
  86. trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port))
  87. c.SendEvent(trace, r.TraceId)
  88. }
  89. return nil
  90. }
  91. if r.TraceEnd == 1 {
  92. fmt.Println("====ProtocolTrace end====", r.TraceId, r.EventCount)
  93. trace, err := c.getOrInitTrace(r.TraceId)
  94. if err == nil {
  95. trace.TraceEndEvent(r)
  96. c.SendEvent(trace, r.TraceId)
  97. }
  98. return nil
  99. }
  100. }
  101. if r.Protocol == l7.ProtocolHTTP {
  102. //stats.observe(r.Status.Http(), "", r.Duration)
  103. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  104. //trace.HttpRequest(method, path, r.Status, r.Duration)
  105. //apmTrace, ok := c.getTrace(r.TraceId)
  106. //if ok {
  107. // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  108. //}
  109. apmTrace, err := c.getOrInitTrace(r.TraceId)
  110. fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  111. if err == nil {
  112. apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
  113. c.SendEvent(apmTrace, r.TraceId)
  114. }
  115. return nil
  116. }
  117. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  118. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  119. if conn == nil {
  120. return nil
  121. }
  122. if timestamp != 0 && conn.Timestamp != timestamp {
  123. return nil
  124. }
  125. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  126. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  127. switch r.Protocol {
  128. case l7.ProtocolHTTP:
  129. //fmt.Println("l7.ProtocolHTTP", r.TraceId)
  130. ////stats.observe(r.Status.Http(), "", r.Duration)
  131. //method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  132. ////trace.HttpRequest(method, path, r.Status, r.Duration)
  133. //
  134. //apmTrace, ok := c.getTrace(r.TraceId)
  135. //if ok {
  136. // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  137. //}
  138. case l7.ProtocolHTTP2:
  139. if conn.http2Parser == nil {
  140. conn.http2Parser = l7.NewHttp2Parser()
  141. }
  142. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  143. for _, req := range requests {
  144. stats.observe(req.Status.Http(), "", req.Duration)
  145. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  146. }
  147. case l7.ProtocolPostgres:
  148. //if r.Method != l7.MethodStatementClose {
  149. // stats.observe(r.Status.String(), "", r.Duration)
  150. //}
  151. //if conn.postgresParser == nil {
  152. // conn.postgresParser = l7.NewPostgresParser()
  153. //}
  154. //query := conn.postgresParser.Parse(r.Payload)
  155. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  156. case l7.ProtocolMysql:
  157. //fmt.Println("mysql mysql")
  158. //fmt.Println(conn)
  159. if r.Method != l7.MethodStatementClose {
  160. stats.observe(r.Status.String(), "", r.Duration)
  161. }
  162. if conn.mysqlParser == nil {
  163. conn.mysqlParser = l7.NewMysqlParser()
  164. }
  165. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  166. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  167. //apmTrace, ok := c.getTrace(r.TraceId)
  168. apmTrace, err := c.getOrInitTrace(r.TraceId)
  169. //fmt.Println("mysql r.TraceId:", r.TraceId)
  170. //fmt.Println("ok:", ok)
  171. //fmt.Println("traceMap:", len(c.traceMap))
  172. if err == nil {
  173. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  174. apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  175. c.SendEvent(apmTrace, r.TraceId)
  176. }
  177. case l7.ProtocolMemcached:
  178. //stats.observe(r.Status.String(), "", r.Duration)
  179. //cmd, items := l7.ParseMemcached(r.Payload)
  180. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  181. case l7.ProtocolRedis:
  182. stats.observe(r.Status.String(), "", r.Duration)
  183. cmd, args := l7.ParseRedis(r.Payload)
  184. fmt.Println("cmd", cmd)
  185. fmt.Println("args", args)
  186. //apmTrace, ok := c.getTrace(r.TraceId)
  187. apmTrace, err := c.getOrInitTrace(r.TraceId)
  188. if err == nil {
  189. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  190. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  191. c.SendEvent(apmTrace, r.TraceId)
  192. }
  193. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  194. case l7.ProtocolMongo:
  195. //stats.observe(r.Status.String(), "", r.Duration)
  196. //query := l7.ParseMongo(r.Payload)
  197. //trace.MongoQuery(query, r.Status.Error(), r.Duration)
  198. case l7.ProtocolKafka, l7.ProtocolCassandra:
  199. //stats.observe(r.Status.String(), "", r.Duration)
  200. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  201. //stats.observe(r.Status.String(), r.Method.String(), 0)
  202. }
  203. return nil
  204. }
  205. func (c *Container) buildInstanceID() {
  206. c.lock.Lock()
  207. defer c.lock.Unlock()
  208. for address, val := range c.getListens() {
  209. if val == 1 {
  210. ip := address.IP()
  211. if ip.Is4() && !ip.IsLoopback() {
  212. // 获取端口号
  213. port := address.Port()
  214. c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
  215. break
  216. }
  217. }
  218. }
  219. }
  220. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  221. c.lock.Lock()
  222. defer c.lock.Unlock()
  223. // get the associated uprobe
  224. switch event.Location {
  225. case 0: // ret
  226. uprobe, err := c.GetUprobe(event, tracer)
  227. if err != nil {
  228. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  229. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  230. return
  231. }
  232. if event.TraceId <= 0 {
  233. fmt.Println("StackProcess TraceId id 0")
  234. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  235. return
  236. }
  237. fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  238. apmTrace, ok := c.getTrace(event.TraceId)
  239. if ok {
  240. fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  241. duration := event.TimeNsEnd - event.TimeNsStart
  242. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  243. }
  244. }
  245. }
  246. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  247. c.lock.Lock()
  248. defer c.lock.Unlock()
  249. // get the associated uprobe
  250. switch event.Location {
  251. case 0: // ret
  252. uprobe, err := c.GetUprobe(event, tracer)
  253. if err != nil {
  254. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  255. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  256. return
  257. }
  258. if event.TraceId <= 0 {
  259. fmt.Println("StackProcess TraceId id 0")
  260. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  261. return
  262. }
  263. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  264. apmTrace, err := c.getOrInitTrace(event.TraceId)
  265. if err == nil {
  266. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  267. duration := event.TimeNsEnd - event.TimeNsStart
  268. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  269. c.SendEvent(apmTrace, event.TraceId)
  270. }
  271. }
  272. }
  273. // ResolveAddress returns the symbol(s) and offset of the given address.
  274. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  275. if addr == 0 {
  276. // err = errors.Wrapf(SymbolNotFoundError, "0")
  277. return
  278. }
  279. // symbols, _, err := e.Symbols()
  280. if err != nil {
  281. return
  282. }
  283. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  284. if idx == 0 {
  285. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  286. return
  287. }
  288. // why diff symbol may contains the same addr?
  289. sym := symbols[idx-1]
  290. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  291. syms = append(syms, symbols[i])
  292. }
  293. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  294. syms = append(syms, symbols[i])
  295. }
  296. return syms, uint(addr - sym.Value), nil
  297. }
  // MemoryMap holds the start and end addresses of one mapping as listed in
  // /proc/<pid>/maps.
  type MemoryMap struct {
  	Start, End uint64
  }

  // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps and
  // returns the address range it describes.
  //
  // pid is substituted verbatim into the path. An error is returned when the
  // file cannot be opened or read, when it is empty, or when its first line
  // does not start with a "<hex>-<hex>" address range.
  func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  	file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  	if err != nil {
  		return nil, err
  	}
  	defer file.Close()

  	scanner := bufio.NewScanner(file)
  	if scanner.Scan() {
  		return parseMemoryMapLine(scanner.Text())
  	}
  	if err := scanner.Err(); err != nil {
  		return nil, err
  	}
  	return nil, errors.New("empty /proc/<pid>/maps")
  }

  // parseMemoryMapLine extracts the leading "start-end" hexadecimal address
  // range from one /proc/<pid>/maps line. A blank line is reported as an error
  // rather than panicking on a missing first field.
  func parseMemoryMapLine(line string) (*MemoryMap, error) {
  	fields := strings.Fields(line)
  	if len(fields) == 0 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}
  	addresses := strings.Split(fields[0], "-")
  	if len(addresses) != 2 {
  		return nil, errors.New("unexpected format in /proc/<pid>/maps")
  	}
  	start, err := strconv.ParseUint(addresses[0], 16, 64)
  	if err != nil {
  		return nil, fmt.Errorf("parse mapping start address: %w", err)
  	}
  	end, err := strconv.ParseUint(addresses[1], 16, 64)
  	if err != nil {
  		return nil, fmt.Errorf("parse mapping end address: %w", err)
  	}
  	return &MemoryMap{Start: start, End: end}, nil
  }
  333. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  334. //fmt.Println("GetUprobe entory:")
  335. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  336. Address := event.Ip - memoryMap.Start
  337. fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  338. for _, fun := range tracer.UprobesMap {
  339. funAddress := fun.Address + fun.AbsOffset
  340. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  341. if funAddress == Address {
  342. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x\n", memoryMap.Start, event.Ip)
  343. return fun, nil
  344. }
  345. }
  346. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  347. if err != nil {
  348. return
  349. }
  350. for _, sym := range syms {
  351. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  352. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  353. if ok {
  354. return uprobe, nil
  355. }
  356. }
  357. err = errors.New("uprobe not found")
  358. return
  359. }