container_apm.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "os"
  8. "path"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/coroot/coroot-node-agent/flags"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/proc"
  19. "github.com/coroot/coroot-node-agent/tracing"
  20. "github.com/coroot/coroot-node-agent/utils"
  21. . "github.com/coroot/coroot-node-agent/utils/modelse"
  22. "github.com/pkg/errors"
  23. klog "github.com/sirupsen/logrus"
  24. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  25. "inet.af/netaddr"
  26. )
  27. const (
  28. TRACE_STATUS = 1
  29. )
  30. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  31. trace, ok := c.traceMap[traceId]
  32. return trace, ok
  33. }
  34. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  35. c.traceMap[traceId] = trace
  36. }
  37. // 查询或创建trace信息
  38. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  39. trace, ok := c.getTrace(traceId)
  40. if !ok {
  41. //new trace
  42. trace = tracing.NewTraceFromEvent(string(c.id))
  43. //create TraceMap
  44. c.createTraceMap(traceId, trace)
  45. //create ParentSpan
  46. trace.CreateRootSpan(traceId)
  47. }
  48. return trace, nil
  49. }
  50. func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  51. method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  52. ip, err := netaddr.ParseIP(hostIp)
  53. if err != nil {
  54. //fmt.Println("host ip error")
  55. hostIp = "127.0.0.1"
  56. }
  57. addr := netaddr.IPPortFrom(ip, port)
  58. trace := tracing.NewTrace(string(c.id), addr)
  59. if trace == nil {
  60. return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  61. }
  62. c.traceMap[traceId] = trace
  63. trace.TraceStart(method, path, r.Status, r.Duration)
  64. return nil
  65. }
  66. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  67. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  68. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  69. if t.AllEventReady(traceID) {
  70. t.SendEvent()
  71. klog.Debugf("SendEvent %d", traceID)
  72. //fmt.Println(t.GetSpan())
  73. //fmt.Println("===============")
  74. delete(c.traceMap, traceID)
  75. }
  76. }
  77. func (c *Container) valuableTrace(traceID uint64) bool {
  78. return traceID != 0
  79. }
  80. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  81. c.lock.Lock()
  82. defer c.lock.Unlock()
  83. if r.Protocol == l7.ProtocolDNS {
  84. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  85. if c.l7Attach && c.valuableTrace(r.TraceId) {
  86. apmTrace, err := c.getOrInitTrace(r.TraceId)
  87. if err == nil {
  88. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  89. c.SendEvent(apmTrace, r.TraceId)
  90. }
  91. }
  92. return ip2fqdn
  93. }
  94. //if !c.valuableTrace(r.TraceId) {
  95. // return nil
  96. //}
  97. // klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  98. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  99. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  100. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  101. if r.TraceStart == TRACE_STATUS {
  102. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  103. trace, err := c.getOrInitTrace(r.TraceId)
  104. if c.AppInfo.AppName != "" {
  105. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  106. }
  107. if err == nil {
  108. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
  109. ip, _ := netaddr.ParseIP(sn)
  110. //codeType := c.GetCodeTypeFromCache(pid)
  111. trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo())
  112. c.SendEvent(trace, r.TraceId)
  113. }
  114. return nil
  115. }
  116. if r.TraceEnd == TRACE_STATUS {
  117. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  118. trace, err := c.getOrInitTrace(r.TraceId)
  119. if err == nil {
  120. trace.TraceEndEvent(r)
  121. c.SendEvent(trace, r.TraceId)
  122. }
  123. return nil
  124. }
  125. }
  126. if r.Protocol == l7.ProtocolHTTP {
  127. if c.l7Attach && c.valuableTrace(r.TraceId) {
  128. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload)
  129. apmTrace, err := c.getOrInitTrace(r.TraceId)
  130. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  131. if err == nil {
  132. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  133. c.SendEvent(apmTrace, r.TraceId)
  134. }
  135. }
  136. //return nil
  137. }
  138. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  139. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  140. if conn == nil {
  141. return nil
  142. }
  143. if timestamp != 0 && conn.Timestamp != timestamp {
  144. return nil
  145. }
  146. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  147. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  148. switch r.Protocol {
  149. case l7.ProtocolHTTP:
  150. if c.AppInfo.AppName != "" {
  151. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  152. }
  153. stats.observe(r.Status.Http(), "", r.Duration)
  154. case l7.ProtocolHTTP2:
  155. if conn.http2Parser == nil {
  156. conn.http2Parser = l7.NewHttp2Parser()
  157. }
  158. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  159. for _, req := range requests {
  160. stats.observe(req.Status.Http(), "", req.Duration)
  161. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  162. }
  163. case l7.ProtocolPostgres:
  164. if r.Method != l7.MethodStatementClose {
  165. stats.observe(r.Status.String(), "", r.Duration)
  166. }
  167. //if conn.postgresParser == nil {
  168. // conn.postgresParser = l7.NewPostgresParser()
  169. //}
  170. //query := conn.postgresParser.Parse(r.Payload)
  171. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  172. if c.l7Attach && c.valuableTrace(r.TraceId) {
  173. if conn.postgresParser == nil {
  174. conn.postgresParser = l7.NewPostgresParser()
  175. }
  176. query := conn.postgresParser.Parse(r.Payload)
  177. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  178. //apmTrace, ok := c.getTrace(r.TraceId)
  179. apmTrace, err := c.getOrInitTrace(r.TraceId)
  180. //fmt.Println("mysql r.TraceId:", r.TraceId)
  181. //fmt.Println("ok:", ok)
  182. //fmt.Println("traceMap:", len(c.traceMap))
  183. if err == nil {
  184. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  185. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  186. apmTrace.SQLTraceQueryEvent(l7.ProtocolPostgres, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  187. c.SendEvent(apmTrace, r.TraceId)
  188. }
  189. }
  190. case l7.ProtocolMysql:
  191. if r.Method != l7.MethodStatementClose {
  192. stats.observe(r.Status.String(), "", r.Duration)
  193. }
  194. if c.l7Attach && c.valuableTrace(r.TraceId) {
  195. if conn.mysqlParser == nil {
  196. conn.mysqlParser = l7.NewMysqlParser()
  197. }
  198. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  199. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  200. //apmTrace, ok := c.getTrace(r.TraceId)
  201. apmTrace, err := c.getOrInitTrace(r.TraceId)
  202. //fmt.Println("mysql r.TraceId:", r.TraceId)
  203. //fmt.Println("ok:", ok)
  204. //fmt.Println("traceMap:", len(c.traceMap))
  205. if err == nil {
  206. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  207. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  208. apmTrace.SQLTraceQueryEvent(l7.ProtocolMysql, semconv.DBSystemMySQL, query, r, conn.ActualDest)
  209. c.SendEvent(apmTrace, r.TraceId)
  210. }
  211. }
  212. case l7.ProtocolDM:
  213. //统计dm的query次数
  214. stats.observe(r.Status.String(), "", r.Duration)
  215. //是否发送数据
  216. if c.l7Attach && c.valuableTrace(r.TraceId) {
  217. if conn.dmParser == nil {
  218. conn.dmParser = l7.NewDmParser()
  219. }
  220. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  221. apmTrace, err := c.getOrInitTrace(r.TraceId)
  222. if err == nil {
  223. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  224. apmTrace.SQLTraceQueryEvent(l7.ProtocolDM, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  225. c.SendEvent(apmTrace, r.TraceId)
  226. }
  227. }
  228. case l7.ProtocolMemcached:
  229. stats.observe(r.Status.String(), "", r.Duration)
  230. if c.l7Attach && c.valuableTrace(r.TraceId) {
  231. }
  232. //cmd, items := l7.ParseMemcached(r.Payload)
  233. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  234. case l7.ProtocolRedis:
  235. stats.observe(r.Status.String(), "", r.Duration)
  236. if c.l7Attach && c.valuableTrace(r.TraceId) {
  237. cmd, args := l7.ParseRedis(r.Payload)
  238. //fmt.Println("cmd", cmd)
  239. //fmt.Println("args", args)
  240. //apmTrace, ok := c.getTrace(r.TraceId)
  241. apmTrace, err := c.getOrInitTrace(r.TraceId)
  242. if err == nil {
  243. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  244. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  245. c.SendEvent(apmTrace, r.TraceId)
  246. }
  247. }
  248. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  249. case l7.ProtocolMongo:
  250. stats.observe(r.Status.String(), "", r.Duration)
  251. if c.l7Attach && c.valuableTrace(r.TraceId) {
  252. }
  253. //query := l7.ParseMongo(r.Payload)
  254. //trace.MongoQuery(query, r.Status.Error(), r.Duration)
  255. case l7.ProtocolKafka, l7.ProtocolCassandra:
  256. stats.observe(r.Status.String(), "", r.Duration)
  257. if c.l7Attach && c.valuableTrace(r.TraceId) {
  258. }
  259. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  260. stats.observe(r.Status.String(), r.Method.String(), 0)
  261. if c.l7Attach && c.valuableTrace(r.TraceId) {
  262. }
  263. case l7.ProtocolDubbo2:
  264. stats.observe(r.Status.String(), "", r.Duration)
  265. if c.l7Attach && c.valuableTrace(r.TraceId) {
  266. }
  267. }
  268. return nil
  269. }
  270. func (c *Container) buildIDs(pid uint32) bool {
  271. c.lock.Lock()
  272. defer c.lock.Unlock()
  273. p := c.processes[pid]
  274. if p != nil {
  275. p.cmdline = string(proc.GetRealCmdline(pid))
  276. }
  277. var sns []string
  278. var sport uint16
  279. for address, val := range c.getListens() {
  280. if val == 1 {
  281. ip := address.IP()
  282. if ip.Is4() && !ip.IsLoopback() {
  283. // 获取端口号
  284. sport = address.Port()
  285. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  286. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  287. //c.AppInfo.Sn = ip.String()
  288. //c.AppInfo.Sport = int(port)
  289. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  290. //fmt.Println(port)
  291. ////os.Exit(1)
  292. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  293. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  294. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  295. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  296. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  297. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  298. //return true
  299. }
  300. }
  301. }
  302. if len(sns) > 0 {
  303. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  304. snsStr := strings.Join(sns, ",")
  305. c.AppInfo.Sn = snsStr
  306. c.AppInfo.Sport = int(sport)
  307. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  308. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  309. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  310. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  311. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  312. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  313. return true
  314. }
  315. return false
  316. }
  317. func (c *Container) ReBuildIds(pid uint32) {
  318. c.lock.Lock()
  319. defer c.lock.Unlock()
  320. var sns []string
  321. var sport uint16
  322. for address, val := range c.getListens() {
  323. if val == 1 {
  324. ip := address.IP()
  325. if ip.Is4() && !ip.IsLoopback() {
  326. // 获取端口号
  327. sport = address.Port()
  328. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  329. }
  330. }
  331. }
  332. if len(sns) > 0 {
  333. snsStr := strings.Join(sns, ",")
  334. c.AppInfo.Sn = snsStr
  335. c.AppInfo.Sport = int(sport)
  336. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  337. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  338. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  339. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  340. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  341. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  342. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  343. }
  344. }
  345. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  346. c.lock.Lock()
  347. defer c.lock.Unlock()
  348. // get the associated uprobe
  349. uprobe, err := c.GetUprobe(event, tracer)
  350. if err != nil {
  351. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  352. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  353. return
  354. }
  355. if event.TraceId <= 0 {
  356. //fmt.Println("StackProcess TraceId id 0")
  357. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  358. return
  359. }
  360. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  361. stackFun := ebpftracer.StackFunEvent{}
  362. stackFun.Uprobe = &uprobe
  363. stackFun.StackEvent = event
  364. apmTrace, ok := c.getTrace(event.TraceId)
  365. if ok {
  366. apmTrace.FunAdd(stackFun)
  367. }
  368. }
  369. func byteExtractString(nameString [100]byte) string {
  370. n := bytes.IndexFunc(nameString[:], func(r rune) bool {
  371. return r == 0 || r < 32 || r > 126 // 截取到第一个零值或非打印字符
  372. })
  373. if n == -1 {
  374. n = len(nameString) // 没找到零值或非打印字符,使用数组长度
  375. }
  376. return string(nameString[:n])
  377. }
  378. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  379. c.lock.Lock()
  380. defer c.lock.Unlock()
  381. // get the associated uprobe
  382. switch event.Location {
  383. case 0: // ret
  384. Funcname := ""
  385. if event.Type != uint64(CodeTypeJava) {
  386. uprobe, err := c.GetUprobe(event, tracer)
  387. if err != nil {
  388. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  389. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  390. return
  391. }
  392. Funcname = uprobe.Funcname
  393. } else {
  394. ClassName := byteExtractString(event.ClassName)
  395. MethedName := byteExtractString(event.MethedName)
  396. Funcname = ClassName + "." + MethedName
  397. }
  398. if event.TraceId <= 0 {
  399. //fmt.Println("StackProcess TraceId id 0")
  400. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  401. return
  402. }
  403. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  404. apmTrace, err := c.getOrInitTrace(event.TraceId)
  405. if err == nil {
  406. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  407. duration := event.TimeNsEnd - event.TimeNsStart
  408. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  409. c.SendEvent(apmTrace, event.TraceId)
  410. }
  411. }
  412. }
  413. // ResolveAddress returns the symbol(s) and offset of the given address.
  414. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  415. if addr == 0 {
  416. // err = errors.Wrapf(SymbolNotFoundError, "0")
  417. return
  418. }
  419. // symbols, _, err := e.Symbols()
  420. if err != nil {
  421. return
  422. }
  423. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  424. if idx == 0 {
  425. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  426. return
  427. }
  428. // why diff symbol may contains the same addr?
  429. sym := symbols[idx-1]
  430. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  431. syms = append(syms, symbols[i])
  432. }
  433. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  434. syms = append(syms, symbols[i])
  435. }
  436. return syms, uint(addr - sym.Value), nil
  437. }
  438. type MemoryMap struct {
  439. Start, End uint64
  440. }
  441. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  442. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  443. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  444. if err != nil {
  445. return nil, err
  446. }
  447. defer file.Close()
  448. scanner := bufio.NewScanner(file)
  449. if scanner.Scan() {
  450. fields := strings.Fields(scanner.Text())
  451. addresses := strings.Split(fields[0], "-")
  452. if len(addresses) != 2 {
  453. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  454. }
  455. start, err := strconv.ParseUint(addresses[0], 16, 64)
  456. if err != nil {
  457. return nil, err
  458. }
  459. end, err := strconv.ParseUint(addresses[1], 16, 64)
  460. if err != nil {
  461. return nil, err
  462. }
  463. return &MemoryMap{
  464. Start: start,
  465. End: end,
  466. }, nil
  467. }
  468. if err := scanner.Err(); err != nil {
  469. return nil, err
  470. }
  471. return nil, errors.New("empty /proc/<pid>/maps")
  472. }
  473. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  474. //fmt.Println("GetUprobe entory:")
  475. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  476. Address := event.Ip - memoryMap.Start
  477. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  478. for _, fun := range c.UprobesMap {
  479. funAddress := fun.Address + fun.AbsOffset
  480. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  481. if funAddress == Address {
  482. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  483. return fun, nil
  484. }
  485. }
  486. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  487. if err != nil {
  488. return
  489. }
  490. for _, sym := range syms {
  491. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  492. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  493. if ok {
  494. return uprobe, nil
  495. }
  496. }
  497. err = errors.New("uprobe not found")
  498. return
  499. }
  500. func (c *Container) GetAppInfo() AppInfo {
  501. return c.AppInfo
  502. }
  503. // 可注入前置
  504. func (c *Container) checkEventReady() bool {
  505. c.lock.Lock()
  506. defer c.lock.Unlock()
  507. return c.l7EventReady
  508. }
  509. func (c *Container) eventReady() {
  510. c.lock.Lock()
  511. defer c.lock.Unlock()
  512. c.l7EventReady = true
  513. }
  514. // uprobe前置
  515. func (c *Container) Isl7AttachSuccess() bool {
  516. c.lock.Lock()
  517. defer c.lock.Unlock()
  518. return c.l7Attach
  519. }
  520. func (c *Container) l7AttachSuccess() {
  521. c.lock.Lock()
  522. defer c.lock.Unlock()
  523. c.l7Attach = true
  524. }
  525. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  526. resp, err := c.GetCodeSetting(r)
  527. if err != nil {
  528. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  529. return
  530. }
  531. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  532. // 有黑白名单规则 &&
  533. // 之前有注入 先卸载再注入
  534. // 之前没注入 直接注入
  535. // 没有有黑白名单 直接卸载
  536. if c.hasStackRule(resp) {
  537. if c.stackRuleUpdate(resp) {
  538. // 重新注入
  539. err = c.DetachStack(pid, APP_UNINSTALL)
  540. if err != nil {
  541. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  542. }
  543. }
  544. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  545. c.saveWhiteStackSettingInfo(resp)
  546. err = c.AttachStack(r.tracer, pid)
  547. if err != nil {
  548. c.AppInfo.SetAppStackError()
  549. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  550. }
  551. } else {
  552. if c.noOrigRule() {
  553. return
  554. }
  555. c.saveWhiteStackSettingInfo(resp)
  556. // 关闭堆栈
  557. err = c.DetachStack(pid, APP_UNINSTALL)
  558. if err != nil {
  559. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  560. }
  561. }
  562. } else {
  563. if c.noOrigRule() {
  564. return
  565. }
  566. c.saveWhiteStackSettingInfo(resp)
  567. // 关闭堆栈
  568. err = c.DetachStack(pid, APP_UNINSTALL)
  569. if err != nil {
  570. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  571. }
  572. }
  573. }
  574. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  575. p := c.processes[pid]
  576. if p != nil && c.checkEventReady() {
  577. codeType := c.GetCodeTypeFromCache(pid)
  578. if codeType.IsUnknownCode() {
  579. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  580. return false, 0
  581. }
  582. cmdline := p.GetCmdline()
  583. if len(cmdline) == 0 {
  584. return false, 0
  585. }
  586. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  587. whiteListByCode := r.getWhiteListAll()
  588. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  589. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  590. // 当前语言的白名单规则
  591. for _, setting := range whiteListByCode {
  592. ruleVal := setting.Filters
  593. if ruleVal == "" {
  594. continue
  595. }
  596. // 判断规则
  597. if strings.Contains(cmdline, ruleVal) {
  598. //if !codeType.IsJvmCode() {
  599. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  600. // return false, 0
  601. //}
  602. c.WhiteSettingInfo.AppName = setting.AppName
  603. c.WhiteSettingInfo.Filters = setting.Filters
  604. klog.WithField("pid", pid).
  605. WithField("codeType", codeType.String()).
  606. WithField("ruleVal", ruleVal).
  607. WithField("cmdline", cmdline).
  608. //WithField("stack", setting.OpenStack).
  609. WithField("white list", utils.ToString(whiteListByCode)).
  610. Infoln("[verify] check successful.")
  611. return true, 0
  612. }
  613. }
  614. }
  615. return false, 0
  616. }
  617. // 1.卸载入口
  618. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  619. c.lock.Lock()
  620. defer c.lock.Unlock()
  621. if p := c.processes[pid]; p != nil {
  622. err := c.DetachUprobes(tracer, pid, detachType)
  623. if err != nil {
  624. klog.WithError(err).Errorln("DetachUprobes Error.")
  625. }
  626. err = c.DetachStack(pid, detachType)
  627. if err != nil {
  628. klog.WithError(err).Errorln("DetachStack Error.")
  629. }
  630. // 关闭7层监控
  631. c.l7Attach = false
  632. // 变更应用状态
  633. if err != nil {
  634. detachType = detachType.Error()
  635. }
  636. c.AppInfo.SetAppStatus(detachType)
  637. }
  638. }
  639. // 1.1卸载uprobe
  640. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  641. // close uprobe
  642. if p := c.processes[pid]; p != nil {
  643. for _, u := range p.uprobes {
  644. err := u.Close()
  645. if err != nil {
  646. return err
  647. }
  648. }
  649. p.uprobes = []link.Link{}
  650. switch detachType {
  651. case APP_UNINSTALL, APP_FUSE:
  652. codeType := c.GetCodeTypeFromCache(pid)
  653. switch codeType {
  654. case CodeTypeJava:
  655. p.jvmAttachOnce = false
  656. case CodeTypeGo:
  657. p.goTlsUprobesChecked = false
  658. p.openSslUprobesChecked = false
  659. default:
  660. }
  661. case APP_UPROBE_ERROR:
  662. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  663. default:
  664. }
  665. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  666. if err := tracer.DelKProcInfo(pid); err != nil {
  667. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  668. } else {
  669. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  670. c.AppInfo.EBPFProcInfo = nil
  671. }
  672. } else {
  673. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  674. }
  675. return nil
  676. }
  677. // 1.2卸载堆栈
  678. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  679. if p := c.processes[pid]; p != nil {
  680. var err error
  681. codeType := c.GetCodeTypeFromCache(pid)
  682. switch codeType {
  683. // 1.2.1 卸载 jvm堆栈
  684. case CodeTypeJava:
  685. err = c.detachJvmStack(pid)
  686. default:
  687. err = p.closeStackUprobes()
  688. }
  689. if err != nil {
  690. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  691. return err
  692. }
  693. p.stackAttachOnce = false
  694. } else {
  695. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  696. }
  697. return nil
  698. }
  699. // 1.2.1 卸载 jvm堆栈
  700. func (c *Container) detachJvmStack(pid uint32) error {
  701. if p := c.processes[pid]; p != nil {
  702. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  703. //}
  704. // 卸载 JavaAgent
  705. var err error
  706. if p.stackStatus.IsJattachSuccess() {
  707. // 卸载堆栈probes
  708. err = p.closeStackUprobes()
  709. if err != nil {
  710. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  711. }
  712. err = p.uninstallJavaAgent()
  713. if err != nil {
  714. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  715. }
  716. }
  717. return err
  718. }
  719. return nil
  720. }
  721. func (c *Container) getRootfs() string {
  722. if c.metadata != nil && c.metadata.rootfs != "" {
  723. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  724. }
  725. return ""
  726. }
  727. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  728. if c != nil && c.AppInfo.AppName != "" {
  729. detail := AppStatusInfo{
  730. Pid: pid,
  731. ProcName: c.containerName,
  732. AppName: c.AppInfo.AppName,
  733. Language: c.AppInfo.CodeType.String(),
  734. AppID: c.AppInfo.AppIdHash.IntVal,
  735. AgentID: c.AppInfo.AgentId,
  736. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  737. Sn: c.AppInfo.Sn,
  738. Sport: c.AppInfo.Sport,
  739. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  740. PreStatus: c.AppInfo.PreStatus,
  741. Status: c.AppInfo.Status,
  742. Rule: c.WhiteSettingInfo.Filters,
  743. }
  744. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  745. if c.AppInfo.UpdateAt != 0 {
  746. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  747. }
  748. p := c.processes[pid]
  749. if p != nil {
  750. detail.StackStatus = p.stackStatus.String()
  751. detail.StackStatus += fmt.Sprintf("|vFailed:%v", p.versionFailed)
  752. }
  753. runtimeApps[pid] = detail
  754. }
  755. }
  756. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  757. var err error
  758. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  759. // UNINSTALL
  760. if r.isFusing && c.Isl7AttachSuccess() {
  761. c.Detach(r.tracer, pid, APP_FUSE)
  762. return
  763. }
  764. // verify UNINSTALL
  765. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  766. c.Detach(r.tracer, pid, APP_UNINSTALL)
  767. return
  768. }
  769. if verifyAttachConditions {
  770. err = c.RegisterAppInfo(r, pid)
  771. if err != nil {
  772. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  773. return
  774. }
  775. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  776. err = c.AttachUprobes(r.tracer, pid)
  777. if err != nil {
  778. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  779. return
  780. } else {
  781. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  782. }
  783. // 堆栈控制
  784. c.ctrlStack(r, pid)
  785. }
  786. }