container_apm.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package containers
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  5. "github.com/coroot/coroot-node-agent/tracing"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. "inet.af/netaddr"
  8. )
  9. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  10. trace, ok := c.traceMap[traceId]
  11. return trace, ok
  12. }
  13. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  14. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  15. ip, err := netaddr.ParseIP(hostIp)
  16. if err != nil {
  17. return fmt.Errorf("host ip error")
  18. }
  19. addr := netaddr.IPPortFrom(ip, port)
  20. trace := tracing.NewTrace(string(c.id), addr)
  21. if trace == nil {
  22. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  23. }
  24. c.traceMap[traceId] = trace
  25. trace.TraceStart(method, path, r.Status, r.Duration)
  26. return nil
  27. }
  28. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  29. c.lock.Lock()
  30. defer c.lock.Unlock()
  31. if r.Protocol == l7.ProtocolDNS {
  32. return c.onDNSRequest(r)
  33. }
  34. if r.Protocol == l7.ProtocolTrace {
  35. //fmt.Println("r.TraceStart:", r.TraceStart)
  36. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  37. if r.TraceStart == 1 {
  38. //fmt.Println("====ProtocolTrace start1====", r.TraceId)
  39. err := c.InitTrace(r.TraceId, r)
  40. if err != nil {
  41. fmt.Println(err)
  42. }
  43. //fmt.Println("init r.TraceId:", r.TraceId)
  44. //trace, _ := c.getTrace(r.TraceId)
  45. //fmt.Println("init traceId", trace)
  46. //stats.observe(r.Status.Http(), "", r.Duration)
  47. //method, path := l7.ParseHttp(r.Payload)
  48. //fmt.Println("r.Payload:", string(r.Payload))
  49. //fmt.Println("method:", method)
  50. //fmt.Println("path:", path)
  51. //fmt.Println("====ProtocolTrace start2====")
  52. return nil
  53. }
  54. if r.TraceEnd == 1 {
  55. //fmt.Println("r:", r)
  56. //fmt.Println("r.Payload:", string(r.Payload))
  57. //fmt.Println("====ProtocolTrace end2====")
  58. trace, ok := c.getTrace(r.TraceId)
  59. if ok {
  60. trace.TraceEnd(r)
  61. delete(c.traceMap, r.TraceId)
  62. }
  63. //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
  64. return nil
  65. }
  66. }
  67. if r.Protocol == l7.ProtocolHTTP {
  68. //stats.observe(r.Status.Http(), "", r.Duration)
  69. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  70. //trace.HttpRequest(method, path, r.Status, r.Duration)
  71. apmTrace, ok := c.getTrace(r.TraceId)
  72. if ok {
  73. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  74. }
  75. return nil
  76. }
  77. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  78. fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  79. if conn == nil {
  80. return nil
  81. }
  82. if timestamp != 0 && conn.Timestamp != timestamp {
  83. return nil
  84. }
  85. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  86. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  87. switch r.Protocol {
  88. case l7.ProtocolHTTP:
  89. fmt.Println("l7.ProtocolHTTP", r.TraceId)
  90. //stats.observe(r.Status.Http(), "", r.Duration)
  91. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  92. //trace.HttpRequest(method, path, r.Status, r.Duration)
  93. apmTrace, ok := c.getTrace(r.TraceId)
  94. if ok {
  95. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  96. }
  97. case l7.ProtocolHTTP2:
  98. if conn.http2Parser == nil {
  99. conn.http2Parser = l7.NewHttp2Parser()
  100. }
  101. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  102. for _, req := range requests {
  103. stats.observe(req.Status.Http(), "", req.Duration)
  104. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  105. }
  106. case l7.ProtocolPostgres:
  107. if r.Method != l7.MethodStatementClose {
  108. stats.observe(r.Status.String(), "", r.Duration)
  109. }
  110. if conn.postgresParser == nil {
  111. conn.postgresParser = l7.NewPostgresParser()
  112. }
  113. query := conn.postgresParser.Parse(r.Payload)
  114. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  115. case l7.ProtocolMysql:
  116. //fmt.Println("mysql mysql")
  117. //fmt.Println(conn)
  118. if r.Method != l7.MethodStatementClose {
  119. stats.observe(r.Status.String(), "", r.Duration)
  120. }
  121. if conn.mysqlParser == nil {
  122. conn.mysqlParser = l7.NewMysqlParser()
  123. }
  124. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  125. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  126. apmTrace, ok := c.getTrace(r.TraceId)
  127. //fmt.Println("mysql r.TraceId:", r.TraceId)
  128. //fmt.Println("ok:", ok)
  129. //fmt.Println("traceMap:", len(c.traceMap))
  130. if ok {
  131. apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  132. }
  133. case l7.ProtocolMemcached:
  134. stats.observe(r.Status.String(), "", r.Duration)
  135. cmd, items := l7.ParseMemcached(r.Payload)
  136. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  137. case l7.ProtocolRedis:
  138. fmt.Println("redis redis")
  139. stats.observe(r.Status.String(), "", r.Duration)
  140. cmd, args := l7.ParseRedis(r.Payload)
  141. fmt.Println("cmd", cmd)
  142. fmt.Println("args", args)
  143. apmTrace, ok := c.getTrace(r.TraceId)
  144. fmt.Println("redis r.TraceId:", r.TraceId)
  145. fmt.Println("ok:", ok)
  146. fmt.Println("traceMap:", len(c.traceMap))
  147. if ok {
  148. apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  149. }
  150. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  151. case l7.ProtocolMongo:
  152. stats.observe(r.Status.String(), "", r.Duration)
  153. query := l7.ParseMongo(r.Payload)
  154. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  155. case l7.ProtocolKafka, l7.ProtocolCassandra:
  156. stats.observe(r.Status.String(), "", r.Duration)
  157. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  158. stats.observe(r.Status.String(), r.Method.String(), 0)
  159. }
  160. return nil
  161. }
  162. func (c *Container) buildInstanceID() {
  163. c.lock.Lock()
  164. defer c.lock.Unlock()
  165. for address, val := range c.getListens() {
  166. if val == 1 {
  167. ip := address.IP()
  168. if ip.Is4() && !ip.IsLoopback() {
  169. // 获取端口号
  170. port := address.Port()
  171. c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
  172. break
  173. }
  174. }
  175. }
  176. }