container_apm.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package containers
  2. import (
  3. "debug/elf"
  4. "fmt"
  5. "math/rand"
  6. "sort"
  7. "strconv"
  8. "time"
  9. "github.com/coroot/coroot-node-agent/ebpftracer"
  10. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  11. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  12. "github.com/coroot/coroot-node-agent/tracing"
  13. "github.com/coroot/coroot-node-agent/utils"
  14. "github.com/pkg/errors"
  15. "inet.af/netaddr"
  16. )
  17. type CodeType int16
  18. const (
  19. CodeTypeUnknown CodeType = -1
  20. CodeTypeWaitCheck CodeType = 0
  21. CodeTypeGo CodeType = 1006
  22. CodeTypeJava CodeType = 1002
  23. )
  24. func (p CodeType) String() string {
  25. switch p {
  26. case CodeTypeGo:
  27. return "GO"
  28. case CodeTypeJava:
  29. return "JAVA"
  30. }
  31. return "UNKNOWN:Language"
  32. }
  33. func (p CodeType) IsWaitCheck() bool {
  34. if p == CodeTypeWaitCheck {
  35. return true
  36. }
  37. return false
  38. }
  39. func (p CodeType) IsUnknownCode() bool {
  40. if p == CodeTypeUnknown {
  41. return true
  42. }
  43. return false
  44. }
  45. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  46. trace, ok := c.traceMap[traceId]
  47. return trace, ok
  48. }
  49. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  50. c.traceMap[traceId] = trace
  51. }
  52. // 查询或创建trace信息
  53. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  54. trace, ok := c.getTrace(traceId)
  55. if !ok {
  56. //new trace
  57. trace = tracing.NewTraceFromEvent(string(c.id))
  58. //create TraceMap
  59. c.createTraceMap(traceId, trace)
  60. //create ParentSpan
  61. trace.CreateRootSpan(traceId)
  62. }
  63. return trace, nil
  64. }
  65. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  66. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  67. ip, err := netaddr.ParseIP(hostIp)
  68. if err != nil {
  69. return fmt.Errorf("host ip error")
  70. }
  71. addr := netaddr.IPPortFrom(ip, port)
  72. trace := tracing.NewTrace(string(c.id), addr)
  73. if trace == nil {
  74. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  75. }
  76. c.traceMap[traceId] = trace
  77. trace.TraceStart(method, path, r.Status, r.Duration)
  78. return nil
  79. }
  80. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  81. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  82. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  83. if t.AllEventReady(traceID) {
  84. t.SendEvent()
  85. fmt.Printf("----send:%d \n", traceID)
  86. //fmt.Println(t.GetSpan())
  87. //fmt.Println("===============")
  88. delete(c.traceMap, traceID)
  89. }
  90. }
  91. func (c *Container) valuableTrace(traceID uint64) bool {
  92. return traceID != 0
  93. }
  94. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  95. c.lock.Lock()
  96. defer c.lock.Unlock()
  97. if r.Protocol == l7.ProtocolDNS {
  98. return c.onDNSRequest(r)
  99. }
  100. if !c.valuableTrace(r.TraceId) {
  101. return nil
  102. }
  103. if r.Protocol == l7.ProtocolTrace {
  104. //fmt.Println("r.TraceStart:", r.TraceStart)
  105. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  106. if r.TraceStart == 1 {
  107. fmt.Println("====ProtocolTrace start====", r.TraceId)
  108. trace, err := c.getOrInitTrace(r.TraceId)
  109. if err == nil {
  110. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  111. ip, _ := netaddr.ParseIP(hostIp)
  112. trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port))
  113. c.SendEvent(trace, r.TraceId)
  114. }
  115. return nil
  116. }
  117. if r.TraceEnd == 1 {
  118. fmt.Println("====ProtocolTrace end====", r.TraceId, r.EventCount)
  119. trace, err := c.getOrInitTrace(r.TraceId)
  120. if err == nil {
  121. trace.TraceEndEvent(r)
  122. c.SendEvent(trace, r.TraceId)
  123. }
  124. return nil
  125. }
  126. }
  127. if r.Protocol == l7.ProtocolHTTP {
  128. //stats.observe(r.Status.Http(), "", r.Duration)
  129. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  130. //trace.HttpRequest(method, path, r.Status, r.Duration)
  131. //apmTrace, ok := c.getTrace(r.TraceId)
  132. //if ok {
  133. // apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  134. //}
  135. apmTrace, err := c.getOrInitTrace(r.TraceId)
  136. fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  137. if err == nil {
  138. apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
  139. c.SendEvent(apmTrace, r.TraceId)
  140. }
  141. return nil
  142. }
  143. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  144. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  145. if conn == nil {
  146. return nil
  147. }
  148. if timestamp != 0 && conn.Timestamp != timestamp {
  149. return nil
  150. }
  151. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  152. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  153. switch r.Protocol {
  154. case l7.ProtocolHTTP:
  155. fmt.Println("l7.ProtocolHTTP", r.TraceId)
  156. //stats.observe(r.Status.Http(), "", r.Duration)
  157. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  158. //trace.HttpRequest(method, path, r.Status, r.Duration)
  159. apmTrace, ok := c.getTrace(r.TraceId)
  160. if ok {
  161. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  162. }
  163. case l7.ProtocolHTTP2:
  164. if conn.http2Parser == nil {
  165. conn.http2Parser = l7.NewHttp2Parser()
  166. }
  167. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  168. for _, req := range requests {
  169. stats.observe(req.Status.Http(), "", req.Duration)
  170. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  171. }
  172. case l7.ProtocolPostgres:
  173. if r.Method != l7.MethodStatementClose {
  174. stats.observe(r.Status.String(), "", r.Duration)
  175. }
  176. if conn.postgresParser == nil {
  177. conn.postgresParser = l7.NewPostgresParser()
  178. }
  179. query := conn.postgresParser.Parse(r.Payload)
  180. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  181. case l7.ProtocolMysql:
  182. //fmt.Println("mysql mysql")
  183. //fmt.Println(conn)
  184. if r.Method != l7.MethodStatementClose {
  185. stats.observe(r.Status.String(), "", r.Duration)
  186. }
  187. if conn.mysqlParser == nil {
  188. conn.mysqlParser = l7.NewMysqlParser()
  189. }
  190. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  191. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  192. apmTrace, ok := c.getTrace(r.TraceId)
  193. //fmt.Println("mysql r.TraceId:", r.TraceId)
  194. //fmt.Println("ok:", ok)
  195. //fmt.Println("traceMap:", len(c.traceMap))
  196. if ok {
  197. apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  198. }
  199. case l7.ProtocolMemcached:
  200. stats.observe(r.Status.String(), "", r.Duration)
  201. cmd, items := l7.ParseMemcached(r.Payload)
  202. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  203. case l7.ProtocolRedis:
  204. fmt.Println("redis redis")
  205. stats.observe(r.Status.String(), "", r.Duration)
  206. cmd, args := l7.ParseRedis(r.Payload)
  207. fmt.Println("cmd", cmd)
  208. fmt.Println("args", args)
  209. apmTrace, ok := c.getTrace(r.TraceId)
  210. fmt.Println("redis r.TraceId:", r.TraceId)
  211. fmt.Println("ok:", ok)
  212. fmt.Println("traceMap:", len(c.traceMap))
  213. if ok {
  214. apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  215. }
  216. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  217. case l7.ProtocolMongo:
  218. stats.observe(r.Status.String(), "", r.Duration)
  219. query := l7.ParseMongo(r.Payload)
  220. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  221. case l7.ProtocolKafka, l7.ProtocolCassandra:
  222. stats.observe(r.Status.String(), "", r.Duration)
  223. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  224. stats.observe(r.Status.String(), r.Method.String(), 0)
  225. }
  226. return nil
  227. }
  228. func (c *Container) buildInstanceID() {
  229. c.lock.Lock()
  230. defer c.lock.Unlock()
  231. for address, val := range c.getListens() {
  232. if val == 1 {
  233. ip := address.IP()
  234. if ip.Is4() && !ip.IsLoopback() {
  235. // 获取端口号
  236. port := address.Port()
  237. c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
  238. break
  239. }
  240. }
  241. }
  242. }
  243. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  244. c.lock.Lock()
  245. defer c.lock.Unlock()
  246. // get the associated uprobe
  247. switch event.Location {
  248. case 0: // ret
  249. uprobe, err := c.GetUprobe(event, tracer)
  250. if err != nil {
  251. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  252. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  253. return
  254. }
  255. if event.TraceId <= 0 {
  256. //fmt.Println("StackProcess TraceId id 0")
  257. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  258. return
  259. }
  260. //fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  261. apmTrace, ok := c.getTrace(event.TraceId)
  262. if ok {
  263. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  264. duration := event.TimeNsEnd - event.TimeNsStart
  265. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid))
  266. }
  267. case 2: // coroutine
  268. //fmt.Println("StackProcess 协程入口开始处理 fun:", event.TraceId, event.Goid, event.TimeNsEnd-event.TimeNsStart)
  269. apmTrace, ok := c.getTrace(event.TraceId)
  270. if ok {
  271. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, "coroutine"+strconv.FormatUint(event.Goid, 10), event.Pid, event.Fpid)
  272. duration := event.TimeNsEnd - event.TimeNsStart
  273. apmTrace.FuncTraceQuery("coroutine"+strconv.FormatUint(event.Goid, 10), time.Duration(duration), int(event.Level), int(event.Fpid), int(event.Nid))
  274. }
  275. }
  276. }
  277. func (c *Container) StackProcessBak(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  278. c.lock.Lock()
  279. defer c.lock.Unlock()
  280. // get the associated uprobe
  281. uprobe, err := c.GetUprobe(event, tracer)
  282. if err != nil {
  283. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  284. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  285. return
  286. }
  287. if event.TraceId <= 0 {
  288. //fmt.Println("StackProcess TraceId id 0")
  289. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  290. return
  291. }
  292. length := len(c.goEventStacks[event.TraceId])
  293. if length <= 0 {
  294. c.goEventStacks = map[uint64]map[uint64][]ebpftracer.StackFunEvent{}
  295. c.goEventStacks[event.TraceId] = map[uint64][]ebpftracer.StackFunEvent{}
  296. c.goEventStacks[event.TraceId][event.Goid] = []ebpftracer.StackFunEvent{}
  297. }
  298. switch event.Location {
  299. case 0: // entry
  300. level := 100
  301. pid := 100000 + event.Goid
  302. length := len(c.goEventStacks[event.TraceId][event.Goid])
  303. //fmt.Println("StackProcess 函数入口开始处理 fun:", event.TraceId, uprobe.Funcname, length)
  304. if length > 0 {
  305. funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1]
  306. //fmt.Println("funEvent goEventStacks fun:", event.TraceId, funEvent.Uprobe.Funcname, funEvent.Nid, funEvent.Level)
  307. lastEvent := funEvent.StackEvent
  308. if lastEvent.Location == event.Location && lastEvent.Ip == event.Ip && lastEvent.Bp != event.CallerBp {
  309. // duplicated entry event due to stack expansion/shrinkage
  310. // log.Debugf("duplicated entry event: %+v", event)
  311. //fmt.Println("GetUprobeGetUprobe duplicated entry event: %+v", event)
  312. c.goEventStacks[event.TraceId][event.Goid][length-1].StackEvent = event
  313. return
  314. }
  315. level = int(funEvent.Level)
  316. pid = uint64(funEvent.Nid)
  317. }
  318. rand.Seed(time.Now().UnixNano())
  319. // append new event
  320. //fmt.Println("append goEventStacks fun:", event.TraceId, uprobe.Funcname, pid, level+1)
  321. c.goEventStacks[event.TraceId][event.Goid] = append(c.goEventStacks[event.TraceId][event.Goid], ebpftracer.StackFunEvent{
  322. StackEvent: event,
  323. Uprobe: &uprobe,
  324. Level: level + 1,
  325. Pid: int(pid),
  326. Nid: rand.Intn(100000000),
  327. })
  328. length = len(c.goEventStacks[event.TraceId][event.Goid])
  329. //fmt.Println("append goEventStacks end:", event.TraceId, uprobe.Funcname, pid, level+1, length)
  330. case 1: // ret
  331. //// fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname)
  332. length := len(c.goEventStacks[event.TraceId][event.Goid])
  333. //fmt.Println("StackProcess 函数出口开始处理 fun:", event.TraceId, uprobe.Funcname, length)
  334. if length > 0 {
  335. funEvent := c.goEventStacks[event.TraceId][event.Goid][length-1]
  336. entFun := funEvent.StackEvent
  337. apmTrace, ok := c.getTrace(event.TraceId)
  338. //fmt.Println("StackProcess 函数出口处理 fun:", event.TraceId, funEvent.Uprobe.Funcname, length)
  339. if ok {
  340. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, funEvent.Pid, funEvent.Level, funEvent.Nid)
  341. duration := event.TimeNsEnd - entFun.TimeNsStart
  342. c.goEventStacks[event.TraceId][event.Goid] = c.goEventStacks[event.TraceId][event.Goid][:length-1]
  343. apmTrace.FuncTraceQuery(funEvent.Uprobe.Funcname, time.Duration(duration), funEvent.Level, funEvent.Pid, funEvent.Nid)
  344. }
  345. }
  346. }
  347. }
  348. // ResolveAddress returns the symbol(s) and offset of the given address.
  349. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  350. if addr == 0 {
  351. // err = errors.Wrapf(SymbolNotFoundError, "0")
  352. return
  353. }
  354. // symbols, _, err := e.Symbols()
  355. if err != nil {
  356. return
  357. }
  358. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  359. if idx == 0 {
  360. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  361. return
  362. }
  363. // why diff symbol may contains the same addr?
  364. sym := symbols[idx-1]
  365. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  366. syms = append(syms, symbols[i])
  367. }
  368. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  369. syms = append(syms, symbols[i])
  370. }
  371. return syms, uint(addr - sym.Value), nil
  372. }
  373. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  374. //fmt.Println("GetUprobe entory:")
  375. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  376. if err != nil {
  377. return
  378. }
  379. for _, sym := range syms {
  380. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  381. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s", sym.Name)]
  382. if ok {
  383. return uprobe, nil
  384. }
  385. }
  386. err = errors.New("uprobe not found")
  387. return
  388. }