container_apm.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "os"
  8. "path"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/coroot/coroot-node-agent/flags"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/proc"
  19. "github.com/coroot/coroot-node-agent/tracing"
  20. "github.com/coroot/coroot-node-agent/utils"
  21. . "github.com/coroot/coroot-node-agent/utils/modelse"
  22. "github.com/pkg/errors"
  23. klog "github.com/sirupsen/logrus"
  24. "inet.af/netaddr"
  25. )
  // TRACE_STATUS is the flag value onL7RequestApm compares against
// RequestData.TraceStart and RequestData.TraceEnd to recognise trace start and
// trace end marker events.
const TRACE_STATUS = 1
  29. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  30. trace, ok := c.traceMap[traceId]
  31. return trace, ok
  32. }
  33. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  34. c.traceMap[traceId] = trace
  35. }
  36. // 查询或创建trace信息
  37. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  38. trace, ok := c.getTrace(traceId)
  39. if !ok {
  40. //new trace
  41. trace = tracing.NewTraceFromEvent(string(c.id))
  42. //create TraceMap
  43. c.createTraceMap(traceId, trace)
  44. //create ParentSpan
  45. trace.CreateRootSpan(traceId)
  46. }
  47. return trace, nil
  48. }
  49. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  50. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  51. ip, err := netaddr.ParseIP(hostIp)
  52. if err != nil {
  53. fmt.Println("host ip error")
  54. hostIp = "127.0.0.1"
  55. }
  56. addr := netaddr.IPPortFrom(ip, port)
  57. trace := tracing.NewTrace(string(c.id), addr)
  58. if trace == nil {
  59. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  60. }
  61. c.traceMap[traceId] = trace
  62. trace.TraceStart(method, path, r.Status, r.Duration)
  63. return nil
  64. }
  65. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  66. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  67. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  68. if t.AllEventReady(traceID) {
  69. t.SendEvent()
  70. klog.Infof("SendEvent %d", traceID)
  71. //fmt.Println(t.GetSpan())
  72. //fmt.Println("===============")
  73. delete(c.traceMap, traceID)
  74. }
  75. }
  76. func (c *Container) valuableTrace(traceID uint64) bool {
  77. return traceID != 0
  78. }
  79. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  80. c.lock.Lock()
  81. defer c.lock.Unlock()
  82. if r.Protocol == l7.ProtocolDNS {
  83. ip2fqdn, _type, fqdn := c.onDNSRequest(r)
  84. if c.l7Attach && c.valuableTrace(r.TraceId) {
  85. apmTrace, err := c.getOrInitTrace(r.TraceId)
  86. if err == nil {
  87. apmTrace.DNSTraceQueryEvent(r, _type, fqdn)
  88. c.SendEvent(apmTrace, r.TraceId)
  89. }
  90. }
  91. return ip2fqdn
  92. }
  93. //if !c.valuableTrace(r.TraceId) {
  94. // return nil
  95. //}
  96. // klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  97. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  98. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  99. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  100. if r.TraceStart == TRACE_STATUS {
  101. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  102. trace, err := c.getOrInitTrace(r.TraceId)
  103. klog.Infof("payload:[%s]", r.Payload)
  104. if err == nil {
  105. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
  106. ip, _ := netaddr.ParseIP(sn)
  107. //codeType := c.GetCodeTypeFromCache(pid)
  108. trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo())
  109. c.SendEvent(trace, r.TraceId)
  110. }
  111. return nil
  112. }
  113. if r.TraceEnd == TRACE_STATUS {
  114. klog.Infof("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  115. trace, err := c.getOrInitTrace(r.TraceId)
  116. if err == nil {
  117. trace.TraceEndEvent(r)
  118. c.SendEvent(trace, r.TraceId)
  119. }
  120. return nil
  121. }
  122. }
  123. if r.Protocol == l7.ProtocolHTTP {
  124. if c.l7Attach && c.valuableTrace(r.TraceId) {
  125. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
  126. apmTrace, err := c.getOrInitTrace(r.TraceId)
  127. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  128. if err == nil {
  129. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  130. c.SendEvent(apmTrace, r.TraceId)
  131. }
  132. }
  133. //return nil
  134. }
  135. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  136. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  137. if conn == nil {
  138. return nil
  139. }
  140. if timestamp != 0 && conn.Timestamp != timestamp {
  141. return nil
  142. }
  143. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  144. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  145. switch r.Protocol {
  146. case l7.ProtocolHTTP:
  147. stats.observe(r.Status.Http(), "", r.Duration)
  148. case l7.ProtocolHTTP2:
  149. if conn.http2Parser == nil {
  150. conn.http2Parser = l7.NewHttp2Parser()
  151. }
  152. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  153. for _, req := range requests {
  154. stats.observe(req.Status.Http(), "", req.Duration)
  155. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  156. }
  157. case l7.ProtocolPostgres:
  158. if r.Method != l7.MethodStatementClose {
  159. stats.observe(r.Status.String(), "", r.Duration)
  160. }
  161. //if conn.postgresParser == nil {
  162. // conn.postgresParser = l7.NewPostgresParser()
  163. //}
  164. //query := conn.postgresParser.Parse(r.Payload)
  165. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  166. if c.l7Attach && c.valuableTrace(r.TraceId) {
  167. if conn.postgresParser == nil {
  168. conn.postgresParser = l7.NewPostgresParser()
  169. }
  170. query := conn.postgresParser.Parse(r.Payload)
  171. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  172. //apmTrace, ok := c.getTrace(r.TraceId)
  173. apmTrace, err := c.getOrInitTrace(r.TraceId)
  174. fmt.Println(err)
  175. //fmt.Println("mysql r.TraceId:", r.TraceId)
  176. //fmt.Println("ok:", ok)
  177. //fmt.Println("traceMap:", len(c.traceMap))
  178. if err == nil {
  179. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  180. apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  181. c.SendEvent(apmTrace, r.TraceId)
  182. }
  183. }
  184. case l7.ProtocolMysql:
  185. if r.Method != l7.MethodStatementClose {
  186. stats.observe(r.Status.String(), "", r.Duration)
  187. }
  188. if c.l7Attach && c.valuableTrace(r.TraceId) {
  189. if conn.mysqlParser == nil {
  190. conn.mysqlParser = l7.NewMysqlParser()
  191. }
  192. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  193. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  194. //apmTrace, ok := c.getTrace(r.TraceId)
  195. apmTrace, err := c.getOrInitTrace(r.TraceId)
  196. fmt.Println(err)
  197. //fmt.Println("mysql r.TraceId:", r.TraceId)
  198. //fmt.Println("ok:", ok)
  199. //fmt.Println("traceMap:", len(c.traceMap))
  200. if err == nil {
  201. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  202. apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  203. c.SendEvent(apmTrace, r.TraceId)
  204. }
  205. }
  206. case l7.ProtocolDM:
  207. //统计dm的query次数
  208. stats.observe(r.Status.String(), "", r.Duration)
  209. //是否发送数据
  210. if c.l7Attach && c.valuableTrace(r.TraceId) {
  211. if conn.dmParser == nil {
  212. conn.dmParser = l7.NewDmParser()
  213. }
  214. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  215. apmTrace, err := c.getOrInitTrace(r.TraceId)
  216. if err == nil {
  217. apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  218. c.SendEvent(apmTrace, r.TraceId)
  219. }
  220. }
  221. case l7.ProtocolMemcached:
  222. stats.observe(r.Status.String(), "", r.Duration)
  223. if c.l7Attach && c.valuableTrace(r.TraceId) {
  224. }
  225. //cmd, items := l7.ParseMemcached(r.Payload)
  226. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  227. case l7.ProtocolRedis:
  228. stats.observe(r.Status.String(), "", r.Duration)
  229. if c.l7Attach && c.valuableTrace(r.TraceId) {
  230. cmd, args := l7.ParseRedis(r.Payload)
  231. //fmt.Println("cmd", cmd)
  232. //fmt.Println("args", args)
  233. //apmTrace, ok := c.getTrace(r.TraceId)
  234. apmTrace, err := c.getOrInitTrace(r.TraceId)
  235. if err == nil {
  236. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  237. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  238. c.SendEvent(apmTrace, r.TraceId)
  239. }
  240. }
  241. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  242. case l7.ProtocolMongo:
  243. stats.observe(r.Status.String(), "", r.Duration)
  244. if c.l7Attach && c.valuableTrace(r.TraceId) {
  245. }
  246. //query := l7.ParseMongo(r.Payload)
  247. //trace.MongoQuery(query, r.Status.Error(), r.Duration)
  248. case l7.ProtocolKafka, l7.ProtocolCassandra:
  249. stats.observe(r.Status.String(), "", r.Duration)
  250. if c.l7Attach && c.valuableTrace(r.TraceId) {
  251. }
  252. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  253. stats.observe(r.Status.String(), r.Method.String(), 0)
  254. if c.l7Attach && c.valuableTrace(r.TraceId) {
  255. }
  256. case l7.ProtocolDubbo2:
  257. stats.observe(r.Status.String(), "", r.Duration)
  258. if c.l7Attach && c.valuableTrace(r.TraceId) {
  259. }
  260. }
  261. return nil
  262. }
  263. func (c *Container) buildIDs(pid uint32) bool {
  264. c.lock.Lock()
  265. defer c.lock.Unlock()
  266. p := c.processes[pid]
  267. if p != nil {
  268. p.cmdline = string(proc.GetRealCmdline(pid))
  269. }
  270. var sns []string
  271. var sport uint16
  272. for address, val := range c.getListens() {
  273. if val == 1 {
  274. ip := address.IP()
  275. if ip.Is4() && !ip.IsLoopback() {
  276. // 获取端口号
  277. sport = address.Port()
  278. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  279. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  280. //c.AppInfo.Sn = ip.String()
  281. //c.AppInfo.Sport = int(port)
  282. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  283. //fmt.Println(port)
  284. ////os.Exit(1)
  285. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  286. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  287. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  288. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  289. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  290. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  291. //return true
  292. }
  293. }
  294. }
  295. if len(sns) > 0 {
  296. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  297. snsStr := strings.Join(sns, ",")
  298. c.AppInfo.Sn = snsStr
  299. c.AppInfo.Sport = int(sport)
  300. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  301. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  302. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  303. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  304. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  305. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  306. return true
  307. }
  308. return false
  309. }
  310. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  311. c.lock.Lock()
  312. defer c.lock.Unlock()
  313. // get the associated uprobe
  314. uprobe, err := c.GetUprobe(event, tracer)
  315. if err != nil {
  316. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  317. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  318. return
  319. }
  320. if event.TraceId <= 0 {
  321. fmt.Println("StackProcess TraceId id 0")
  322. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  323. return
  324. }
  325. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  326. stackFun := ebpftracer.StackFunEvent{}
  327. stackFun.Uprobe = &uprobe
  328. stackFun.StackEvent = event
  329. apmTrace, ok := c.getTrace(event.TraceId)
  330. if ok {
  331. apmTrace.FunAdd(stackFun)
  332. }
  333. }
  // byteExtractString converts a fixed-size, NUL-padded name buffer to a string.
