container_apm.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package containers
  2. import (
  3. "bufio"
  4. "debug/elf"
  5. "fmt"
  6. "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  9. "github.com/coroot/coroot-node-agent/proc"
  10. "github.com/coroot/coroot-node-agent/tracing"
  11. "github.com/coroot/coroot-node-agent/utils"
  12. . "github.com/coroot/coroot-node-agent/utils/modelse"
  13. "github.com/pkg/errors"
  14. klog "github.com/sirupsen/logrus"
  15. "inet.af/netaddr"
  16. "os"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "time"
  21. )
  22. const TRACE_STATUS = 1
  23. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  24. trace, ok := c.traceMap[traceId]
  25. return trace, ok
  26. }
  27. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  28. c.traceMap[traceId] = trace
  29. }
  30. // 查询或创建trace信息
  31. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  32. trace, ok := c.getTrace(traceId)
  33. if !ok {
  34. //new trace
  35. trace = tracing.NewTraceFromEvent(string(c.id))
  36. //create TraceMap
  37. c.createTraceMap(traceId, trace)
  38. //create ParentSpan
  39. trace.CreateRootSpan(traceId)
  40. }
  41. return trace, nil
  42. }
  43. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  44. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  45. ip, err := netaddr.ParseIP(hostIp)
  46. if err != nil {
  47. fmt.Println("host ip error")
  48. hostIp = "127.0.0.1"
  49. }
  50. addr := netaddr.IPPortFrom(ip, port)
  51. trace := tracing.NewTrace(string(c.id), addr)
  52. if trace == nil {
  53. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  54. }
  55. c.traceMap[traceId] = trace
  56. trace.TraceStart(method, path, r.Status, r.Duration)
  57. return nil
  58. }
  59. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  60. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  61. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  62. if t.AllEventReady(traceID) {
  63. t.SendEvent()
  64. klog.Infof("SendEvent %d", traceID)
  65. //fmt.Println(t.GetSpan())
  66. //fmt.Println("===============")
  67. delete(c.traceMap, traceID)
  68. }
  69. }
  70. func (c *Container) valuableTrace(traceID uint64) bool {
  71. return traceID != 0
  72. }
  73. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  74. c.lock.Lock()
  75. defer c.lock.Unlock()
  76. if r.Protocol == l7.ProtocolDNS {
  77. return c.onDNSRequest(r)
  78. }
  79. //if !c.valuableTrace(r.TraceId) {
  80. // return nil
  81. //}
  82. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  83. if r.TraceStart == TRACE_STATUS {
  84. klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  85. trace, err := c.getOrInitTrace(r.TraceId)
  86. if err == nil {
  87. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  88. ip, _ := netaddr.ParseIP(hostIp)
  89. //codeType := c.GetCodeTypeFromCache(pid)
  90. trace.TraceStartEvent(method, path, r.Status, netaddr.IPPortFrom(ip, port), pid, c.GetAppInfo())
  91. c.SendEvent(trace, r.TraceId)
  92. }
  93. return nil
  94. }
  95. if r.TraceEnd == TRACE_STATUS {
  96. klog.Infof("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  97. trace, err := c.getOrInitTrace(r.TraceId)
  98. if err == nil {
  99. trace.TraceEndEvent(r)
  100. c.SendEvent(trace, r.TraceId)
  101. }
  102. return nil
  103. }
  104. }
  105. if r.Protocol == l7.ProtocolHTTP {
  106. if c.l7Attach && c.valuableTrace(r.TraceId) {
  107. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  108. apmTrace, err := c.getOrInitTrace(r.TraceId)
  109. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  110. if err == nil {
  111. apmTrace.HttpTraceRequestEvent(method, path, hostIp, port, r)
  112. c.SendEvent(apmTrace, r.TraceId)
  113. }
  114. }
  115. //return nil
  116. }
  117. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  118. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  119. if conn == nil {
  120. return nil
  121. }
  122. if timestamp != 0 && conn.Timestamp != timestamp {
  123. return nil
  124. }
  125. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  126. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  127. switch r.Protocol {
  128. case l7.ProtocolHTTP:
  129. stats.observe(r.Status.Http(), "", r.Duration)
  130. case l7.ProtocolHTTP2:
  131. if conn.http2Parser == nil {
  132. conn.http2Parser = l7.NewHttp2Parser()
  133. }
  134. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  135. for _, req := range requests {
  136. stats.observe(req.Status.Http(), "", req.Duration)
  137. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  138. }
  139. case l7.ProtocolPostgres:
  140. if r.Method != l7.MethodStatementClose {
  141. stats.observe(r.Status.String(), "", r.Duration)
  142. }
  143. //if conn.postgresParser == nil {
  144. // conn.postgresParser = l7.NewPostgresParser()
  145. //}
  146. //query := conn.postgresParser.Parse(r.Payload)
  147. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  148. case l7.ProtocolMysql:
  149. if r.Method != l7.MethodStatementClose {
  150. stats.observe(r.Status.String(), "", r.Duration)
  151. }
  152. if c.l7Attach && c.valuableTrace(r.TraceId) {
  153. if conn.mysqlParser == nil {
  154. conn.mysqlParser = l7.NewMysqlParser()
  155. }
  156. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  157. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  158. //apmTrace, ok := c.getTrace(r.TraceId)
  159. apmTrace, err := c.getOrInitTrace(r.TraceId)
  160. //fmt.Println("mysql r.TraceId:", r.TraceId)
  161. //fmt.Println("ok:", ok)
  162. //fmt.Println("traceMap:", len(c.traceMap))
  163. if err == nil {
  164. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  165. apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  166. c.SendEvent(apmTrace, r.TraceId)
  167. }
  168. }
  169. case l7.ProtocolMemcached:
  170. stats.observe(r.Status.String(), "", r.Duration)
  171. if c.l7Attach && c.valuableTrace(r.TraceId) {
  172. }
  173. //cmd, items := l7.ParseMemcached(r.Payload)
  174. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  175. case l7.ProtocolRedis:
  176. stats.observe(r.Status.String(), "", r.Duration)
  177. if c.l7Attach && c.valuableTrace(r.TraceId) {
  178. cmd, args := l7.ParseRedis(r.Payload)
  179. //fmt.Println("cmd", cmd)
  180. //fmt.Println("args", args)
  181. //apmTrace, ok := c.getTrace(r.TraceId)
  182. apmTrace, err := c.getOrInitTrace(r.TraceId)
  183. if err == nil {
  184. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  185. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  186. c.SendEvent(apmTrace, r.TraceId)
  187. }
  188. }
  189. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  190. case l7.ProtocolMongo:
  191. stats.observe(r.Status.String(), "", r.Duration)
  192. if c.l7Attach && c.valuableTrace(r.TraceId) {
  193. }
  194. //query := l7.ParseMongo(r.Payload)
  195. //trace.MongoQuery(query, r.Status.Error(), r.Duration)
  196. case l7.ProtocolKafka, l7.ProtocolCassandra:
  197. stats.observe(r.Status.String(), "", r.Duration)
  198. if c.l7Attach && c.valuableTrace(r.TraceId) {
  199. }
  200. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  201. stats.observe(r.Status.String(), r.Method.String(), 0)
  202. if c.l7Attach && c.valuableTrace(r.TraceId) {
  203. }
  204. case l7.ProtocolDubbo2:
  205. stats.observe(r.Status.String(), "", r.Duration)
  206. if c.l7Attach && c.valuableTrace(r.TraceId) {
  207. }
  208. }
  209. return nil
  210. }
  211. func (c *Container) buildIDs(pid uint32) bool {
  212. c.lock.Lock()
  213. defer c.lock.Unlock()
  214. p := c.processes[pid]
  215. if p != nil {
  216. p.cmdline = string(proc.GetRealCmdline(pid))
  217. }
  218. for address, val := range c.getListens() {
  219. if val == 1 {
  220. ip := address.IP()
  221. if ip.Is4() && !ip.IsLoopback() {
  222. // 获取端口号
  223. port := address.Port()
  224. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  225. c.AppInfo.Sn = ip.String()
  226. c.AppInfo.Sport = int(port)
  227. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  228. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  229. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  230. //c.AppInfo.InstanceId = c.instanceID.IntVal
  231. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  232. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  233. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  234. return true
  235. }
  236. }
  237. }
  238. return false
  239. }
  240. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  241. c.lock.Lock()
  242. defer c.lock.Unlock()
  243. // get the associated uprobe
  244. uprobe, err := c.GetUprobe(event, tracer)
  245. if err != nil {
  246. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  247. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  248. return
  249. }
  250. if event.TraceId <= 0 {
  251. fmt.Println("StackProcess TraceId id 0")
  252. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  253. return
  254. }
  255. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  256. stackFun := ebpftracer.StackFunEvent{}
  257. stackFun.Uprobe = &uprobe
  258. stackFun.StackEvent = event
  259. apmTrace, ok := c.getTrace(event.TraceId)
  260. if ok {
  261. apmTrace.FunAdd(stackFun)
  262. }
  263. }
  264. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  265. c.lock.Lock()
  266. defer c.lock.Unlock()
  267. // get the associated uprobe
  268. switch event.Location {
  269. case 0: // ret
  270. uprobe, err := c.GetUprobe(event, tracer)
  271. if err != nil {
  272. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  273. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  274. return
  275. }
  276. if event.TraceId <= 0 {
  277. fmt.Println("StackProcess TraceId id 0")
  278. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  279. return
  280. }
  281. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  282. apmTrace, err := c.getOrInitTrace(event.TraceId)
  283. if err == nil {
  284. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  285. duration := event.TimeNsEnd - event.TimeNsStart
  286. apmTrace.FuncTraceQuery(uprobe.Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  287. c.SendEvent(apmTrace, event.TraceId)
  288. }
  289. }
  290. }
  291. // ResolveAddress returns the symbol(s) and offset of the given address.
  292. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  293. if addr == 0 {
  294. // err = errors.Wrapf(SymbolNotFoundError, "0")
  295. return
  296. }
  297. // symbols, _, err := e.Symbols()
  298. if err != nil {
  299. return
  300. }
  301. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  302. if idx == 0 {
  303. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  304. return
  305. }
  306. // why diff symbol may contains the same addr?
  307. sym := symbols[idx-1]
  308. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  309. syms = append(syms, symbols[i])
  310. }
  311. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  312. syms = append(syms, symbols[i])
  313. }
  314. return syms, uint(addr - sym.Value), nil
  315. }
  316. type MemoryMap struct {
  317. Start, End uint64
  318. }
  319. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  320. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  321. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  322. if err != nil {
  323. return nil, err
  324. }
  325. defer file.Close()
  326. scanner := bufio.NewScanner(file)
  327. if scanner.Scan() {
  328. fields := strings.Fields(scanner.Text())
  329. addresses := strings.Split(fields[0], "-")
  330. if len(addresses) != 2 {
  331. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  332. }
  333. start, err := strconv.ParseUint(addresses[0], 16, 64)
  334. if err != nil {
  335. return nil, err
  336. }
  337. end, err := strconv.ParseUint(addresses[1], 16, 64)
  338. if err != nil {
  339. return nil, err
  340. }
  341. return &MemoryMap{
  342. Start: start,
  343. End: end,
  344. }, nil
  345. }
  346. if err := scanner.Err(); err != nil {
  347. return nil, err
  348. }
  349. return nil, errors.New("empty /proc/<pid>/maps")
  350. }
  351. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  352. //fmt.Println("GetUprobe entory:")
  353. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  354. Address := event.Ip - memoryMap.Start
  355. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  356. for _, fun := range c.UprobesMap {
  357. funAddress := fun.Address + fun.AbsOffset
  358. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  359. if funAddress == Address {
  360. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  361. return fun, nil
  362. }
  363. }
  364. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  365. if err != nil {
  366. return
  367. }
  368. for _, sym := range syms {
  369. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  370. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  371. if ok {
  372. return uprobe, nil
  373. }
  374. }
  375. err = errors.New("uprobe not found")
  376. return
  377. }
  378. func (c *Container) GetAppInfo() AppInfo {
  379. return c.AppInfo
  380. }
  381. // 可注入前置
  382. func (c *Container) checkEventReady() bool {
  383. c.lock.Lock()
  384. defer c.lock.Unlock()
  385. return c.l7EventReady
  386. }
  387. func (c *Container) eventReady() {
  388. c.lock.Lock()
  389. defer c.lock.Unlock()
  390. c.l7EventReady = true
  391. }
  392. // uprobe前置
  393. func (c *Container) checkL7AttachReady() bool {
  394. c.lock.Lock()
  395. defer c.lock.Unlock()
  396. return c.l7Attach
  397. }
  398. func (c *Container) l7AttachSuccess() {
  399. c.lock.Lock()
  400. defer c.lock.Unlock()
  401. c.l7Attach = true
  402. }
  403. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) bool {
  404. p := c.processes[pid]
  405. if p != nil && c.checkEventReady() {
  406. codeType := c.GetCodeTypeFromCache(pid)
  407. if codeType.IsUnknownCode() {
  408. klog.WithField("pid", pid).Infof("[verify] unknown language.")
  409. return false
  410. }
  411. cmdline := p.GetCmdline()
  412. if len(cmdline) == 0 {
  413. return false
  414. }
  415. whiteListByCode := r.getWhiteListByCodeType(codeType)
  416. klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  417. Infof("[verify] white list %v", whiteListByCode)
  418. // 当前语言的白名单规则
  419. for _, setting := range whiteListByCode {
  420. ruleVal := setting.Filters
  421. if ruleVal == "" {
  422. continue
  423. }
  424. // 判断规则
  425. if strings.Contains(cmdline, ruleVal) {
  426. c.WhiteSettingInfo = setting
  427. klog.WithField("pid", pid).
  428. WithField("ruleVal", ruleVal).
  429. WithField("cmdline", cmdline).
  430. Infoln("[verify] check successful.")
  431. return true
  432. }
  433. }
  434. }
  435. return false
  436. }
  437. func (c *Container) detachUprobes(pid uint32) {
  438. c.lock.Lock()
  439. defer c.lock.Unlock()
  440. // close uprobe
  441. if p := c.processes[pid]; p != nil {
  442. if len(p.uprobes) > 0 {
  443. klog.Infof("detachUprobes", pid)
  444. p.DynamicClose()
  445. c.l7Attach = false
  446. c.AppInfo = AppInfo{}
  447. }
  448. }
  449. }