container_apm.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "os"
  8. "path"
  9. "sort"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/coroot/coroot-node-agent/flags"
  15. "github.com/coroot/coroot-node-agent/ebpftracer"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  18. "github.com/coroot/coroot-node-agent/proc"
  19. "github.com/coroot/coroot-node-agent/tracing"
  20. "github.com/coroot/coroot-node-agent/utils"
  21. . "github.com/coroot/coroot-node-agent/utils/modelse"
  22. "github.com/pkg/errors"
  23. klog "github.com/sirupsen/logrus"
  24. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  25. "inet.af/netaddr"
  26. )
  27. const (
  28. TRACE_STATUS = 1
  29. )
  30. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  31. trace, ok := c.traceMap[traceId]
  32. return trace, ok
  33. }
  34. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  35. c.traceMap[traceId] = trace
  36. }
  37. // 查询或创建trace信息
  38. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  39. trace, ok := c.getTrace(traceId)
  40. if !ok {
  41. //new trace
  42. trace = tracing.NewTraceFromEvent(string(c.id))
  43. //create TraceMap
  44. c.createTraceMap(traceId, trace)
  45. //create ParentSpan
  46. trace.CreateRootSpan(traceId)
  47. }
  48. return trace, nil
  49. }
  50. // Deprecated: InitTrace not used
  51. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  52. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  53. // ip, err := netaddr.ParseIP(hostIp)
  54. // if err != nil {
  55. // //fmt.Println("host ip error")
  56. // hostIp = "127.0.0.1"
  57. // }
  58. // addr := netaddr.IPPortFrom(ip, port)
  59. // trace := tracing.NewTrace(string(c.id), addr)
  60. // if trace == nil {
  61. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  62. // }
  63. // c.traceMap[traceId] = trace
  64. //
  65. // trace.TraceStart(method, path, r.Status, r.Duration)
  66. // return nil
  67. //}
  68. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  69. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  70. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  71. if t.AllEventReady(traceID) {
  72. t.SendEvent()
  73. klog.Debugf("SendEvent %d", traceID)
  74. //fmt.Println(t.GetSpan())
  75. //fmt.Println("===============")
  76. delete(c.traceMap, traceID)
  77. }
  78. }
  79. func (c *Container) valuableTrace(traceID uint64) bool {
  80. return traceID != 0
  81. }
  82. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  83. c.lock.Lock()
  84. defer c.lock.Unlock()
  85. if r.Protocol == l7.ProtocolDNS {
  86. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  87. if c.l7Attach && c.valuableTrace(r.TraceId) {
  88. apmTrace, err := c.getOrInitTrace(r.TraceId)
  89. if err == nil {
  90. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  91. c.SendEvent(apmTrace, r.TraceId)
  92. }
  93. }
  94. return ip2fqdn
  95. }
  96. //if !c.valuableTrace(r.TraceId) {
  97. // return nil
  98. //}
  99. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  100. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  101. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  102. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  103. if r.TraceStart == TRACE_STATUS {
  104. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  105. trace, err := c.getOrInitTrace(r.TraceId)
  106. if c.AppInfo.AppName != "" {
  107. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  108. }
  109. if err == nil {
  110. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  111. ip, _ := netaddr.ParseIP(sn)
  112. //codeType := c.GetCodeTypeFromCache(pid)
  113. container_id := ""
  114. if c.cgroup != nil {
  115. container_id = c.cgroup.ContainerId
  116. }
  117. trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  118. c.SendEvent(trace, r.TraceId)
  119. }
  120. return nil
  121. }
  122. if r.TraceEnd == TRACE_STATUS {
  123. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  124. trace, err := c.getOrInitTrace(r.TraceId)
  125. if err == nil {
  126. trace.TraceEndEvent(r)
  127. c.SendEvent(trace, r.TraceId)
  128. }
  129. return nil
  130. }
  131. }
  132. if r.Protocol == l7.ProtocolHTTP {
  133. if c.l7Attach && c.valuableTrace(r.TraceId) {
  134. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  135. apmTrace, err := c.getOrInitTrace(r.TraceId)
  136. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  137. if err == nil {
  138. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  139. c.SendEvent(apmTrace, r.TraceId)
  140. }
  141. }
  142. //return nil
  143. }
  144. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  145. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  146. if conn == nil {
  147. return nil
  148. }
  149. if timestamp != 0 && conn.Timestamp != timestamp {
  150. return nil
  151. }
  152. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  153. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  154. switch r.Protocol {
  155. case l7.ProtocolHTTP:
  156. if c.AppInfo.AppName != "" {
  157. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  158. }
  159. stats.observe(r.Status.Http(), "", r.Duration)
  160. case l7.ProtocolHTTP2:
  161. if conn.http2Parser == nil {
  162. conn.http2Parser = l7.NewHttp2Parser()
  163. }
  164. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  165. for _, req := range requests {
  166. stats.observe(req.Status.Http(), "", req.Duration)
  167. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  168. }
  169. case l7.ProtocolPostgres:
  170. if r.Method != l7.MethodStatementClose {
  171. stats.observe(r.Status.String(), "", r.Duration)
  172. }
  173. //if conn.postgresParser == nil {
  174. // conn.postgresParser = l7.NewPostgresParser()
  175. //}
  176. //query := conn.postgresParser.Parse(r.Payload)
  177. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  178. if c.l7Attach && c.valuableTrace(r.TraceId) {
  179. if conn.postgresParser == nil {
  180. conn.postgresParser = l7.NewPostgresParser()
  181. }
  182. query := conn.postgresParser.Parse(r.Payload)
  183. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  184. if c.AppInfo.AppName != "" {
  185. klog.Debugf("[%s] ->>>>> Postgres -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  186. }
  187. //apmTrace, ok := c.getTrace(r.TraceId)
  188. apmTrace, err := c.getOrInitTrace(r.TraceId)
  189. //fmt.Println("mysql r.TraceId:", r.TraceId)
  190. //fmt.Println("ok:", ok)
  191. //fmt.Println("traceMap:", len(c.traceMap))
  192. if err == nil {
  193. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  194. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  195. apmTrace.SQLTraceQueryEvent(l7.ProtocolPostgres, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  196. c.SendEvent(apmTrace, r.TraceId)
  197. }
  198. }
  199. case l7.ProtocolMysql:
  200. if r.Method != l7.MethodStatementClose {
  201. stats.observe(r.Status.String(), "", r.Duration)
  202. }
  203. if c.l7Attach && c.valuableTrace(r.TraceId) {
  204. if conn.mysqlParser == nil {
  205. conn.mysqlParser = l7.NewMysqlParser()
  206. }
  207. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  208. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  209. if c.AppInfo.AppName != "" {
  210. klog.Debugf("[%s] ->>>>> Mysql -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  211. }
  212. //apmTrace, ok := c.getTrace(r.TraceId)
  213. apmTrace, err := c.getOrInitTrace(r.TraceId)
  214. //fmt.Println("mysql r.TraceId:", r.TraceId)
  215. //fmt.Println("ok:", ok)
  216. //fmt.Println("traceMap:", len(c.traceMap))
  217. if err == nil {
  218. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  219. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  220. apmTrace.SQLTraceQueryEvent(l7.ProtocolMysql, semconv.DBSystemMySQL, query, r, conn.ActualDest)
  221. c.SendEvent(apmTrace, r.TraceId)
  222. }
  223. }
  224. case l7.ProtocolDM:
  225. //统计dm的query次数
  226. stats.observe(r.Status.String(), "", r.Duration)
  227. //是否发送数据
  228. if c.l7Attach && c.valuableTrace(r.TraceId) {
  229. if conn.dmParser == nil {
  230. conn.dmParser = l7.NewDmParser()
  231. }
  232. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  233. if c.AppInfo.AppName != "" {
  234. klog.Debugf("[%s] ->>>>> Mysql -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  235. }
  236. apmTrace, err := c.getOrInitTrace(r.TraceId)
  237. if err == nil {
  238. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  239. apmTrace.SQLTraceQueryEvent(l7.ProtocolDM, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  240. c.SendEvent(apmTrace, r.TraceId)
  241. }
  242. }
  243. case l7.ProtocolMemcached:
  244. stats.observe(r.Status.String(), "", r.Duration)
  245. if c.l7Attach && c.valuableTrace(r.TraceId) {
  246. }
  247. //cmd, items := l7.ParseMemcached(r.Payload)
  248. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  249. case l7.ProtocolRedis:
  250. stats.observe(r.Status.String(), "", r.Duration)
  251. if c.l7Attach && c.valuableTrace(r.TraceId) {
  252. cmd, args := l7.ParseRedis(r.Payload)
  253. //fmt.Println("cmd", cmd)
  254. //fmt.Println("args", args)
  255. //apmTrace, ok := c.getTrace(r.TraceId)
  256. if c.AppInfo.AppName != "" {
  257. klog.Debugf("[%s] ->>>>> Redis -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, cmd)
  258. }
  259. apmTrace, err := c.getOrInitTrace(r.TraceId)
  260. if err == nil {
  261. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  262. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  263. c.SendEvent(apmTrace, r.TraceId)
  264. }
  265. }
  266. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  267. case l7.ProtocolMongo:
  268. stats.observe(r.Status.String(), "", r.Duration)
  269. if c.l7Attach && c.valuableTrace(r.TraceId) {
  270. query := l7.ParseMongo(r.Payload)
  271. if c.AppInfo.AppName != "" {
  272. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  273. }
  274. apmTrace, err := c.getOrInitTrace(r.TraceId)
  275. if err == nil {
  276. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  277. apmTrace.MongoTraceQueryEvent(query, r, conn.ActualDest)
  278. c.SendEvent(apmTrace, r.TraceId)
  279. }
  280. }
  281. case l7.ProtocolKafka, l7.ProtocolCassandra:
  282. stats.observe(r.Status.String(), "", r.Duration)
  283. if c.l7Attach && c.valuableTrace(r.TraceId) {
  284. }
  285. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  286. stats.observe(r.Status.String(), r.Method.String(), 0)
  287. if c.l7Attach && c.valuableTrace(r.TraceId) {
  288. }
  289. case l7.ProtocolDubbo2:
  290. stats.observe(r.Status.String(), "", r.Duration)
  291. if c.l7Attach && c.valuableTrace(r.TraceId) {
  292. }
  293. }
  294. return nil
  295. }
  296. func (c *Container) buildIDs(pid uint32) bool {
  297. c.lock.Lock()
  298. defer c.lock.Unlock()
  299. p := c.processes[pid]
  300. if p != nil {
  301. p.cmdline = string(proc.GetRealCmdline(pid))
  302. }
  303. var sns []string
  304. var sport uint16
  305. for address, val := range c.getListens() {
  306. if val == 1 {
  307. ip := address.IP()
  308. if ip.Is4() && !ip.IsLoopback() {
  309. // 获取端口号
  310. sport = address.Port()
  311. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  312. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  313. //c.AppInfo.Sn = ip.String()
  314. //c.AppInfo.Sport = int(port)
  315. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  316. //fmt.Println(port)
  317. ////os.Exit(1)
  318. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  319. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  320. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  321. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  322. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  323. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  324. //return true
  325. }
  326. }
  327. }
  328. if len(sns) > 0 {
  329. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  330. snsStr := strings.Join(sns, ",")
  331. c.AppInfo.Sn = snsStr
  332. c.AppInfo.Sport = int(sport)
  333. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  334. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  335. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  336. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  337. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  338. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  339. return true
  340. }
  341. return false
  342. }
  343. func (c *Container) ReBuildIds(pid uint32) {
  344. c.lock.Lock()
  345. defer c.lock.Unlock()
  346. var sns []string
  347. var sport uint16
  348. for address, val := range c.getListens() {
  349. if val == 1 {
  350. ip := address.IP()
  351. if ip.Is4() && !ip.IsLoopback() {
  352. // 获取端口号
  353. sport = address.Port()
  354. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  355. }
  356. }
  357. }
  358. if len(sns) > 0 {
  359. snsStr := strings.Join(sns, ",")
  360. c.AppInfo.Sn = snsStr
  361. c.AppInfo.Sport = int(sport)
  362. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  363. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  364. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  365. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  366. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  367. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  368. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  369. }
  370. }
  371. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  372. c.lock.Lock()
  373. defer c.lock.Unlock()
  374. // get the associated uprobe
  375. uprobe, err := c.GetUprobe(event, tracer)
  376. if err != nil {
  377. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  378. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  379. return
  380. }
  381. if event.TraceId <= 0 {
  382. //fmt.Println("StackProcess TraceId id 0")
  383. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  384. return
  385. }
  386. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  387. stackFun := ebpftracer.StackFunEvent{}
  388. stackFun.Uprobe = &uprobe
  389. stackFun.StackEvent = event
  390. apmTrace, ok := c.getTrace(event.TraceId)
  391. if ok {
  392. apmTrace.FunAdd(stackFun)
  393. }
  394. }
  395. func byteExtractString(nameString [100]byte) string {
  396. n := bytes.IndexFunc(nameString[:], func(r rune) bool {
  397. return r == 0 || r < 32 || r > 126 // 截取到第一个零值或非打印字符
  398. })
  399. if n == -1 {
  400. n = len(nameString) // 没找到零值或非打印字符,使用数组长度
  401. }
  402. return string(nameString[:n])
  403. }
  404. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  405. c.lock.Lock()
  406. defer c.lock.Unlock()
  407. // get the associated uprobe
  408. switch event.Location {
  409. case 0: // ret
  410. Funcname := ""
  411. if event.Type != uint64(CodeTypeJava) {
  412. uprobe, err := c.GetUprobe(event, tracer)
  413. if err != nil {
  414. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  415. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  416. return
  417. }
  418. Funcname = uprobe.Funcname
  419. } else {
  420. ClassName := byteExtractString(event.ClassName)
  421. MethedName := byteExtractString(event.MethedName)
  422. Funcname = ClassName + "." + MethedName
  423. }
  424. if event.TraceId <= 0 {
  425. //fmt.Println("StackProcess TraceId id 0")
  426. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  427. return
  428. }
  429. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  430. apmTrace, err := c.getOrInitTrace(event.TraceId)
  431. if err == nil {
  432. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  433. duration := event.TimeNsEnd - event.TimeNsStart
  434. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  435. c.SendEvent(apmTrace, event.TraceId)
  436. }
  437. }
  438. }
  439. // ResolveAddress returns the symbol(s) and offset of the given address.
  440. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  441. if addr == 0 {
  442. // err = errors.Wrapf(SymbolNotFoundError, "0")
  443. return
  444. }
  445. // symbols, _, err := e.Symbols()
  446. if err != nil {
  447. return
  448. }
  449. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  450. if idx == 0 {
  451. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  452. return
  453. }
  454. // why diff symbol may contains the same addr?
  455. sym := symbols[idx-1]
  456. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  457. syms = append(syms, symbols[i])
  458. }
  459. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  460. syms = append(syms, symbols[i])
  461. }
  462. return syms, uint(addr - sym.Value), nil
  463. }
  464. type MemoryMap struct {
  465. Start, End uint64
  466. }
  467. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  468. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  469. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  470. if err != nil {
  471. return nil, err
  472. }
  473. defer file.Close()
  474. scanner := bufio.NewScanner(file)
  475. if scanner.Scan() {
  476. fields := strings.Fields(scanner.Text())
  477. addresses := strings.Split(fields[0], "-")
  478. if len(addresses) != 2 {
  479. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  480. }
  481. start, err := strconv.ParseUint(addresses[0], 16, 64)
  482. if err != nil {
  483. return nil, err
  484. }
  485. end, err := strconv.ParseUint(addresses[1], 16, 64)
  486. if err != nil {
  487. return nil, err
  488. }
  489. return &MemoryMap{
  490. Start: start,
  491. End: end,
  492. }, nil
  493. }
  494. if err := scanner.Err(); err != nil {
  495. return nil, err
  496. }
  497. return nil, errors.New("empty /proc/<pid>/maps")
  498. }
  499. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  500. //fmt.Println("GetUprobe entory:")
  501. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  502. Address := event.Ip - memoryMap.Start
  503. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  504. for _, fun := range c.UprobesMap {
  505. funAddress := fun.Address + fun.AbsOffset
  506. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  507. if funAddress == Address {
  508. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  509. return fun, nil
  510. }
  511. }
  512. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  513. if err != nil {
  514. return
  515. }
  516. for _, sym := range syms {
  517. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  518. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  519. if ok {
  520. return uprobe, nil
  521. }
  522. }
  523. err = errors.New("uprobe not found")
  524. return
  525. }
  526. func (c *Container) GetAppInfo() AppInfo {
  527. return c.AppInfo
  528. }
  529. // 可注入前置
  530. func (c *Container) checkEventReady() bool {
  531. c.lock.Lock()
  532. defer c.lock.Unlock()
  533. return c.l7EventReady
  534. }
  535. func (c *Container) eventReady() {
  536. c.lock.Lock()
  537. defer c.lock.Unlock()
  538. c.l7EventReady = true
  539. }
  540. // uprobe前置
  541. func (c *Container) Isl7AttachSuccess() bool {
  542. c.lock.Lock()
  543. defer c.lock.Unlock()
  544. return c.l7Attach
  545. }
  546. func (c *Container) l7AttachSuccess() {
  547. c.lock.Lock()
  548. defer c.lock.Unlock()
  549. c.l7Attach = true
  550. }
  551. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  552. resp, err := c.GetCodeSetting(r)
  553. if err != nil {
  554. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  555. return
  556. }
  557. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  558. // 有黑白名单规则 &&
  559. // 之前有注入 先卸载再注入
  560. // 之前没注入 直接注入
  561. // 没有有黑白名单 直接卸载
  562. if c.hasStackRule(resp) {
  563. if c.stackRuleUpdate(resp) {
  564. // 重新注入
  565. err = c.DetachStack(pid, APP_UNINSTALL)
  566. if err != nil {
  567. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  568. }
  569. }
  570. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  571. c.saveWhiteStackSettingInfo(resp)
  572. err = c.AttachStack(r.tracer, pid)
  573. if err != nil {
  574. c.AppInfo.SetAppStackError()
  575. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  576. }
  577. } else {
  578. if c.noOrigRule() {
  579. return
  580. }
  581. c.saveWhiteStackSettingInfo(resp)
  582. // 关闭堆栈
  583. err = c.DetachStack(pid, APP_UNINSTALL)
  584. if err != nil {
  585. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  586. }
  587. }
  588. } else {
  589. if c.noOrigRule() {
  590. return
  591. }
  592. c.saveWhiteStackSettingInfo(resp)
  593. // 关闭堆栈
  594. err = c.DetachStack(pid, APP_UNINSTALL)
  595. if err != nil {
  596. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  597. }
  598. }
  599. }
  600. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  601. p := c.processes[pid]
  602. if p != nil && c.checkEventReady() {
  603. codeType := c.GetCodeTypeFromCache(pid)
  604. if codeType.IsUnknownCode() {
  605. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  606. return false, 0
  607. }
  608. cmdline := p.GetCmdline()
  609. if len(cmdline) == 0 {
  610. return false, 0
  611. }
  612. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  613. whiteListByCode := r.getWhiteListAll()
  614. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  615. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  616. // 当前语言的白名单规则
  617. for _, setting := range whiteListByCode {
  618. ruleVal := setting.Filters
  619. if ruleVal == "" {
  620. continue
  621. }
  622. // 判断规则
  623. if strings.Contains(cmdline, ruleVal) {
  624. //if !codeType.IsJvmCode() {
  625. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  626. // return false, 0
  627. //}
  628. c.WhiteSettingInfo.AppName = setting.AppName
  629. c.WhiteSettingInfo.Filters = setting.Filters
  630. klog.WithField("pid", pid).
  631. WithField("codeType", codeType.String()).
  632. WithField("ruleVal", ruleVal).
  633. WithField("cmdline", cmdline).
  634. //WithField("stack", setting.OpenStack).
  635. WithField("white list", utils.ToString(whiteListByCode)).
  636. Infoln("[verify] check successful.")
  637. return true, 0
  638. }
  639. }
  640. }
  641. return false, 0
  642. }
  643. // 1.卸载入口
  644. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  645. c.lock.Lock()
  646. defer c.lock.Unlock()
  647. if p := c.processes[pid]; p != nil {
  648. err := c.DetachUprobes(tracer, pid, detachType)
  649. if err != nil {
  650. klog.WithError(err).Errorln("DetachUprobes Error.")
  651. }
  652. err = c.DetachStack(pid, detachType)
  653. if err != nil {
  654. klog.WithError(err).Errorln("DetachStack Error.")
  655. }
  656. // 关闭7层监控
  657. c.l7Attach = false
  658. // 变更应用状态
  659. if err != nil {
  660. detachType = detachType.Error()
  661. }
  662. c.AppInfo.SetAppStatus(detachType)
  663. }
  664. }
  665. // 1.1卸载uprobe
  666. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  667. // close uprobe
  668. if p := c.processes[pid]; p != nil {
  669. for _, u := range p.uprobes {
  670. err := u.Close()
  671. if err != nil {
  672. return err
  673. }
  674. }
  675. p.uprobes = []link.Link{}
  676. switch detachType {
  677. case APP_UNINSTALL, APP_FUSE:
  678. codeType := c.GetCodeTypeFromCache(pid)
  679. switch codeType {
  680. case CodeTypeJava:
  681. p.jvmAttachOnce = false
  682. case CodeTypeGo:
  683. p.goTlsUprobesChecked = false
  684. p.openSslUprobesChecked = false
  685. default:
  686. }
  687. case APP_UPROBE_ERROR:
  688. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  689. default:
  690. }
  691. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  692. if err := tracer.DelKProcInfo(pid); err != nil {
  693. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  694. } else {
  695. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  696. c.AppInfo.EBPFProcInfo = nil
  697. }
  698. } else {
  699. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  700. }
  701. return nil
  702. }
  703. // 1.2卸载堆栈
  704. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  705. if p := c.processes[pid]; p != nil {
  706. var err error
  707. codeType := c.GetCodeTypeFromCache(pid)
  708. switch codeType {
  709. // 1.2.1 卸载 jvm堆栈
  710. case CodeTypeJava:
  711. err = c.detachJvmStack(pid)
  712. default:
  713. err = p.closeStackUprobes()
  714. }
  715. if err != nil {
  716. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  717. return err
  718. }
  719. p.stackAttachOnce = false
  720. } else {
  721. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  722. }
  723. return nil
  724. }
  725. // 1.2.1 卸载 jvm堆栈
  726. func (c *Container) detachJvmStack(pid uint32) error {
  727. if p := c.processes[pid]; p != nil {
  728. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  729. //}
  730. // 卸载 JavaAgent
  731. var err error
  732. if p.stackStatus.IsJattachSuccess() {
  733. // 卸载堆栈probes
  734. err = p.closeStackUprobes()
  735. if err != nil {
  736. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  737. }
  738. err = p.uninstallJavaAgent()
  739. if err != nil {
  740. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  741. }
  742. }
  743. return err
  744. }
  745. return nil
  746. }
  747. func (c *Container) getRootfs() string {
  748. if c.metadata != nil && c.metadata.rootfs != "" {
  749. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  750. }
  751. return ""
  752. }
  753. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  754. if c == nil {
  755. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  756. return
  757. }
  758. if c.AppInfo.AppName == "" {
  759. return
  760. }
  761. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  762. detail := AppStatusInfo{
  763. Pid: pid,
  764. ProcName: c.containerName,
  765. AppName: c.AppInfo.AppName,
  766. Language: c.AppInfo.CodeType.String(),
  767. AppID: c.AppInfo.AppIdHash.IntVal,
  768. AgentID: c.AppInfo.AgentId,
  769. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  770. Sn: c.AppInfo.Sn,
  771. Sport: c.AppInfo.Sport,
  772. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  773. PreStatus: c.AppInfo.PreStatus,
  774. Status: c.AppInfo.Status,
  775. Rule: c.WhiteSettingInfo.Filters,
  776. Container: string(c.id),
  777. }
  778. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  779. if c.AppInfo.UpdateAt != 0 {
  780. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  781. }
  782. p := c.processes[pid]
  783. if p != nil {
  784. detail.StackStatus = p.stackStatus.String()
  785. v := 0
  786. if !p.versionFailed {
  787. v = 1
  788. }
  789. detail.StackStatus += fmt.Sprintf("V=%d", v)
  790. }
  791. runtimeApps[pid] = detail
  792. }
  793. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  794. if c == nil {
  795. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  796. return
  797. }
  798. var err error
  799. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  800. // fusing UNINSTALL
  801. if r.isFusing && c.Isl7AttachSuccess() {
  802. c.Detach(r.tracer, pid, APP_FUSE)
  803. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  804. return
  805. }
  806. // verify UNINSTALL
  807. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  808. c.Detach(r.tracer, pid, APP_UNINSTALL)
  809. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  810. return
  811. }
  812. if verifyAttachConditions {
  813. err = c.RegisterAppInfo(r, pid)
  814. if err != nil {
  815. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  816. return
  817. }
  818. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  819. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  820. if err != nil {
  821. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  822. return
  823. } else {
  824. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  825. }
  826. // 堆栈控制
  827. c.ctrlStack(r, pid)
  828. }
  829. }