container_apm.go 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. package containers
  2. import (
  3. "bufio"
  4. "bytes"
  5. "debug/elf"
  6. "fmt"
  7. "net"
  8. "os"
  9. "path"
  10. "sort"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/coroot/coroot-node-agent/flags"
  16. "github.com/coroot/coroot-node-agent/ebpftracer"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. "github.com/coroot/coroot-node-agent/tracing"
  21. "github.com/coroot/coroot-node-agent/utils"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. "github.com/pkg/errors"
  24. klog "github.com/sirupsen/logrus"
  25. semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
  26. "inet.af/netaddr"
  27. )
  // TRACE_STATUS is the value that RequestData.TraceStart and
  // RequestData.TraceEnd are compared against to detect the start or the end
  // of a trace.
  const TRACE_STATUS = 1
  31. func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
  32. trace, ok := c.traceMap[traceId]
  33. return trace, ok
  34. }
  35. func (c *Container) createTraceMap(traceId uint64, trace *tracing.Trace) {
  36. c.traceMap[traceId] = trace
  37. }
  38. // 查询或创建trace信息
  39. func (c *Container) getOrInitTrace(traceId uint64) (*tracing.Trace, error) {
  40. trace, ok := c.getTrace(traceId)
  41. if !ok {
  42. //new trace
  43. trace = tracing.NewTraceFromEvent(string(c.id))
  44. //create TraceMap
  45. c.createTraceMap(traceId, trace)
  46. //create ParentSpan
  47. trace.CreateRootSpan(traceId)
  48. }
  49. return trace, nil
  50. }
  51. // getGrpcServerNetworkInfo 获取 gRPC server 的网络信息
  52. // 返回: IP地址, 端口号, 容器ID
  53. func (c *Container) getGrpcServerNetworkInfo() (string, uint16, string) {
  54. containerID := ""
  55. if c.cgroup != nil {
  56. containerID = c.cgroup.ContainerId
  57. }
  58. ipAddr := ""
  59. ifaces, err := net.Interfaces()
  60. if err == nil {
  61. for _, iface := range ifaces {
  62. if iface.Name == "eth0" {
  63. addrs, err := iface.Addrs()
  64. if err == nil {
  65. for _, addr := range addrs {
  66. var ipnet *net.IPNet
  67. switch v := addr.(type) {
  68. case *net.IPNet:
  69. ipnet = v
  70. case *net.IPAddr:
  71. ipnet = &net.IPNet{IP: v.IP, Mask: v.IP.DefaultMask()}
  72. }
  73. if ipnet != nil && ipnet.IP.To4() != nil {
  74. ipAddr = ipnet.IP.String()
  75. break
  76. }
  77. }
  78. }
  79. break
  80. }
  81. }
  82. }
  83. klog.Debugf("grpc server ip %s", ipAddr)
  84. // 本地端口尝试从AppInfo.Sport获取
  85. port := c.AppInfo.Sport
  86. klog.Debugf("grpc server port %d", port)
  87. return ipAddr, uint16(port), containerID
  88. }
  89. // Deprecated: InitTrace not used
  90. //func (c *Container) InitTrace(traceId uint64, r *l7.RequestData) error {
  91. // method, path, hostIp, port := l7.ParseHttpHost(r.Payload)
  92. // ip, err := netaddr.ParseIP(hostIp)
  93. // if err != nil {
  94. // //fmt.Println("host ip error")
  95. // hostIp = "127.0.0.1"
  96. // }
  97. // addr := netaddr.IPPortFrom(ip, port)
  98. // trace := tracing.NewTrace(string(c.id), addr)
  99. // if trace == nil {
  100. // return fmt.Errorf("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is null")
  101. // }
  102. // c.traceMap[traceId] = trace
  103. //
  104. // trace.TraceStart(method, path, r.Status, r.Duration)
  105. // return nil
  106. //}
  107. // 在任意阶段,r.TraceId 不等于0 则创建 traceMap && createParentSpan
  108. // 更新 createTraceSpan 机制,更新触发traceEnd机制,当事件个数满足时,任意event均可触发end
  109. func (c *Container) SendEvent(t *tracing.Trace, traceID uint64) {
  110. if t.AllEventReady(traceID) {
  111. t.SendEvent()
  112. klog.Debugf("SendEvent %d", traceID)
  113. //fmt.Println(t.GetSpan())
  114. //fmt.Println("===============")
  115. delete(c.traceMap, traceID)
  116. }
  117. }
  118. func (c *Container) valuableTrace(traceID uint64) bool {
  119. return traceID != 0
  120. }
  121. func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
  122. c.lock.Lock()
  123. defer c.lock.Unlock()
  124. if r.Protocol == l7.ProtocolDNS {
  125. ip2fqdn, _type, fqdn, ttl, ips := c.onDNSRequest(r)
  126. if c.l7Attach && c.valuableTrace(r.TraceId) {
  127. apmTrace, err := c.getOrInitTrace(r.TraceId)
  128. if err == nil {
  129. apmTrace.DNSTraceQueryEvent(r, _type, fqdn, ttl, ips)
  130. c.SendEvent(apmTrace, r.TraceId)
  131. }
  132. }
  133. return ip2fqdn
  134. }
  135. //if !c.valuableTrace(r.TraceId) {
  136. // return nil
  137. //}
  138. //klog.Infof("====ProtocolTrace+++++ start==== %d %d", pid, r.TraceId)
  139. // klog.Infof("====ProtocolTrace===== start==== %d %d", r.Protocol == l7.ProtocolTrace, c.l7Attach)
  140. if r.Protocol == l7.ProtocolTrace && c.l7Attach && c.valuableTrace(r.TraceId) {
  141. // klog.Infof("====ProtocolTrace---- start==== %d %d", pid, r.TraceId)
  142. if r.TraceStart == TRACE_STATUS {
  143. // klog.Infof("====ProtocolTrace start==== %d %d", pid, r.TraceId)
  144. trace, err := c.getOrInitTrace(r.TraceId)
  145. if c.AppInfo.AppName != "" {
  146. klog.Debugf("->>> [%s] -> payload:[%s]", c.AppInfo.AppName, r.Payload)
  147. }
  148. if err == nil {
  149. if r.TraceType == 0 {
  150. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  151. ip, _ := netaddr.ParseIP(sn)
  152. //codeType := c.GetCodeTypeFromCache(pid)
  153. container_id := ""
  154. if c.cgroup != nil {
  155. container_id = c.cgroup.ContainerId
  156. }
  157. trace.TraceStartEvent(method, requestURI, sn, sport, r.Status, netaddr.IPPortFrom(ip, sport), pid, c.GetAppInfo(), container_id)
  158. c.SendEvent(trace, r.TraceId)
  159. } else if r.TraceType == 1 {
  160. ipAddr, port, containerID := c.getGrpcServerNetworkInfo()
  161. trace.GrpcServerTraceStartEvent(ipAddr, port, r, c.GetAppInfo(), containerID)
  162. c.SendEvent(trace, r.TraceId)
  163. // apmTrace, err := c.getOrInitTrace(r.TraceId)
  164. // if err == nil {
  165. // apmTrace.GrpcServerTraceQueryEvent(r, c.GetAppInfo())
  166. // c.SendEvent(apmTrace, r.TraceId)
  167. // }
  168. }
  169. }
  170. return nil
  171. }
  172. if r.TraceEnd == TRACE_STATUS {
  173. klog.Debugf("====ProtocolTrace end==== %d %d", pid, r.TraceId)
  174. trace, err := c.getOrInitTrace(r.TraceId)
  175. if err == nil {
  176. trace.TraceEndEvent(r)
  177. c.SendEvent(trace, r.TraceId)
  178. }
  179. return nil
  180. }
  181. }
  182. if r.Protocol == l7.ProtocolHTTP {
  183. if c.l7Attach && c.valuableTrace(r.TraceId) {
  184. method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
  185. apmTrace, err := c.getOrInitTrace(r.TraceId)
  186. //fmt.Println("ProtocolHTTP-----", r.TraceId, err)
  187. if err == nil {
  188. apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
  189. c.SendEvent(apmTrace, r.TraceId)
  190. }
  191. }
  192. //return nil
  193. }
  194. conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
  195. //fmt.Println("l7.connectionsByPidFd", conn, pid, fd)
  196. if conn == nil {
  197. if r.Protocol == l7.ProtocolGrpc {
  198. klog.Infoln("conn == nil r.Protocol == l7.ProtocolGrpc")
  199. klog.Infoln("enter the l7.ProtocolGrpc")
  200. if c.l7Attach && c.valuableTrace(r.TraceId) {
  201. apmTrace, err := c.getOrInitTrace(r.TraceId)
  202. if err == nil {
  203. apmTrace.GrpcClientTraceQueryEvent(r)
  204. c.SendEvent(apmTrace, r.TraceId)
  205. }
  206. }
  207. }
  208. return nil
  209. }
  210. if timestamp != 0 && conn.Timestamp != timestamp {
  211. if r.Protocol == l7.ProtocolGrpc {
  212. klog.Infoln("timestamp != 0 && conn.Timestamp != timestamp r.Protocol == l7.ProtocolGrpc")
  213. klog.Infoln("enter the l7.ProtocolGrpc")
  214. if c.l7Attach && c.valuableTrace(r.TraceId) {
  215. apmTrace, err := c.getOrInitTrace(r.TraceId)
  216. if err == nil {
  217. apmTrace.GrpcClientTraceQueryEvent(r)
  218. c.SendEvent(apmTrace, r.TraceId)
  219. }
  220. }
  221. }
  222. return nil
  223. }
  224. stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
  225. //trace := tracing.NewTrace(string(c.id), conn.ActualDest)
  226. switch r.Protocol {
  227. case l7.ProtocolHTTP:
  228. if c.AppInfo.AppName != "" {
  229. klog.Debugf("[%s] ->>>>> curl -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, r.Payload)
  230. }
  231. stats.observe(r.Status.Http(), "", r.Duration)
  232. case l7.ProtocolHTTP2:
  233. if conn.http2Parser == nil {
  234. conn.http2Parser = l7.NewHttp2Parser()
  235. }
  236. requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
  237. for _, req := range requests {
  238. stats.observe(req.Status.Http(), "", req.Duration)
  239. //trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
  240. }
  241. case l7.ProtocolPostgres:
  242. if r.Method != l7.MethodStatementClose {
  243. stats.observe(r.Status.String(), "", r.Duration)
  244. }
  245. //if conn.postgresParser == nil {
  246. // conn.postgresParser = l7.NewPostgresParser()
  247. //}
  248. //query := conn.postgresParser.Parse(r.Payload)
  249. //trace.PostgresQuery(query, r.Status.Error(), r.Duration)
  250. if c.l7Attach && c.valuableTrace(r.TraceId) {
  251. if conn.postgresParser == nil {
  252. conn.postgresParser = l7.NewPostgresParser()
  253. }
  254. query := conn.postgresParser.Parse(r.Payload)
  255. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  256. if c.AppInfo.AppName != "" {
  257. klog.Debugf("[%s] ->>>>> Postgres -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  258. }
  259. //apmTrace, ok := c.getTrace(r.TraceId)
  260. apmTrace, err := c.getOrInitTrace(r.TraceId)
  261. //fmt.Println("mysql r.TraceId:", r.TraceId)
  262. //fmt.Println("ok:", ok)
  263. //fmt.Println("traceMap:", len(c.traceMap))
  264. if err == nil {
  265. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  266. //apmTrace.PostGreSqlTraceQueryEvent(query, r, conn.ActualDest)
  267. apmTrace.SQLTraceQueryEvent(l7.ProtocolPostgres, semconv.DBSystemPostgreSQL, query, r, conn.ActualDest)
  268. c.SendEvent(apmTrace, r.TraceId)
  269. }
  270. }
  271. case l7.ProtocolMysql:
  272. if r.Method != l7.MethodStatementClose {
  273. stats.observe(r.Status.String(), "", r.Duration)
  274. }
  275. if c.l7Attach && c.valuableTrace(r.TraceId) {
  276. if conn.mysqlParser == nil {
  277. conn.mysqlParser = l7.NewMysqlParser()
  278. }
  279. query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
  280. //trace.MysqlQuery(query, r.Status.Error(), r.Duration)
  281. if c.AppInfo.AppName != "" {
  282. klog.Debugf("[%s] ->>>>> Mysql -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  283. }
  284. //apmTrace, ok := c.getTrace(r.TraceId)
  285. apmTrace, err := c.getOrInitTrace(r.TraceId)
  286. //fmt.Println("mysql r.TraceId:", r.TraceId)
  287. //fmt.Println("ok:", ok)
  288. //fmt.Println("traceMap:", len(c.traceMap))
  289. if err == nil {
  290. dbSystem := semconv.DBSystemMySQL
  291. // 根据端口白名单确定协议类型
  292. l7Type := flags.GetProtocolByPort(uint16(conn.ActualDest.Port()))
  293. if l7Type == l7.ProtocolMariaDB {
  294. dbSystem = semconv.DBSystemMariaDB
  295. }
  296. //apmTrace.MysqlTraceQuery(query, r.Status.Error(), r.Duration, conn.ActualDest)
  297. //apmTrace.MysqlTraceQueryEvent(query, r, conn.ActualDest)
  298. apmTrace.SQLTraceQueryEvent(l7Type, dbSystem, query, r, conn.ActualDest)
  299. c.SendEvent(apmTrace, r.TraceId)
  300. }
  301. }
  302. case l7.ProtocolDM:
  303. //统计dm的query次数
  304. stats.observe(r.Status.String(), "", r.Duration)
  305. //是否发送数据
  306. if c.l7Attach && c.valuableTrace(r.TraceId) {
  307. if conn.dmParser == nil {
  308. conn.dmParser = l7.NewDmParser()
  309. }
  310. query := conn.dmParser.Parse(r.Payload, r.StatementId)
  311. if c.AppInfo.AppName != "" {
  312. klog.Debugf("[%s] ->>>>> Mysql -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  313. }
  314. apmTrace, err := c.getOrInitTrace(r.TraceId)
  315. if err == nil {
  316. //apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
  317. apmTrace.SQLTraceQueryEvent(l7.ProtocolDM, semconv.DBSystemDaMengDB, query, r, conn.ActualDest)
  318. c.SendEvent(apmTrace, r.TraceId)
  319. }
  320. }
  321. case l7.ProtocolMemcached:
  322. stats.observe(r.Status.String(), "", r.Duration)
  323. if c.l7Attach && c.valuableTrace(r.TraceId) {
  324. }
  325. //cmd, items := l7.ParseMemcached(r.Payload)
  326. //trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
  327. case l7.ProtocolRedis:
  328. stats.observe(r.Status.String(), "", r.Duration)
  329. if c.l7Attach && c.valuableTrace(r.TraceId) {
  330. cmd, args := l7.ParseRedis(r.Payload)
  331. //fmt.Println("cmd", cmd)
  332. //fmt.Println("args", args)
  333. //apmTrace, ok := c.getTrace(r.TraceId)
  334. if c.AppInfo.AppName != "" {
  335. klog.Debugf("[%s] ->>>>> Redis -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, cmd)
  336. }
  337. apmTrace, err := c.getOrInitTrace(r.TraceId)
  338. if err == nil {
  339. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  340. apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
  341. c.SendEvent(apmTrace, r.TraceId)
  342. }
  343. }
  344. //trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
  345. case l7.ProtocolGrpc:
  346. klog.Debugln("enter the l7.ProtocolGrpc")
  347. stats.observe(r.Status.String(), "", r.Duration)
  348. if c.l7Attach && c.valuableTrace(r.TraceId) {
  349. apmTrace, err := c.getOrInitTrace(r.TraceId)
  350. if err == nil {
  351. apmTrace.GrpcClientTraceQueryEvent(r)
  352. c.SendEvent(apmTrace, r.TraceId)
  353. }
  354. }
  355. case l7.ProtocolMongo:
  356. stats.observe(r.Status.String(), "", r.Duration)
  357. if c.l7Attach && c.valuableTrace(r.TraceId) {
  358. query := l7.ParseMongo(r.Payload)
  359. if c.AppInfo.AppName != "" {
  360. klog.Debugf("[%s] ->>>>> MongoDB -> %s SQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
  361. }
  362. apmTrace, err := c.getOrInitTrace(r.TraceId)
  363. if err == nil {
  364. //apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)
  365. apmTrace.MongoTraceQueryEvent(query, r, conn.ActualDest)
  366. c.SendEvent(apmTrace, r.TraceId)
  367. }
  368. }
  369. case l7.ProtocolKafka, l7.ProtocolCassandra:
  370. stats.observe(r.Status.String(), "", r.Duration)
  371. if c.l7Attach && c.valuableTrace(r.TraceId) {
  372. }
  373. case l7.ProtocolRabbitmq, l7.ProtocolNats:
  374. stats.observe(r.Status.String(), r.Method.String(), 0)
  375. if c.l7Attach && c.valuableTrace(r.TraceId) {
  376. }
  377. case l7.ProtocolDubbo2:
  378. stats.observe(r.Status.String(), "", r.Duration)
  379. if c.l7Attach && c.valuableTrace(r.TraceId) {
  380. }
  381. }
  382. return nil
  383. }
  384. func (c *Container) buildIDs(pid uint32) bool {
  385. c.lock.Lock()
  386. defer c.lock.Unlock()
  387. p := c.processes[pid]
  388. if p != nil {
  389. p.cmdline = string(proc.GetRealCmdline(pid))
  390. }
  391. var sns []string
  392. var sport uint16
  393. for address, val := range c.getListens() {
  394. if val == 1 {
  395. ip := address.IP()
  396. if ip.Is4() && !ip.IsLoopback() {
  397. // 获取端口号
  398. sport = address.Port()
  399. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  400. ////c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  401. //c.AppInfo.Sn = ip.String()
  402. //c.AppInfo.Sport = int(port)
  403. //strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", ip.String(), port))
  404. //fmt.Println(port)
  405. ////os.Exit(1)
  406. //c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  407. //c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  408. ////c.AppInfo.InstanceId = c.instanceID.IntVal
  409. //strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", strInstanceID, string(proc.GetExe(pid))))
  410. //c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  411. //c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  412. //return true
  413. }
  414. }
  415. }
  416. if len(sns) > 0 {
  417. //c.instanceID.IntVal, c.instanceID.HashtVal, _ =
  418. snsStr := strings.Join(sns, ",")
  419. c.AppInfo.Sn = snsStr
  420. c.AppInfo.Sport = int(sport)
  421. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  422. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  423. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  424. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  425. // c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  426. // c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  427. return true
  428. }
  429. return false
  430. }
  431. func (c *Container) ReBuildIds(pid uint32) {
  432. c.lock.Lock()
  433. defer c.lock.Unlock()
  434. var sns []string
  435. var sport uint16
  436. for address, val := range c.getListens() {
  437. if val == 1 {
  438. ip := address.IP()
  439. if ip.Is4() && !ip.IsLoopback() {
  440. // 获取端口号
  441. sport = address.Port()
  442. sns = append(sns, fmt.Sprintf("%s:%d", ip, sport))
  443. }
  444. }
  445. }
  446. if len(sns) > 0 {
  447. snsStr := strings.Join(sns, ",")
  448. c.AppInfo.Sn = snsStr
  449. c.AppInfo.Sport = int(sport)
  450. strInstanceID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", c.AppInfo.Sn, sport))
  451. c.AppInfo.InstanceIdHash.IntVal, _ = strInstanceID.ToInt64()
  452. c.AppInfo.InstanceIdHash.HashtVal = strInstanceID.ToHashByte()
  453. // strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%s", utils.GetHostIP(), string(proc.GetExe(pid))))
  454. strAgentID := utils.BuildInt64ID(fmt.Sprintf("%s:%d", utils.GetHostIP(), c.AppInfo.AppIdHash.IntVal))
  455. c.AppInfo.AgentId, _ = strAgentID.ToInt64()
  456. c.AppInfo.CodeType = c.GetCodeTypeFromCache(pid)
  457. }
  458. }
  459. func (c *Container) StackProcess(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  460. c.lock.Lock()
  461. defer c.lock.Unlock()
  462. // get the associated uprobe
  463. uprobe, err := c.GetUprobe(event, tracer)
  464. if err != nil {
  465. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  466. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  467. return
  468. }
  469. if event.TraceId <= 0 {
  470. //fmt.Println("StackProcess TraceId id 0")
  471. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  472. return
  473. }
  474. // fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  475. stackFun := ebpftracer.StackFunEvent{}
  476. stackFun.Uprobe = &uprobe
  477. stackFun.StackEvent = event
  478. apmTrace, ok := c.getTrace(event.TraceId)
  479. if ok {
  480. apmTrace.FunAdd(stackFun)
  481. }
  482. }
  // byteExtractString returns the leading run of printable ASCII characters
  // (code points 32 through 126) of nameString. The result ends at the first
  // NUL, control character or non-ASCII byte, or spans the whole array when
  // there is none.
  func byteExtractString(nameString [100]byte) string {
  	printable := nameString[:]
  	stop := func(r rune) bool { return r < 32 || r > 126 }
  	if cut := bytes.IndexFunc(printable, stop); cut >= 0 {
  		printable = printable[:cut]
  	}
  	return string(printable)
  }
  492. func (c *Container) StackProcess2(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) {
  493. c.lock.Lock()
  494. defer c.lock.Unlock()
  495. // get the associated uprobe
  496. switch event.Location {
  497. case 0: // ret
  498. Funcname := ""
  499. if event.Type != uint64(CodeTypeJava) {
  500. uprobe, err := c.GetUprobe(event, tracer)
  501. if err != nil {
  502. //fmt.Println("GetUprobeGetUprobe errer: %v", err)
  503. klog.Errorf("failed to get uprobe for event %+v: %+v", event, err)
  504. return
  505. }
  506. Funcname = uprobe.Funcname
  507. } else {
  508. ClassName := byteExtractString(event.ClassName)
  509. MethedName := byteExtractString(event.MethedName)
  510. Funcname = ClassName + "." + MethedName
  511. }
  512. if event.TraceId <= 0 {
  513. //fmt.Println("StackProcess TraceId id 0")
  514. klog.Errorf("failed to get uprobe(traceId is <= 0) for event %+v", event)
  515. return
  516. }
  517. //fmt.Printf("StackProcess 函数入口开始处理 fun:TraceId:%lld, Funcname:%s, time: %lld\n", event.TraceId, uprobe.Funcname, event.TimeNsEnd-event.TimeNsStart)
  518. apmTrace, err := c.getOrInitTrace(event.TraceId)
  519. if err == nil {
  520. //fmt.Println("append FuncTraceQuery fun:", event.TraceId, uprobe.Funcname, event.Pid)
  521. duration := event.TimeNsEnd - event.TimeNsStart
  522. apmTrace.FuncTraceQuery(Funcname, time.Duration(duration), event.TimeNsStart, event.TimeNsEnd)
  523. c.SendEvent(apmTrace, event.TraceId)
  524. }
  525. }
  526. }
  527. // ResolveAddress returns the symbol(s) and offset of the given address.
  528. func (c *Container) ResolveAddress(addr uint64, symbols []elf.Symbol) (syms []elf.Symbol, offset uint, err error) {
  529. if addr == 0 {
  530. // err = errors.Wrapf(SymbolNotFoundError, "0")
  531. return
  532. }
  533. // symbols, _, err := e.Symbols()
  534. if err != nil {
  535. return
  536. }
  537. idx := sort.Search(len(symbols), func(i int) bool { return symbols[i].Value > addr })
  538. if idx == 0 {
  539. // err = errors.Wrap(SymbolNotFoundError, fmt.Sprintf("%x", addr))
  540. return
  541. }
  542. // why diff symbol may contains the same addr?
  543. sym := symbols[idx-1]
  544. for i := idx - 1; i >= 0 && symbols[i].Value == sym.Value; i-- {
  545. syms = append(syms, symbols[i])
  546. }
  547. for i := idx; i < len(symbols) && symbols[i].Value == sym.Value; i++ {
  548. syms = append(syms, symbols[i])
  549. }
  550. return syms, uint(addr - sym.Value), nil
  551. }
  // MemoryMap is the address range of one mapping, as read from the
  // "start-end" field of a /proc/<pid>/maps line.
  type MemoryMap struct {
  	Start uint64 // first address of the range
  	End   uint64 // second address of the range, as printed in the maps line
  }
  555. // ReadFirstLineOfMapsFile reads the first line of /proc/<pid>/maps file and return the memory map as a MemoryMap struct
  556. func ReadFirstLineOfMapsFile(pid string) (*MemoryMap, error) {
  557. file, err := os.Open(fmt.Sprintf("/proc/%s/maps", pid))
  558. if err != nil {
  559. return nil, err
  560. }
  561. defer file.Close()
  562. scanner := bufio.NewScanner(file)
  563. if scanner.Scan() {
  564. fields := strings.Fields(scanner.Text())
  565. addresses := strings.Split(fields[0], "-")
  566. if len(addresses) != 2 {
  567. return nil, errors.New("unexpected format in /proc/<pid>/maps")
  568. }
  569. start, err := strconv.ParseUint(addresses[0], 16, 64)
  570. if err != nil {
  571. return nil, err
  572. }
  573. end, err := strconv.ParseUint(addresses[1], 16, 64)
  574. if err != nil {
  575. return nil, err
  576. }
  577. return &MemoryMap{
  578. Start: start,
  579. End: end,
  580. }, nil
  581. }
  582. if err := scanner.Err(); err != nil {
  583. return nil, err
  584. }
  585. return nil, errors.New("empty /proc/<pid>/maps")
  586. }
  587. func (c *Container) GetUprobe(event ebpftracer.StackEvent, tracer *ebpftracer.Tracer) (uprobe tracer.Uprobe, err error) {
  588. //fmt.Println("GetUprobe entory:")
  589. memoryMap, _ := ReadFirstLineOfMapsFile(strconv.Itoa(int(event.Pid)))
  590. Address := event.Ip - memoryMap.Start
  591. // fmt.Printf("memoryMap.Start: %x, event.Ip: %x, Address: %x\n", memoryMap.Start, event.Ip, Address)
  592. for _, fun := range c.UprobesMap {
  593. funAddress := fun.Address + fun.AbsOffset
  594. // fmt.Printf("GetUprobeGetUprobeGetUprobe:fun.Address %x, fun.AbsOffset: %x\n", fun.Address, fun.AbsOffset)
  595. if funAddress == Address {
  596. // fmt.Printf("---GetUprobeGetUprobeGetUprobe: %x, event.Ip: %x ---- %s--%x\n", memoryMap.Start, event.Ip, fun.Funcname, fun.Address)
  597. return fun, nil
  598. }
  599. }
  600. syms, _, err := c.ResolveAddress(event.Ip, tracer.Symbols)
  601. if err != nil {
  602. return
  603. }
  604. for _, sym := range syms {
  605. //fmt.Println("GetUprobeGetUprobeGetUprobe: %s+%d", sym.Name, offset)
  606. uprobe, ok := tracer.UprobesMap[fmt.Sprintf("%s-%s", sym.Name, sym.Value)]
  607. if ok {
  608. return uprobe, nil
  609. }
  610. }
  611. err = errors.New("uprobe not found")
  612. return
  613. }
  614. func (c *Container) GetAppInfo() AppInfo {
  615. return c.AppInfo
  616. }
  617. // 可注入前置
  618. func (c *Container) checkEventReady() bool {
  619. c.lock.Lock()
  620. defer c.lock.Unlock()
  621. return c.l7EventReady
  622. }
  623. func (c *Container) eventReady() {
  624. c.lock.Lock()
  625. defer c.lock.Unlock()
  626. c.l7EventReady = true
  627. }
  628. // uprobe前置
  629. func (c *Container) Isl7AttachSuccess() bool {
  630. c.lock.Lock()
  631. defer c.lock.Unlock()
  632. return c.l7Attach
  633. }
  634. func (c *Container) l7AttachSuccess() {
  635. c.lock.Lock()
  636. defer c.lock.Unlock()
  637. c.l7Attach = true
  638. }
  639. func (c *Container) ctrlStack(r *Registry, pid uint32) {
  640. resp, err := c.GetCodeSetting(r)
  641. if err != nil {
  642. klog.WithField("pid", pid).WithError(err).Error("[ctrlStack] GetCodeSetting failed.")
  643. return
  644. }
  645. if resp.BlackWhiteSettings.CollectStack == OPEN_STACK {
  646. // 有黑白名单规则 &&
  647. // 之前有注入 先卸载再注入
  648. // 之前没注入 直接注入
  649. // 没有有黑白名单 直接卸载
  650. if c.hasStackRule(resp) {
  651. if c.stackRuleUpdate(resp) {
  652. // 重新注入
  653. err = c.DetachStack(pid, APP_UNINSTALL)
  654. if err != nil {
  655. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  656. }
  657. }
  658. klog.WithField("pid", pid).Infoln("[ctrlStack] Attach app stack.")
  659. c.saveWhiteStackSettingInfo(resp)
  660. err = c.AttachStack(r.tracer, pid)
  661. if err != nil {
  662. c.AppInfo.SetAppStackError()
  663. klog.WithField("pid", pid).WithError(err).Errorf("[ctrlStack][end] Failed attach stack trace!")
  664. }
  665. } else {
  666. if c.noOrigRule() {
  667. return
  668. }
  669. c.saveWhiteStackSettingInfo(resp)
  670. // 关闭堆栈
  671. err = c.DetachStack(pid, APP_UNINSTALL)
  672. if err != nil {
  673. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  674. }
  675. }
  676. } else {
  677. if c.noOrigRule() {
  678. return
  679. }
  680. c.saveWhiteStackSettingInfo(resp)
  681. // 关闭堆栈
  682. err = c.DetachStack(pid, APP_UNINSTALL)
  683. if err != nil {
  684. klog.WithError(err).Errorf("[ctrlStack][end] Failed detach stack trace!")
  685. }
  686. }
  687. }
  688. func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
  689. p := c.processes[pid]
  690. if p != nil && c.checkEventReady() {
  691. codeType := c.GetCodeTypeFromCache(pid)
  692. if codeType.IsUnknownCode() {
  693. klog.WithField("pid", pid).Debug("[verify] unknown language.")
  694. return false, 0
  695. }
  696. cmdline := p.GetCmdline()
  697. if len(cmdline) == 0 {
  698. return false, 0
  699. }
  700. //whiteListByCode := r.getWhiteListByCodeType(codeType)
  701. whiteListByCode := r.getWhiteListAll()
  702. //klog.WithField("pid", pid).WithField("codeType", codeType.String()).
  703. // Infof("[verify] white list %v", utils.ToString(whiteListByCode))
  704. // 当前语言的白名单规则
  705. for _, setting := range whiteListByCode {
  706. ruleVal := setting.Filters
  707. if ruleVal == "" {
  708. continue
  709. }
  710. // 判断规则
  711. if strings.Contains(cmdline, ruleVal) {
  712. //if !codeType.IsJvmCode() {
  713. // klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
  714. // return false, 0
  715. //}
  716. c.WhiteSettingInfo.AppName = setting.AppName
  717. c.WhiteSettingInfo.Filters = setting.Filters
  718. klog.WithField("pid", pid).
  719. WithField("codeType", codeType.String()).
  720. WithField("ruleVal", ruleVal).
  721. WithField("cmdline", cmdline).
  722. //WithField("stack", setting.OpenStack).
  723. WithField("white list", utils.ToString(whiteListByCode)).
  724. Infoln("[verify] check successful.")
  725. return true, 0
  726. }
  727. }
  728. }
  729. return false, 0
  730. }
  731. // 1.卸载入口
  732. func (c *Container) Detach(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) {
  733. c.lock.Lock()
  734. defer c.lock.Unlock()
  735. if p := c.processes[pid]; p != nil {
  736. err := c.DetachUprobes(tracer, pid, detachType)
  737. if err != nil {
  738. klog.WithError(err).Errorln("DetachUprobes Error.")
  739. }
  740. err = c.DetachStack(pid, detachType)
  741. if err != nil {
  742. klog.WithError(err).Errorln("DetachStack Error.")
  743. }
  744. // 关闭7层监控
  745. c.l7Attach = false
  746. // 变更应用状态
  747. if err != nil {
  748. detachType = detachType.Error()
  749. }
  750. c.AppInfo.SetAppStatus(detachType)
  751. }
  752. }
  753. // 1.1卸载uprobe
  754. func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachType APP_TYPE) error {
  755. // close uprobe
  756. if p := c.processes[pid]; p != nil {
  757. for _, u := range p.uprobes {
  758. err := u.Close()
  759. if err != nil {
  760. return err
  761. }
  762. }
  763. p.uprobes = []link.Link{}
  764. switch detachType {
  765. case APP_UNINSTALL, APP_FUSE:
  766. codeType := c.GetCodeTypeFromCache(pid)
  767. switch codeType {
  768. case CodeTypeJava:
  769. p.jvmAttachOnce = false
  770. case CodeTypeGo:
  771. p.goTlsUprobesChecked = false
  772. p.openSslUprobesChecked = false
  773. default:
  774. }
  775. case APP_UPROBE_ERROR:
  776. klog.Infof("[DetachUprobes] ERROR_DETACH for pid %d", pid)
  777. default:
  778. }
  779. //delete the proc info form proc_info_map(for kernel) when the uprobe detached
  780. if err := tracer.DelKProcInfo(pid); err != nil {
  781. return fmt.Errorf("[DetachUprobes] failed to delete KProcInfo for pid %d, detach type is:%s", pid, detachType)
  782. } else {
  783. klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%d", pid, detachType)
  784. c.AppInfo.EBPFProcInfo = nil
  785. }
  786. } else {
  787. return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
  788. }
  789. return nil
  790. }
  791. // 1.2卸载堆栈
  792. func (c *Container) DetachStack(pid uint32, detachType APP_TYPE) error {
  793. if p := c.processes[pid]; p != nil {
  794. var err error
  795. codeType := c.GetCodeTypeFromCache(pid)
  796. switch codeType {
  797. // 1.2.1 卸载 jvm堆栈
  798. case CodeTypeJava:
  799. err = c.detachJvmStack(pid)
  800. default:
  801. err = p.closeStackUprobes()
  802. }
  803. if err != nil {
  804. klog.WithError(err).Errorln("[detachStack] failed to detach stack")
  805. return err
  806. }
  807. p.stackAttachOnce = false
  808. } else {
  809. return fmt.Errorf("[DetachStack] cannot find uprobe for pid %d", pid)
  810. }
  811. return nil
  812. }
  813. // 1.2.1 卸载 jvm堆栈
  814. func (c *Container) detachJvmStack(pid uint32) error {
  815. if p := c.processes[pid]; p != nil {
  816. //if p.stackStatus.IsStackUprobesSuccess() || len(p.stackUprobes) > 0 {
  817. //}
  818. // 卸载 JavaAgent
  819. var err error
  820. if p.stackStatus.IsJattachSuccess() {
  821. // 卸载堆栈probes
  822. err = p.closeStackUprobes()
  823. if err != nil {
  824. klog.WithError(err).Errorf("[detachJvmStack] closeStackUprobes")
  825. }
  826. err = p.uninstallJavaAgent()
  827. if err != nil {
  828. klog.WithError(err).Errorf("[detachJvmStack] uninstallJavaAgent")
  829. }
  830. }
  831. return err
  832. }
  833. return nil
  834. }
  835. func (c *Container) getRootfs() string {
  836. if c.metadata != nil && c.metadata.rootfs != "" {
  837. return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
  838. }
  839. return ""
  840. }
  841. func (c *Container) BuildActiveApps(runtimeApps map[uint32]AppStatusInfo, pid uint32) {
  842. if c == nil {
  843. //klog.WithField("pid", pid).Warningln("[BuildActiveApps] container_apm is nil.")
  844. return
  845. }
  846. if c.AppInfo.AppName == "" {
  847. return
  848. }
  849. klog.WithField("pid", pid).WithField("appname", c.AppInfo.AppName).Infof("[BuildActiveApps] container %s is running.", c.AppInfo.AppName)
  850. detail := AppStatusInfo{
  851. Pid: pid,
  852. ProcName: c.containerName,
  853. AppName: c.AppInfo.AppName,
  854. Language: c.AppInfo.CodeType.String(),
  855. AppID: c.AppInfo.AppIdHash.IntVal,
  856. AgentID: c.AppInfo.AgentId,
  857. InstanceID: c.AppInfo.InstanceIdHash.IntVal,
  858. Sn: c.AppInfo.Sn,
  859. Sport: c.AppInfo.Sport,
  860. RegisterAt: time.Unix(c.AppInfo.RegisterAt, 0).Format("060102 15:04:05"),
  861. PreStatus: c.AppInfo.PreStatus,
  862. Status: c.AppInfo.Status,
  863. Rule: c.WhiteSettingInfo.Filters,
  864. Container: string(c.id),
  865. }
  866. detail.Rule = fmt.Sprintf("%s|%d", c.WhiteSettingInfo.Filters, c.WhiteSettingInfo.WhiteStackSettingInfo.OpenStack)
  867. if c.AppInfo.UpdateAt != 0 {
  868. detail.UpdateAt = time.Unix(c.AppInfo.UpdateAt, 0).Format("060102 15:04:05")
  869. }
  870. p := c.processes[pid]
  871. if p != nil {
  872. detail.StackStatus = p.stackStatus.String()
  873. v := 0
  874. if !p.versionFailed {
  875. v = 1
  876. }
  877. detail.StackStatus += fmt.Sprintf("V=%d", v)
  878. }
  879. runtimeApps[pid] = detail
  880. }
  881. func (c *Container) AgentCtrl(r *Registry, pid uint32) {
  882. if c == nil {
  883. //klog.WithField("pid", pid).Warningln("[AgentCtrl] cannot find container.")
  884. return
  885. }
  886. var err error
  887. verifyAttachConditions, _ := c.verifyAttachConditions(r, pid)
  888. // fusing UNINSTALL
  889. if r.isFusing && c.Isl7AttachSuccess() {
  890. c.Detach(r.tracer, pid, APP_FUSE)
  891. klog.WithField("pid", pid).Infoln("[AgentCtrl] fusing")
  892. return
  893. }
  894. // verify UNINSTALL
  895. if !verifyAttachConditions && c.Isl7AttachSuccess() {
  896. c.Detach(r.tracer, pid, APP_UNINSTALL)
  897. klog.WithField("pid", pid).Infoln("[AgentCtrl] rule uninstall.")
  898. return
  899. }
  900. if verifyAttachConditions {
  901. err = c.RegisterAppInfo(r, pid)
  902. if err != nil {
  903. klog.WithError(err).Errorf("[AgentCtrl] Failed registerAppInfo.")
  904. return
  905. }
  906. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes.")
  907. err = c.AttachUprobes(r.tracer, pid, "Agentctrl")
  908. if err != nil {
  909. klog.WithField("pid", pid).WithError(err).Errorf("[AgentCtrl] Failed attach uprobes error!")
  910. return
  911. } else {
  912. klog.WithField("pid", pid).Infoln("[AgentCtrl] Attach uprobes success!")
  913. }
  914. // 堆栈控制
  915. c.ctrlStack(r, pid)
  916. }
  917. }