container_apm.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package containers
  2. import (
  3. "debug/elf"
  4. "fmt"
  5. "math/rand"
  6. "sort"
  7. "time"
  8. "github.com/coroot/coroot-node-agent/ebpftracer"
  9. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  10. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  11. "github.com/coroot/coroot-node-agent/tracing"
  12. "github.com/coroot/coroot-node-agent/utils"
  13. "github.com/pkg/errors"
  14. "inet.af/netaddr"
  15. )
  16. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  17. trace, ok := c.traceMap[traceId]
  18. return trace, ok
  19. }
  20. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  21. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  22. ip, err := netaddr.ParseIP(hostIp)
  23. if err != nil {
  24. return fmt.Errorf("host ip error")
  25. }
  26. addr := netaddr.IPPortFrom(ip, port)
  27. trace := tracing.NewTrace(string(c.id), addr)
  28. if trace == nil {
  29. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  30. }
  31. c.traceMap[traceId] = trace
  32. trace.TraceStart(method, path, r.Status, r.Duration)
  33. return nil
  34. }
  35. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  36. c.lock.Lock()
  37. defer c.lock.Unlock()
  38. if r.Protocol == l7.ProtocolDNS {
  39. return c.onDNSRequest(r)
  40. }
  41. if r.Protocol == l7.ProtocolTrace {
  42. //fmt.Println("r.TraceStart:", r.TraceStart)
  43. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  44. if r.TraceStart == 1 {
  45. //fmt.Println("====ProtocolTrace start1====", r.TraceId)
  46. err := c.InitTrace(r.TraceId, r)
  47. if err != nil {
  48. fmt.Println(err)
  49. }
  50. //fmt.Println("init r.TraceId:", r.TraceId)
  51. //trace, _ := c.getTrace(r.TraceId)
  52. //fmt.Println("init traceId", trace)
  53. //stats.observe(r.Status.Http(), "", r.Duration)
  54. //method, path := l7.ParseHttp(r.Payload)
  55. //fmt.Println("r.Payload:", string(r.Payload))
  56. //fmt.Println("method:", method)
  57. //fmt.Println("path:", path)
  58. //fmt.Println("====ProtocolTrace start2====")
  59. return nil
  60. }
  61. if r.TraceEnd == 1 {
  62. //fmt.Println("r:", r)
  63. //fmt.Println("r.Payload:", string(r.Payload))
  64. //fmt.Println("====ProtocolTrace end2====")
  65. trace, ok := c.getTrace(r.TraceId)
  66. if ok {
  67. trace.TraceEnd(r)
  68. delete(c.traceMap, r.TraceId)
  69. }
  70. //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
  71. return nil
  72. }
  73. }
  74. if r.Protocol == l7.ProtocolHTTP {
  75. //stats.observe(r.Status.Http(), "", r.Duration)
  76. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  77. //trace.HttpRequest(method, path, r.Status, r.Duration)
  78. apmTrace, ok := c.getTrace(r.TraceId)
  79. if ok {
  80. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  81. }
  82. return nil
  83. }
  84. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  85. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  86. if conn == nil {
  87. return nil
  88. }
  89. if timestamp != 0 && conn.Timestamp != timestamp {
  90. return nil
  91. }
  92. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  93. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  94. switch r.Protocol {
  95. case l7.ProtocolHTTP:
  96. fmt.Println("l7.ProtocolHTTP", r.TraceId)
  97. //stats.observe(r.Status.Http(), "", r.Duration)
  98. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  99. //trace.HttpRequest(method, path, r.Status, r.Duration)
  100. apmTrace, ok := c.getTrace(r.TraceId)
  101. if ok {
  102. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  103. }
  104. case l7.ProtocolHTTP2:
  105. if conn.http2Parser == nil {
  106. conn.http2Parser = l7.NewHttp2Parser()
  107. }
  108. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  109. for _, req := range requests {
  110. stats.observe(req.Status.Http(), "", req.Duration)
  111. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  112. }
  113. case l7.ProtocolPostgres:
  114. if r.Method != l7.MethodStatementClose {
  115. stats.observe(r.Status.String(), "", r.Duration)
  116. }
  117. if conn.postgresParser == nil {
  118. conn.postgresParser = l7.NewPostgresParser()
  119. }
  120. query := conn.postgresParser.Parse(r.Payload)
  121. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  122. case l7.ProtocolMysql:
  123. //fmt.Println("mysql mysql")
  124. //fmt.Println(conn)
  125. if r.Method != l7.MethodStatementClose {
  126. stats.observe(r.Status.String(), "", r.Duration)
  127. }
  128. if conn.mysqlParser == nil {
  129. conn.mysqlParser = l7.NewMysqlParser()
  130. }
  131. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  132. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  133. apmTrace, ok := c.getTrace(r.TraceId)
  134. //fmt.Println("mysql r.TraceId:", r.TraceId)
  135. //fmt.Println("ok:", ok)
  136. //fmt.Println("traceMap:", len(c.traceMap))
  137. if ok {
  138. apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  139. }
  140. case l7.ProtocolMemcached:
  141. stats.observe(r.Status.String(), "", r.Duration)
  142. cmd, items := l7.ParseMemcached(r.Payload)
  143. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  144. case l7.ProtocolRedis:
  145. fmt.Println("redis redis")
  146. stats.observe(r.Status.String(), "", r.Duration)
  147. cmd, args := l7.ParseRedis(r.Payload)
  148. fmt.Println("cmd", cmd)
  149. fmt.Println("args", args)
  150. apmTrace, ok := c.getTrace(r.TraceId)
  151. fmt.Println("redis r.TraceId:", r.TraceId)
  152. fmt.Println("ok:", ok)
  153. fmt.Println("traceMap:", len(c.traceMap))
  154. if ok {
  155. apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  156. }
  157. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  158. case l7.ProtocolMongo:
  159. stats.observe(r.Status.String(), "", r.Duration)
  160. query := l7.ParseMongo(r.Payload)
  161. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  162. case l7.ProtocolKafka, l7.ProtocolCassandra:
  163. stats.observe(r.Status.String(), "", r.Duration)
  164. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  165. stats.observe(r.Status.String(), r.Method.String(), 0)
  166. }
  167. return nil
  168. }
  169. func (c *Container) buildInstanceID() {
  170. c.lock.Lock()
  171. defer c.lock.Unlock()
  172. for address, val := range c.getListens() {
  173. if val == 1 {
  174. ip := address.IP()
  175. if ip.Is4() && !ip.IsLoopback() {
  176. // 获取端口号
  177. port := address.Port()
  178. c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
  179. break
  180. }
  181. }
  182. }
  183. }
  184. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  185. c.lock.Lock()
  186. defer c.lock.Unlock()
  187. // get the associated uprobe
  188. switch event.Location {
  189. case 0: // ret
  190. uprobe, err := c.GetUprobe(event, tracer)
  191. if err != nil {
  192. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  193. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  194. return
  195. }
  196. if event.TraceId <= 0 {
  197. fmt.Println("StackProcess TraceId id 0")
  198. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  199. return
  200. }
  201. fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  202. apmTrace, ok := c.getTrace(event.TraceId)
  203. if ok {
  204. fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  205. duration := event.TimeNsEnd - event.TimeNsStart
  206. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid))
  207. }
  208. }
  209. }
  210. func (c *Container) StackProcessBak(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  211. c.lock.Lock()
  212. defer c.lock.Unlock()
  213. // get the associated uprobe
  214. uprobe, err := c.GetUprobe(event, tracer)
  215. if err != nil {
  216. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  217. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  218. return
  219. }
  220. if event.TraceId <= 0 {
  221. //fmt.Println("StackProcess TraceId id 0")
  222. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  223. return
  224. }
  225. length := len(c.goEventStacks[event.TraceId])
  226. if length <= 0 {
  227. c.goEventStacks = map[uint64]map[uint64][]ebpftracer.StackFunEvent{}
  228. c.goEventStacks[event.TraceId] = map[uint64][]ebpftracer.StackFunEvent{}
  229. c.goEventStacks[event.TraceId][event.Goid] = []ebpftracer.StackFunEvent{}
  230. }
  231. switch event.Location {
  232. case 0: // entry
  233. level := 100
  234. pid := 100000 + event.Goid
  235. length := len(c.goEventStacks[event.TraceId][event.Goid])
  236. //fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, length)
  237. if length > 0 {
  238. funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1]
  239. //fmt.Println("funEvent goEventStacks fun:", event.TraceId, funEvent.Uprobe.Funcname, funEvent.Nid, funEvent.Level)
  240. lastEvent := funEvent.StackEvent
  241. if lastEvent.Location == event.Location && lastEvent.Ip == event.Ip && lastEvent.Bp != event.CallerBp {
  242. // duplicated entry event due to stack expansion/shrinkage
  243. // log.Debugf("duplicated entry event: %+v", event)
  244. //fmt.Println("GetUprobeGetUprobe duplicated entry event: %+v", event)
  245. c.goEventStacks[event.TraceId][event.Goid][length-1].StackEvent = event
  246. return
  247. }
  248. level = int(funEvent.Level)
  249. pid = uint64(funEvent.Nid)
  250. }
  251. rand.Seed(time.Now().UnixNano())
  252. // append new event
  253. //fmt.Println("append goEventStacks fun:", event.TraceId, uprobe.Funcname, pid, level+1)
  254. c.goEventStacks[event.TraceId][event.Goid] = append(c.goEventStacks[event.TraceId][event.Goid], ebpftracer.StackFunEvent{
  255. StackEvent: event,
  256. Uprobe: &uprobe,
  257. Level: level + 1,
  258. Pid: int(pid),
  259. Nid: rand.Intn(100000000),
  260. })
  261. length = len(c.goEventStacks[event.TraceId][event.Goid])
  262. //fmt.Println("append goEventStacks end:", event.TraceId, uprobe.Funcname, pid, level+1, length)
  263. case 1: // ret
  264. //// fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname)
  265. length := len(c.goEventStacks[event.TraceId][event.Goid])
  266. //fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname, length)
  267. if length > 0 {
  268. funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1]
  269. entFun := funEvent.StackEvent
  270. apmTrace, ok := c.getTrace(event.TraceId)
  271. //fmt.Println("StackProcess 函数出口处理 fun:", event.TraceId, funEvent.Uprobe.Funcname, length)
  272. if ok {
  273. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, funEvent.Pid, funEvent.Level, funEvent.Nid)
  274. duration := event.TimeNsEnd - entFun.TimeNsStart
  275. c.goEventStacks[event.TraceId][event.Goid] = c.goEventStacks[event.TraceId][event.Goid][:length-1]
  276. apmTrace.FuncTraceQuery(funEvent.Uprobe.Funcname, time.Duration(duration), funEvent.Level, funEvent.Pid, funEvent.Nid)
  277. }
  278. }
  279. }
  280. }
  281. // ResolveAddress returns the symbol(s) and offset of the given address.
  282. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  283. if addr == 0 {
  284. // err = errors.Wrapf(SymbolNotFoundError, "0")
  285. return
  286. }
  287. // symbols, _, err := e.Symbols()
  288. if err != nil {
  289. return
  290. }
  291. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  292. if idx == 0 {
  293. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  294. return
  295. }
  296. // why diff symbol may contains the same addr?
  297. sym := symbols[idx-1]
  298. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  299. syms = append(syms, symbols[i])
  300. }
  301. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  302. syms = append(syms, symbols[i])
  303. }
  304. return syms, uint(addr - sym.Value), nil
  305. }
  306. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  307. //fmt.Println("GetUprobe entory:")
  308. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  309. if err != nil {
  310. return
  311. }
  312. for _, sym := range syms {
  313. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  314. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s", sym.Name)]
  315. if ok {
  316. return uprobe, nil
  317. }
  318. }
  319. err = errors.New("uprobe not found")
  320. return
  321. }