container_apm.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package containers
  2. import (
  3. "bufio"
  4. "debug/elf"
  5. "fmt"
  6. "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  9. "github.com/coroot/coroot-node-agent/tracing"
  10. "github.com/coroot/coroot-node-agent/utils"
  11. "github.com/pkg/errors"
  12. "inet.af/netaddr"
  13. "os"
  14. "sort"
  15. "strconv"
  16. "strings"
  17. "time"
  18. )
  // CodeType identifies the language/runtime of a process.
  // NOTE(review): the explicit numeric values presumably mirror codes produced
  // elsewhere (e.g. the eBPF side) — confirm before renumbering.
  type CodeType int16

  const (
  	CodeTypeUnknown   CodeType = -1
  	CodeTypeWaitCheck CodeType = 0
  	CodeTypeJava      CodeType = 1002
  	CodeTypeJavaAot   CodeType = 1003
  	CodeTypeNetCore   CodeType = 1005
  	CodeTypeGo        CodeType = 1006
  )

  // String returns "GO" for CodeTypeGo, "JAVA" for CodeTypeJava and
  // "UNKNOWN:Language" for every other value (including CodeTypeJavaAot and
  // CodeTypeNetCore).
  func (p CodeType) String() string {
  	switch {
  	case p == CodeTypeGo:
  		return "GO"
  	case p == CodeTypeJava:
  		return "JAVA"
  	default:
  		return "UNKNOWN:Language"
  	}
  }

  // IsWaitCheck reports whether p equals CodeTypeWaitCheck.
  func (p CodeType) IsWaitCheck() bool { return p == CodeTypeWaitCheck }

  // IsUnknownCode reports whether p equals CodeTypeUnknown.
  func (p CodeType) IsUnknownCode() bool { return p == CodeTypeUnknown }
  49. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  50. trace, ok := c.traceMap[traceId]
  51. return trace, ok
  52. }
  53. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  54. c.traceMap[traceId] = trace
  55. }
  56. // 查询或创建trace信息
  57. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  58. trace, ok := c.getTrace(traceId)
  59. if !ok {
  60. //new trace
  61. trace = tracing.NewTraceFromEvent(string(c.id))
  62. //create TraceMap
  63. c.createTraceMap(traceId, trace)
  64. //create ParentSpan
  65. trace.CreateRootSpan(traceId)
  66. }
  67. return trace, nil
  68. }
  // InitTrace creates a trace for traceId from the HTTP request in r, registers
  // it in c.traceMap and starts it with the request's method, path, status and
  // duration.
  //
  // The destination is the host parsed from r.Payload. When that host is not a
  // valid IP, the trace falls back to 127.0.0.1 on the same port. Returns the
  // error "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null" when tracing.NewTrace
  // returns nil.
  //
  // NOTE(review): unlike onL7RequestApm, this writes c.traceMap without taking
  // c.lock — confirm every caller already holds it.
  func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  	method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  	ip, err := netaddr.ParseIP(hostIp)
  	if err != nil {
  		fmt.Println("host ip error")
  		// The fallback must go into ip itself: hostIp is not read after this
  		// point, so resetting only the string left ip as the zero IP.
  		ip = netaddr.IPv4(127, 0, 0, 1)
  	}
  	trace := tracing.NewTrace(string(c.id), netaddr.IPPortFrom(ip, port))
  	if trace == nil {
  		return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  	}
  	c.createTraceMap(traceId, trace)
  	trace.TraceStart(method, path, r.Status, r.Duration)
  	return nil
  }
  // SendEvent flushes trace t via t.SendEvent once t.AllEventReady(traceID)
  // reports true, then removes traceID from c.traceMap. Otherwise it does nothing.
  //
  // Design notes (translated from the original comments):
  //   - At any stage, an event whose r.TraceId is non-zero creates the traceMap
  //     entry and the parent span.
  //   - Once the expected number of events is satisfied, any event may trigger
  //     the end of the trace.
  func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  	if !t.AllEventReady(traceID) {
  		return
  	}
  	t.SendEvent()
  	fmt.Printf("----send:%d \n", traceID)
  	delete(c.traceMap, traceID)
  }
  96. func (c *Container) valuableTrace(traceID uint64) bool {
  97. return traceID != 0
  98. }
  // onL7RequestApm routes one decoded L7 request into the container's APM traces
  // and L7 stats. It holds c.lock for the whole call.
  //
  //   - DNS requests are delegated to onDNSRequest, whose result is returned.
  //   - Requests with a zero TraceId are dropped.
  //   - ProtocolTrace start/end markers and plain HTTP requests are attached to
  //     the trace for r.TraceId without looking up a connection.
  //   - Everything else needs a known connection for (pid, fd). When timestamp is
  //     non-zero it must also equal conn.Timestamp, otherwise the request is
  //     ignored. HTTP/2, MySQL and Redis are then recorded; other protocols are
  //     currently disabled.
  //
  // Every path other than DNS returns nil.
  func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	if r.Protocol == l7.ProtocolDNS {
  		return c.onDNSRequest(r)
  	}
  	if !c.valuableTrace(r.TraceId) {
  		return nil
  	}

  	if r.Protocol == l7.ProtocolTrace {
  		if r.TraceStart == 1 {
  			fmt.Println("====ProtocolTrace start====", r.TraceId)
  			trace, err := c.getOrInitTrace(r.TraceId)
  			if err == nil {
  				method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  				// An unparsable host leaves ip as the zero IP; the start event is
  				// still recorded.
  				ip, _ := netaddr.ParseIP(hostIp)
  				trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port))
  				c.SendEvent(trace, r.TraceId)
  			}
  			return nil
  		}
  		if r.TraceEnd == 1 {
  			fmt.Println("====ProtocolTrace end====", r.TraceId, r.EventCount)
  			trace, err := c.getOrInitTrace(r.TraceId)
  			if err == nil {
  				trace.TraceEndEvent(r)
  				c.SendEvent(trace, r.TraceId)
  			}
  			return nil
  		}
  		// Neither a start nor an end marker: continue to the connection path
  		// below, where ProtocolTrace matches no switch case.
  	}

  	// Plain HTTP is attributed by trace ID alone and never needs the connection.
  	if r.Protocol == l7.ProtocolHTTP {
  		method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  		apmTrace, err := c.getOrInitTrace(r.TraceId)
  		fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  		if err == nil {
  			apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
  			c.SendEvent(apmTrace, r.TraceId)
  		}
  		return nil
  	}

  	conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  	if conn == nil {
  		return nil
  	}
  	// NOTE(review): presumably guards against an fd reused by a newer
  	// connection — confirm.
  	if timestamp != 0 && conn.Timestamp != timestamp {
  		return nil
  	}
  	stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  	switch r.Protocol {
  	case l7.ProtocolHTTP2:
  		if conn.http2Parser == nil {
  			conn.http2Parser = l7.NewHttp2Parser()
  		}
  		// Only HTTP/2 uses the per-connection trace, so build it here instead of
  		// for every request.
  		trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  		for _, req := range requests {
  			stats.observe(req.Status.Http(), "", req.Duration)
  			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  		}
  	case l7.ProtocolMysql:
  		if r.Method != l7.MethodStatementClose {
  			stats.observe(r.Status.String(), "", r.Duration)
  		}
  		if conn.mysqlParser == nil {
  			conn.mysqlParser = l7.NewMysqlParser()
  		}
  		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  		apmTrace, err := c.getOrInitTrace(r.TraceId)
  		if err == nil {
  			apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  			c.SendEvent(apmTrace, r.TraceId)
  		}
  	case l7.ProtocolRedis:
  		stats.observe(r.Status.String(), "", r.Duration)
  		// cmd/args are deliberately not printed: Redis arguments may carry
  		// application data or credentials (e.g. AUTH) that must not reach stdout.
  		cmd, args := l7.ParseRedis(r.Payload)
  		apmTrace, err := c.getOrInitTrace(r.TraceId)
  		if err == nil {
  			apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  			c.SendEvent(apmTrace, r.TraceId)
  		}
  	case l7.ProtocolPostgres, l7.ProtocolMemcached, l7.ProtocolMongo,
  		l7.ProtocolKafka, l7.ProtocolCassandra, l7.ProtocolRabbitmq, l7.ProtocolNats:
  		// Stats and tracing for these protocols are currently disabled in APM mode.
  	}
  	return nil
  }
  // buildInstanceID derives c.instanceID from one of the container's listen
  // addresses. Only entries whose value is 1 and whose IP is a non-loopback
  // IPv4 address qualify. When several qualify, the lexicographically smallest
  // "ip:port" string is used, so the result does not depend on map iteration
  // order. If none qualify, c.instanceID is left unchanged.
  //
  // NOTE(review): c.lock is held while calling c.getListens — confirm that
  // getListens does not take the same lock.
  func (c *Container) buildInstanceID() {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	chosen := ""
  	for address, val := range c.getListens() {
  		if val != 1 {
  			continue
  		}
  		ip := address.IP()
  		if !ip.Is4() || ip.IsLoopback() {
  			continue
  		}
  		candidate := fmt.Sprintf("%s:%d", ip, address.Port())
  		if chosen == "" || candidate < chosen {
  			chosen = candidate
  		}
  	}
  	if chosen != "" {
  		c.instanceID.IntVal, c.instanceID.HashtVal = utils.SetInsID(chosen)
  	}
  }
  // StackProcess attaches a function stack event to the trace it belongs to.
  //
  // Events with a zero TraceId are dropped before GetUprobe runs, because that
  // lookup reads /proc/<pid>/maps. The event is added only if a trace for
  // event.TraceId already exists; StackProcess never creates one.
  func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	if event.TraceId == 0 {
  		fmt.Println("StackProcess TraceId id 0")
  		return
  	}
  	uprobe, err := c.GetUprobe(event, tracer)
  	if err != nil {
  		// Printf, not Println: Println does not interpret the %v verb.
  		fmt.Printf("GetUprobe error: %v\n", err)
  		return
  	}
  	apmTrace, ok := c.getTrace(event.TraceId)
  	if !ok {
  		return
  	}
  	stackFun := ebpftracer.StackFunEvent{}
  	stackFun.Uprobe = &uprobe
  	stackFun.StackEvent = event
  	apmTrace.FunAdd(stackFun)
  }
  // StackProcess2 records a function's duration on the trace for event.TraceId,
  // creating that trace if needed, then lets SendEvent flush it when ready.
  //
  // Only events with Location 0 (marked "ret" in the original) are handled.
  // Events with a zero TraceId are dropped before GetUprobe does its /proc lookup.
  func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	if event.Location != 0 {
  		return
  	}
  	if event.TraceId == 0 {
  		fmt.Println("StackProcess TraceId id 0")
  		return
  	}
  	uprobe, err := c.GetUprobe(event, tracer)
  	if err != nil {
  		// Printf, not Println: Println does not interpret the %v verb.
  		fmt.Printf("GetUprobe error: %v\n", err)
  		return
  	}
  	apmTrace, err := c.getOrInitTrace(event.TraceId)
  	if err != nil {
  		return
  	}
  	// NOTE(review): if TimeNsEnd < TimeNsStart and the fields are unsigned, this
  	// wraps around — confirm the eBPF side guarantees ordering.
  	duration := event.TimeNsEnd - event.TimeNsStart
  	apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  	c.SendEvent(apmTrace, event.TraceId)
  }
  // ResolveAddress returns the symbol(s) at or below addr and addr's offset from
  // their start.
  //
  // It assumes symbols is sorted by Value, since sort.Search is used. The match is
  // the last symbol whose Value is <= addr. Every symbol sharing that exact
  // Value (aliases) is returned, walking backwards from the match. addr == 0, or an
  // addr below the first symbol, yields (nil, 0, nil). err is currently always nil.
  //
  // NOTE(review): Size is not checked, so an addr past the end of the preceding
  // symbol still resolves to it.
  func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  	if addr == 0 {
  		return nil, 0, nil
  	}
  	upper := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  	if upper == 0 {
  		return nil, 0, nil
  	}
  	base := symbols[upper-1].Value
  	// symbols[upper] (if present) has Value > addr >= base, so aliases of base
  	// can only sit at indices <= upper-1.
  	for i := upper - 1; i >= 0 && symbols[i].Value == base; i-- {
  		syms = append(syms, symbols[i])
  	}
  	return syms, uint(addr - base), nil
  }
  327. type MemoryMap struct {
  328. Start, End uint64
  329. }
  330. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  331. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  332. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  333. if err != nil {
  334. return nil, err
  335. }
  336. defer file.Close()
  337. scanner := bufio.NewScanner(file)
  338. if scanner.Scan() {
  339. fields := strings.Fields(scanner.Text())
  340. addresses := strings.Split(fields[0], "-")
  341. if len(addresses) != 2 {
  342. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  343. }
  344. start, err := strconv.ParseUint(addresses[0], 16, 64)
  345. if err != nil {
  346. return nil, err
  347. }
  348. end, err := strconv.ParseUint(addresses[1], 16, 64)
  349. if err != nil {
  350. return nil, err
  351. }
  352. return &MemoryMap{
  353. Start: start,
  354. End: end,
  355. }, nil
  356. }
  357. if err := scanner.Err(); err != nil {
  358. return nil, err
  359. }
  360. return nil, errors.New("empty /proc/<pid>/maps")
  361. }
  // GetUprobe finds the uprobe that produced a stack event.
  //
  // First, event.Ip is converted to an offset from the start of the process's
  // first memory mapping (from /proc/<pid>/maps). That offset is compared with
  // Address+AbsOffset of each uprobe in c.UprobesMap. If none matches, event.Ip
  // is resolved against tracer.Symbols and each resulting symbol is looked up in
  // tracer.UprobesMap.
  //
  // An error is returned when the memory map cannot be read (for example, the
  // process is gone), when symbol resolution fails, or when no uprobe matches.
  func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  	memoryMap, err := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  	if err != nil {
  		// Without the mapping there is no base to subtract; returning here avoids
  		// dereferencing a nil *MemoryMap.
  		return uprobe, fmt.Errorf("reading memory map of pid %d: %w", event.Pid, err)
  	}
  	// NOTE(review): assumes the first mapping is the executable's load base — confirm.
  	offset := event.Ip - memoryMap.Start
  	for _, fun := range c.UprobesMap {
  		if fun.Address+fun.AbsOffset == offset {
  			return fun, nil
  		}
  	}
  	syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  	if err != nil {
  		return uprobe, err
  	}
  	for _, sym := range syms {
  		// NOTE(review): %s applied to sym.Value (uint64) renders as "%!s(uint64=N)".
  		// It is kept unchanged because tracer.UprobesMap keys are presumably built
  		// the same way elsewhere — confirm, then switch both sides to %d together.
  		if found, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]; ok {
  			return found, nil
  		}
  	}
  	return uprobe, errors.New("uprobe not found")
  }