// The result is truncated before the first byte that is not printable ASCII
// (below 32 or above 126, which includes the terminating zero). If every byte
// is printable, all 100 bytes are returned.
func byteExtractString(nameString [100]byte) string {
	raw := nameString[:]
	notPrintable := func(r rune) bool { return r < 32 || r > 126 }
	if end := bytes.IndexFunc(raw, notPrintable); end >= 0 {
		raw = raw[:end]
	}
	return string(raw)
}
  343. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  344. c.lock.Lock()
  345. defer c.lock.Unlock()
  346. // get the associated uprobe
  347. switch event.Location {
  348. case 0: // ret
  349. Funcname := ""
  350. if event.Type != uint64(CodeTypeJava) {
  351. uprobe, err := c.GetUprobe(event, tracer)
  352. if err != nil {
  353. fmt.Println("GetUprobeGetUprobe errer: %v", err)
  354. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  355. return
  356. }
  357. Funcname = uprobe.Funcname
  358. } else {
  359. ClassName := byteExtractString(event.ClassName)
  360. MethedName := byteExtractString(event.MethedName)
  361. Funcname = ClassName + "." + MethedName
  362. }
  363. if event.TraceId <= 0 {
  364. fmt.Println("StackProcess TraceId id 0")
  365. // log.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  366. return
  367. }
  368. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  369. apmTrace, err := c.getOrInitTrace(event.TraceId)
  370. if err == nil {
  371. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  372. duration := event.TimeNsEnd - event.TimeNsStart
  373. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  374. c.SendEvent(apmTrace, event.TraceId)
  375. }
  376. }
  377. }
  378. // ResolveAddress returns the symbol(s) and offset of the given address.
  379. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  380. if addr == 0 {
  381. // err = errors.Wrapf(SymbolNotFoundError, "0")
  382. return
  383. }
  384. // symbols, _, err := e.Symbols()
  385. if err != nil {
  386. return
  387. }
  388. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  389. if idx == 0 {
  390. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  391. return
  392. }
  393. // why diff symbol may contains the same addr?
  394. sym := symbols[idx-1]
  395. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  396. syms = append(syms, symbols[i])
  397. }
  398. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  399. syms = append(syms, symbols[i])
  400. }
  401. return syms, uint(addr - sym.Value), nil
  402. }
  // MemoryMap holds the two hexadecimal addresses from the first field
