container_apm.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package containers
  2. import (
  3. "bufio"
  4. "debug/elf"
  5. "fmt"
  6. "os"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/ebpftracer"
  12. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  14. "github.com/coroot/coroot-node-agent/tracing"
  15. "github.com/coroot/coroot-node-agent/utils"
  16. "github.com/pkg/errors"
  17. "inet.af/netaddr"
  18. )
  19. type CodeType int16
  20. const (
  21. CodeTypeUnknown CodeType = -1
  22. CodeTypeWaitCheck CodeType = 0
  23. CodeTypeGo CodeType = 1006
  24. CodeTypeJava CodeType = 1002
  25. )
  26. func (p CodeType) String() string {
  27. switch p {
  28. case CodeTypeGo:
  29. return "GO"
  30. case CodeTypeJava:
  31. return "JAVA"
  32. }
  33. return "UNKNOWN:Language"
  34. }
  35. func (p CodeType) IsWaitCheck() bool {
  36. if p == CodeTypeWaitCheck {
  37. return true
  38. }
  39. return false
  40. }
  41. func (p CodeType) IsUnknownCode() bool {
  42. if p == CodeTypeUnknown {
  43. return true
  44. }
  45. return false
  46. }
  47. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  48. trace, ok := c.traceMap[traceId]
  49. return trace, ok
  50. }
  51. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  52. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  53. ip, err := netaddr.ParseIP(hostIp)
  54. if err != nil {
  55. return fmt.Errorf("host ip error")
  56. }
  57. addr := netaddr.IPPortFrom(ip, port)
  58. trace := tracing.NewTrace(string(c.id), addr)
  59. if trace == nil {
  60. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  61. }
  62. c.traceMap[traceId] = trace
  63. trace.TraceStart(method, path, r.Status, r.Duration)
  64. return nil
  65. }
  66. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  67. c.lock.Lock()
  68. defer c.lock.Unlock()
  69. if r.Protocol == l7.ProtocolDNS {
  70. return c.onDNSRequest(r)
  71. }
  72. if r.Protocol == l7.ProtocolTrace {
  73. //fmt.Println("r.TraceStart:", r.TraceStart)
  74. //fmt.Println("r.TraceEnd:", r.TraceEnd)
  75. if r.TraceStart == 1 {
  76. //fmt.Println("====ProtocolTrace start1====", r.TraceId)
  77. err := c.InitTrace(r.TraceId, r)
  78. if err != nil {
  79. fmt.Println(err)
  80. }
  81. //fmt.Println("init r.TraceId:", r.TraceId)
  82. //trace, _ := c.getTrace(r.TraceId)
  83. //fmt.Println("init traceId", trace)
  84. //stats.observe(r.Status.Http(), "", r.Duration)
  85. //method, path := l7.ParseHttp(r.Payload)
  86. //fmt.Println("r.Payload:", string(r.Payload))
  87. //fmt.Println("method:", method)
  88. //fmt.Println("path:", path)
  89. //fmt.Println("====ProtocolTrace start2====")
  90. return nil
  91. }
  92. if r.TraceEnd == 1 {
  93. //fmt.Println("r:", r)
  94. //fmt.Println("r.Payload:", string(r.Payload))
  95. //fmt.Println("====ProtocolTrace end2====")
  96. trace, ok := c.getTrace(r.TraceId)
  97. if ok {
  98. trace.TraceEnd(r)
  99. delete(c.traceMap, r.TraceId)
  100. }
  101. //fmt.Println("====ProtocolTrace end1====", ok, r.TraceId)
  102. return nil
  103. }
  104. }
  105. if r.Protocol == l7.ProtocolHTTP {
  106. //stats.observe(r.Status.Http(), "", r.Duration)
  107. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  108. //trace.HttpRequest(method, path, r.Status, r.Duration)
  109. apmTrace, ok := c.getTrace(r.TraceId)
  110. if ok {
  111. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  112. }
  113. return nil
  114. }
  115. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  116. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  117. if conn == nil {
  118. return nil
  119. }
  120. if timestamp != 0 && conn.Timestamp != timestamp {
  121. return nil
  122. }
  123. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  124. trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  125. switch r.Protocol {
  126. case l7.ProtocolHTTP:
  127. fmt.Println("l7.ProtocolHTTP", r.TraceId)
  128. //stats.observe(r.Status.Http(), "", r.Duration)
  129. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  130. //trace.HttpRequest(method, path, r.Status, r.Duration)
  131. apmTrace, ok := c.getTrace(r.TraceId)
  132. if ok {
  133. apmTrace.HttpTraceRequest(method, path, hostIp, port, r)
  134. }
  135. case l7.ProtocolHTTP2:
  136. if conn.http2Parser == nil {
  137. conn.http2Parser = l7.NewHttp2Parser()
  138. }
  139. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  140. for _, req := range requests {
  141. stats.observe(req.Status.Http(), "", req.Duration)
  142. trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  143. }
  144. case l7.ProtocolPostgres:
  145. if r.Method != l7.MethodStatementClose {
  146. stats.observe(r.Status.String(), "", r.Duration)
  147. }
  148. if conn.postgresParser == nil {
  149. conn.postgresParser = l7.NewPostgresParser()
  150. }
  151. query := conn.postgresParser.Parse(r.Payload)
  152. trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  153. case l7.ProtocolMysql:
  154. //fmt.Println("mysql mysql")
  155. //fmt.Println(conn)
  156. if r.Method != l7.MethodStatementClose {
  157. stats.observe(r.Status.String(), "", r.Duration)
  158. }
  159. if conn.mysqlParser == nil {
  160. conn.mysqlParser = l7.NewMysqlParser()
  161. }
  162. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  163. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  164. apmTrace, ok := c.getTrace(r.TraceId)
  165. //fmt.Println("mysql r.TraceId:", r.TraceId)
  166. //fmt.Println("ok:", ok)
  167. //fmt.Println("traceMap:", len(c.traceMap))
  168. if ok {
  169. apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  170. }
  171. case l7.ProtocolMemcached:
  172. stats.observe(r.Status.String(), "", r.Duration)
  173. cmd, items := l7.ParseMemcached(r.Payload)
  174. trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  175. case l7.ProtocolRedis:
  176. fmt.Println("redis redis")
  177. stats.observe(r.Status.String(), "", r.Duration)
  178. cmd, args := l7.ParseRedis(r.Payload)
  179. fmt.Println("cmd", cmd)
  180. fmt.Println("args", args)
  181. apmTrace, ok := c.getTrace(r.TraceId)
  182. fmt.Println("redis r.TraceId:", r.TraceId)
  183. fmt.Println("ok:", ok)
  184. fmt.Println("traceMap:", len(c.traceMap))
  185. if ok {
  186. apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  187. }
  188. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  189. case l7.ProtocolMongo:
  190. stats.observe(r.Status.String(), "", r.Duration)
  191. query := l7.ParseMongo(r.Payload)
  192. trace.MongoQuery(query, r.Status.Error(), r.Duration)
  193. case l7.ProtocolKafka, l7.ProtocolCassandra:
  194. stats.observe(r.Status.String(), "", r.Duration)
  195. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  196. stats.observe(r.Status.String(), r.Method.String(), 0)
  197. }
  198. return nil
  199. }
  200. func (c *Container) buildInstanceID() {
  201. c.lock.Lock()
  202. defer c.lock.Unlock()
  203. for address, val := range c.getListens() {
  204. if val == 1 {
  205. ip := address.IP()
  206. if ip.Is4() && !ip.IsLoopback() {
  207. // 获取端口号
  208. port := address.Port()
  209. c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(fmt.Sprintf("%s:%d", ip, port))
  210. break
  211. }
  212. }
  213. }
  214. }
  215. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  216. c.lock.Lock()
  217. defer c.lock.Unlock()
  218. // get the associated uprobe
  219. switch event.Location {
  220. case 0: // ret
  221. uprobe, err := c.GetUprobe(event, tracer)
  222. if err != nil {
  223. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  224. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  225. return
  226. }
  227. if event.TraceId <= 0 {
  228. fmt.Println("StackProcess TraceId id 0")
  229. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  230. return
  231. }
  232. fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  233. apmTrace, ok := c.getTrace(event.TraceId)
  234. if ok {
  235. fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  236. duration := event.TimeNsEnd - event.TimeNsStart
  237. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  238. }
  239. }
  240. }
  241. // ResolveAddress returns the symbol(s) and offset of the given address.
  242. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  243. if addr == 0 {
  244. // err = errors.Wrapf(SymbolNotFoundError, "0")
  245. return
  246. }
  247. // symbols, _, err := e.Symbols()
  248. if err != nil {
  249. return
  250. }
  251. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  252. if idx == 0 {
  253. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  254. return
  255. }
  256. // why diff symbol may contains the same addr?
  257. sym := symbols[idx-1]
  258. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  259. syms = append(syms, symbols[i])
  260. }
  261. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  262. syms = append(syms, symbols[i])
  263. }
  264. return syms, uint(addr - sym.Value), nil
  265. }
  266. type MemoryMap struct {
  267. Start, End uint64
  268. }
  269. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  270. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  271. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  272. if err != nil {
  273. return nil, err
  274. }
  275. defer file.Close()
  276. scanner := bufio.NewScanner(file)
  277. if scanner.Scan() {
  278. fields := strings.Fields(scanner.Text())
  279. addresses := strings.Split(fields[0], "-")
  280. if len(addresses) != 2 {
  281. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  282. }
  283. start, err := strconv.ParseUint(addresses[0], 16, 64)
  284. if err != nil {
  285. return nil, err
  286. }
  287. end, err := strconv.ParseUint(addresses[1], 16, 64)
  288. if err != nil {
  289. return nil, err
  290. }
  291. return &MemoryMap{
  292. Start: start,
  293. End: end,
  294. }, nil
  295. }
  296. if err := scanner.Err(); err != nil {
  297. return nil, err
  298. }
  299. return nil, errors.New("empty /proc/<pid>/maps")
  300. }
  301. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  302. //fmt.Println("GetUprobe entory:")
  303. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  304. Address := event.Ip - memoryMap.Start
  305. fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  306. for _, fun := range tracer.UprobesMap {
  307. // funAddress := fun.Address + fun.AbsOffset
  308. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  309. if fun.Address == Address {
  310. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x\n", memoryMap.Start, event.Ip)
  311. return fun, nil
  312. }
  313. }
  314. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  315. if err != nil {
  316. return
  317. }
  318. for _, sym := range syms {
  319. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  320. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  321. if ok {
  322. return uprobe, nil
  323. }
  324. }
  325. err = errors.New("uprobe not found")
  326. return
  327. }