container_apm.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package containers
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  5. "github.com/coroot/coroot-node-agent/tracing"
  6. "inet.af/netaddr"
  7. )
  8. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  9. trace, ok := c.traceMap[traceId]
  10. return trace, ok
  11. }
  12. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  13. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  14. ip, err := netaddr.ParseIP(hostIp)
  15. if err != nil {
  16. return fmt.Errorf("host ip error")
  17. }
  18. addr := netaddr.IPPortFrom(ip, port)
  19. trace := tracing.NewTrace(string(c.id), addr)
  20. if trace == nil {
  21. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  22. }
  23. c.traceMap[traceId] = trace
  24. trace.TraceStart(method, path, r.Status, r.Duration)
  25. return nil
  26. }
  27. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
  28. c.lock.Lock()
  29. defer c.lock.Unlock()
  30. if r.Protocol == l7.ProtocolTrace {
  31. //fmt.Println("r.TraceStart:", r.TraceStart)
  32. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  33. if r.TraceStart == 1 {
  34. //fmt.Println("====ProtocolTrace start1====", r.TraceId)
  35. err := c.InitTrace(r.TraceId, r)
  36. if err != nil {
  37. fmt.Println(err)
  38. }
  39. //fmt.Println("init r.TraceId:", r.TraceId)
  40. //trace, _ := c.getTrace(r.TraceId)
  41. //fmt.Println("init traceId", trace)
  42. //stats.observe(r.Status.Http(), "", r.Duration)
  43. //method, path := l7.ParseHttp(r.Payload)
  44. //fmt.Println("r.Payload:", string(r.Payload))
  45. //fmt.Println("method:", method)
  46. //fmt.Println("path:", path)
  47. //fmt.Println("====ProtocolTrace start2====")
  48. return
  49. }
  50. if r.TraceEnd == 1 {
  51. //fmt.Println("r:", r)
  52. //fmt.Println("r.Payload:", string(r.Payload))
  53. //fmt.Println("====ProtocolTrace end2====")
  54. trace, ok := c.getTrace(r.TraceId)
  55. if ok {
  56. trace.TraceEnd(r)
  57. delete(c.traceMap, r.TraceId)
  58. }
  59. //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
  60. return
  61. }
  62. }
  63. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  64. //fmt.Println(conn, pid, fd)
  65. if conn == nil {
  66. return
  67. }
  68. if timestamp != 0 && conn.Timestamp != timestamp {
  69. return
  70. }
  71. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  72. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  73. switch r.Protocol {
  74. case l7.ProtocolHTTP:
  75. stats.observe(r.Status.Http(), "", r.Duration)
  76. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  77. //trace.HttpRequest(method, path, r.Status, r.Duration)
  78. apmTrace, ok := c.getTrace(r.TraceId)
  79. if ok {
  80. apmTrace.HttpTraceRequest(method, path, hostIp, port, r.Status, r.Duration)
  81. }
  82. case l7.ProtocolHTTP2:
  83. if conn.http2Parser == nil {
  84. conn.http2Parser = l7.NewHttp2Parser()
  85. }
  86. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  87. for _, req := range requests {
  88. stats.observe(req.Status.Http(), "", req.Duration)
  89. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  90. }
  91. case l7.ProtocolPostgres:
  92. if r.Method != l7.MethodStatementClose {
  93. stats.observe(r.Status.String(), "", r.Duration)
  94. }
  95. if conn.postgresParser == nil {
  96. conn.postgresParser = l7.NewPostgresParser()
  97. }
  98. query := conn.postgresParser.Parse(r.Payload)
  99. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  100. case l7.ProtocolMysql:
  101. //fmt.Println("mysql mysql")
  102. //fmt.Println(conn)
  103. if r.Method != l7.MethodStatementClose {
  104. stats.observe(r.Status.String(), "", r.Duration)
  105. }
  106. if conn.mysqlParser == nil {
  107. conn.mysqlParser = l7.NewMysqlParser()
  108. }
  109. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  110. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  111. apmTrace, ok := c.getTrace(r.TraceId)
  112. //fmt.Println("mysql r.TraceId:", r.TraceId)
  113. //fmt.Println("ok:", ok)
  114. //fmt.Println("traceMap:", len(c.traceMap))
  115. if ok {
  116. apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  117. }
  118. case l7.ProtocolMemcached:
  119. stats.observe(r.Status.String(), "", r.Duration)
  120. cmd, items := l7.ParseMemcached(r.Payload)
  121. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  122. case l7.ProtocolRedis:
  123. fmt.Println("redis redis")
  124. stats.observe(r.Status.String(), "", r.Duration)
  125. cmd, args := l7.ParseRedis(r.Payload)
  126. fmt.Println("cmd", cmd)
  127. fmt.Println("args", args)
  128. apmTrace, ok := c.getTrace(r.TraceId)
  129. fmt.Println("redis r.TraceId:", r.TraceId)
  130. fmt.Println("ok:", ok)
  131. fmt.Println("traceMap:", len(c.traceMap))
  132. if ok {
  133. apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  134. }
  135. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  136. case l7.ProtocolMongo:
  137. stats.observe(r.Status.String(), "", r.Duration)
  138. query := l7.ParseMongo(r.Payload)
  139. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  140. case l7.ProtocolKafka, l7.ProtocolCassandra:
  141. stats.observe(r.Status.String(), "", r.Duration)
  142. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  143. stats.observe(r.Status.String(), r.Method.String(), 0)
  144. }
  145. }