// ("start-end") of a /proc/<pid>/maps line.
type MemoryMap struct {
	Start, End uint64
}

// ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps and returns
// the address range from its first field as a MemoryMap.
//
// pid is inserted verbatim into the path, so any /proc entry name (for example
// "self") is accepted. It returns an error in these cases:
//   - the file cannot be opened or read;
//   - the file is empty;
//   - the first line is blank or its first field is not "hex-hex".
func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
	file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
	if err != nil {
		return nil, err
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	if !scanner.Scan() {
		if err := scanner.Err(); err != nil {
			return nil, err
		}
		return nil, errors.New("empty /proc/<pid>/maps")
	}
	fields := strings.Fields(scanner.Text())
	// Guard against a blank first line, which would otherwise panic on fields[0].
	if len(fields) == 0 {
		return nil, errors.New("unexpected format in /proc/<pid>/maps")
	}
	addresses := strings.Split(fields[0], "-")
	if len(addresses) != 2 {
		return nil, errors.New("unexpected format in /proc/<pid>/maps")
	}
	start, err := strconv.ParseUint(addresses[0], 16, 64)
	if err != nil {
		return nil, err
	}
	end, err := strconv.ParseUint(addresses[1], 16, 64)
	if err != nil {
		return nil, err
	}
	return &MemoryMap{Start: start, End: end}, nil
}
  438. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  439. //fmt.Println("GetUprobe entory:")
  440. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  441. Address := event.Ip - memoryMap.Start
  442. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  443. for _, fun := range c.UprobesMap {
  444. funAddress := fun.Address + fun.AbsOffset
  445. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  446. if funAddress == Address {
  447. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  448. return fun, nil
  449. }
  450. }
  451. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  452. if err != nil {
  453. return
  454. }
  455. for _, sym := range syms {
  456. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  457. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  458. if ok {
  459. return uprobe, nil
  460. }
  461. }
  462. err = errors.New("uprobe not found")
  463. return
  464. }
  465. func (c *Container) GetAppInfo() AppInfo {
  466. return c.AppInfo
  467. }
  468. // 可注入前置
  469. func (c *Container) checkEventReady() bool {
  470. c.lock.Lock()
  471. defer c.lock.Unlock()
  472. return c.l7EventReady
  473. }
  474. func (c *Container) eventReady() {
  475. c.lock.Lock()
  476. defer c.lock.Unlock()
  477. c.l7EventReady = true
  478. }
  479. // uprobe前置
  480. func (c *Container) Isl7AttachSuccess() bool {
  481. c.lock.Lock()
  482. defer c.lock.Unlock()
  483. return c.l7Attach
  484. }
  485. func (c *Container) l7AttachSuccess() {
  486. c.lock.Lock()
  487. defer c.lock.Unlock()
  488. c.l7Attach = true
  489. }
  490. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  491. p := c.processes[pid]
  492. if p != nil && c.checkEventReady() {
  493. codeType := c.GetCodeTypeFromCache(pid)
  494. if codeType.IsUnknownCode() {
  495. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  496. return false, 0
  497. }
  498. cmdline := p.GetCmdline()
  499. if len(cmdline) == 0 {
  500. return false, 0
  501. }
  502. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  503. whiteListByCode := r.getWhiteListAll()
  504. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  505. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  506. // 当前语言的白名单规则
  507. for _, setting := range whiteListByCode {
  508. ruleVal := setting.Filters
  509. if ruleVal == "" {
  510. continue
  511. }
  512. // 判断规则
  513. if strings.Contains(cmdline, ruleVal) {
  514. c.WhiteSettingInfo = setting
  515. klog.WithField("pid", pid).
  516. WithField("codeType", codeType.String()).
  517. WithField("ruleVal", ruleVal).
  518. WithField("cmdline", cmdline).
  519. WithField("stack", setting.OpenStack).
  520. WithField("white list", utils.ToString(whiteListByCode)).
  521. Infoln("[verify] check successful.")
  522. return true, setting.OpenStack
  523. }
  524. }
  525. }
  526. return false, 0
  527. }
  528. // 1.卸载入口
  529. func (c *Container) Detach(pid uint32, detachType APP_TYPE) {
  530. c.lock.Lock()
  531. defer c.lock.Unlock()
  532. if p := c.processes[pid]; p != nil {
  533. err := c.DetachUprobes(pid, detachType)
  534. if err != nil {
  535. klog.WithError(err).Errorln("DetachUprobes Error.")
  536. }
  537. err = c.DetachStack(pid, detachType)
  538. if err != nil {
  539. klog.WithError(err).Errorln("DetachStack Error.")
  540. }
  541. // 关闭7层监控
  542. c.l7Attach = false
  543. // 变更应用状态
  544. if err != nil {
  545. detachType = detachType.Error()
  546. }
  547. c.AppInfo.SetAppStatus(detachType)
  548. }
  549. }
  550. // 1.1卸载uprobe
  551. func (c *Container) DetachUprobes(pid uint32, detachType APP_TYPE) error {
  552. // close uprobe
  553. if p := c.processes[pid]; p != nil {
  554. for _, u := range p.uprobes {
  555. err := u.Close()
  556. if err != nil {
  557. return err
  558. }
  559. }
  560. p.uprobes = []link.Link{}
  561. switch detachType {
  562. case APP_UNINSTALL:
  563. codeType := c.GetCodeTypeFromCache(pid)
  564. switch codeType {
  565. case CodeTypeJava:
  566. p.jvmAttachOnce = false
  567. case CodeTypeGo:
  568. p.goTlsUprobesChecked = false
  569. p.openSslUprobesChecked = false
  570. default:
  571. }
  572. case APP_UPROBE_ERROR:
  573. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  574. default:
  575. }
  576. } else {
  577. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  578. }
  579. return nil
  580. }
  581. // 1.2卸载堆栈
  582. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  583. if p := c.processes[pid]; p != nil {
  584. var err error
  585. codeType := c.GetCodeTypeFromCache(pid)
  586. switch codeType {
  587. // 1.2.1 卸载 jvm堆栈
  588. case CodeTypeJava:
  589. err = c.detachJvmStack(pid)
  590. default:
  591. err = p.closeStackUprobes()
  592. }
  593. if err != nil {
  594. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  595. return err
  596. }
  597. p.stackAttachOnce = false
  598. } else {
  599. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  600. }
  601. return nil
  602. }
  603. // 1.2.1 卸载 jvm堆栈
  604. func (c *Container) detachJvmStack(pid uint32) error {
  605. if p := c.processes[pid]; p != nil {
  606. var err error
  607. // 卸载堆栈probes
  608. err = p.closeStackUprobes()
  609. klog.WithError(err).Infof("[detachJvmStack] closeStackUprobes")
  610. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  611. //}
  612. // 卸载 JavaAgent
  613. if p.stackStatus.IsJattachSuccess() {
  614. err = p.uninstallJavaAgent()
  615. klog.WithError(err).Infof("[detachJvmStack] uninstallJavaAgent")
  616. }
  617. return err
  618. }
  619. return nil
  620. }
  621. func (c *Container) getRootfs() string {
  622. if c.metadata != nil && c.metadata.rootfs != "" {
  623. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  624. }
  625. return ""
  626. }
  627. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  628. if c.AppInfo.AppName != "" {
  629. detail := AppStatusInfo{
  630. Pid: pid,
  631. ProcName: c.containerName,
  632. AppName: c.AppInfo.AppName,
  633. Language: c.AppInfo.CodeType.String(),
  634. AppID: c.AppInfo.AppIdHash.IntVal,
  635. AgentID: c.AppInfo.AgentId,
  636. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  637. Sn: c.AppInfo.Sn,
  638. Sport: c.AppInfo.Sport,
  639. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  640. PreStatus: c.AppInfo.PreStatus,
  641. Status: c.AppInfo.Status,
  642. Rule: c.WhiteSettingInfo.Filters,
  643. }
  644. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.OpenStack)
  645. if c.AppInfo.UpdateAt != 0 {
  646. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  647. }
  648. p := c.processes[pid]
  649. if p != nil {
  650. detail.StackStatus = p.stackStatus.String()
  651. }
  652. runtimeApps[pid] = detail
  653. }
  654. }
  655. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  656. var err error
  657. verifyAttachConditions, openStack := c.verifyAttachConditions(r, pid)
  658. // UNINSTALL
  659. if r.isFusing && c.Isl7AttachSuccess() {
  660. c.Detach(pid, APP_FUSE)
  661. return
  662. }
  663. // verify UNINSTALL
  664. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  665. c.Detach(pid, APP_UNINSTALL)
  666. return
  667. }
  668. if verifyAttachConditions {
  669. err = c.RegisterAppInfo(r, pid)
  670. if err != nil {
  671. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  672. return
  673. }
  674. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  675. err = c.AttachUprobes(r.tracer, pid)
  676. if err != nil {
  677. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  678. return
  679. } else {
  680. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  681. }
  682. if openStack == OPEN_STACK {
  683. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach app stack.")
  684. err = c.AttachStack(r.tracer, pid)
  685. if err != nil {
  686. c.AppInfo.SetAppStackError()
  687. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl][end] Failed attach stack trace!")
  688. return
  689. }
  690. } else {
  691. // 关闭堆栈
  692. err = c.DetachStack(pid, APP_UNINSTALL)
  693. if err != nil {
  694. klog.WithError(err).Errorf("[AgentCtrl][end] Failed detach stack trace!")
  695. }
  696. }
  697. }
  698. }