tracer.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "github.com/coroot/coroot-node-agent/utils"
  15. "github.com/coroot/coroot-node-agent/utils/try"
  16. "github.com/cilium/ebpf"
  17. "github.com/cilium/ebpf/link"
  18. "github.com/cilium/ebpf/perf"
  19. "github.com/coroot/coroot-node-agent/common"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  21. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  22. "github.com/coroot/coroot-node-agent/proc"
  23. . "github.com/coroot/coroot-node-agent/utils/modelse"
  24. klog "github.com/sirupsen/logrus"
  25. "golang.org/x/sys/unix"
  26. "inet.af/netaddr"
  27. )
/*
#define TASK_COMM_LEN 16
#define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
#include <linux/types.h>
// Address tuple of a socket: remote address/port, local address/port,
// address length and L4 protocol.
struct __tuple_t {
__u8 daddr[16];
__u8 rcv_saddr[16];
__u8 addr_len;
__u8 l4_protocol;
__u16 dport;
__u16 num;
};
// One socket data record. The struct is packed, and direction/msg_type share
// a single byte as bit-fields.
struct __socket_data {
__u32 pid;
__u32 tgid;
__u64 coroutine_id;
__u8 source;
__u8 comm[TASK_COMM_LEN];
__u64 socket_id;
struct __tuple_t tuple;
__u32 extra_data;
__u32 extra_data_count;
__u32 tcp_seq;
__u64 thread_trace_id;
__u64 timestamp;
__u8 direction: 1;
__u8 msg_type: 7;
__u64 syscall_len;
__u64 data_seq;
__u16 data_type;
__u16 data_len;
char data[BURST_DATA_BUF_SIZE];
} __attribute__((packed));
// Batch of __socket_data records: events_num records occupying len bytes of data.
// NOTE(review): the meaning of events_num/len is inferred from the names; confirm against the eBPF producer.
struct __socket_data_buffer {
__u32 events_num;
__u32 len;
char data[32760];
};
*/
import "C"

// SocketData is the cgo view of struct __socket_data from the preamble above.
// NOTE(review): cgo cannot represent C bit-fields (direction/msg_type) and the
// struct is packed, so some fields may be absent from the Go type; verify
// which fields are accessible before relying on it.
type SocketData C.struct___socket_data

// SocketDataBuffer is the cgo view of struct __socket_data_buffer.
type SocketDataBuffer C.struct___socket_data_buffer
  70. const MaxPayloadSize = 1024
  71. type EventType uint32
  72. type EventReason uint32
  73. const (
  74. EventTypeProcessStart EventType = 1
  75. EventTypeProcessExit EventType = 2
  76. EventTypeConnectionOpen EventType = 3
  77. EventTypeConnectionClose EventType = 4
  78. EventTypeConnectionError EventType = 5
  79. EventTypeListenOpen EventType = 6
  80. EventTypeListenClose EventType = 7
  81. EventTypeFileOpen EventType = 8
  82. EventTypeTCPRetransmit EventType = 9
  83. EventTypeL7Request EventType = 10
  84. EventTypeFunEnt EventType = 11
  85. EventTypeFunRet EventType = 12
  86. EventTypeAcceptOpen EventType = 13
  87. EventTypeAcceptClose EventType = 14
  88. EventReasonNone EventReason = 0
  89. EventReasonOOMKill EventReason = 1
  90. )
  91. type TrafficStats struct {
  92. BytesSent uint64
  93. BytesReceived uint64
  94. }
  95. type Event struct {
  96. StackEvent *StackEvent
  97. Type EventType
  98. Reason EventReason
  99. Pid uint32
  100. SrcAddr netaddr.IPPort
  101. DstAddr netaddr.IPPort
  102. Fd uint64
  103. Timestamp uint64
  104. Duration time.Duration
  105. L7Request *l7.RequestData
  106. TrafficStats *TrafficStats
  107. FirstReadTime uint64
  108. FirstWriteTime uint64
  109. NewReadTime uint64
  110. }
  111. type perfMapType uint8
  112. const (
  113. perfMapTypeProcEvents perfMapType = 1
  114. perfMapTypeTCPEvents perfMapType = 2
  115. perfMapTypeFileEvents perfMapType = 3
  116. perfMapTypeL7Events perfMapType = 4
  117. perfMapTypeSocketEvents perfMapType = 5
  118. perfMapTypeEventQueue perfMapType = 6
  119. perfMapTypePythonThreadEvents perfMapType = 7
  120. )
  121. type Tracer struct {
  122. kernelVersion string
  123. disableL7Tracing bool
  124. disableE2ETracing bool
  125. disableStackTracing bool
  126. collection *ebpf.Collection
  127. collectionSpec *ebpf.CollectionSpec
  128. readers map[string]*perf.Reader
  129. links []link.Link
  130. uprobes map[string]*ebpf.Program
  131. Symbols []debugelf.Symbol
  132. Uprobes []tracer.Uprobe
  133. UprobesMap map[string]tracer.Uprobe
  134. }
  135. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  136. if disableL7Tracing {
  137. klog.Infoln("L7 tracing is disabled")
  138. } else {
  139. klog.Infoln("L7 tracing is enabled")
  140. }
  141. if disableE2ETracing {
  142. klog.Infoln("e2e is disabled")
  143. } else {
  144. klog.Infoln("e2e is enabled")
  145. }
  146. if disableStackTracing {
  147. klog.Infoln("L7 stack is disabled")
  148. } else {
  149. klog.Infoln("L7 stack is enabled")
  150. }
  151. return &Tracer{
  152. kernelVersion: kernelVersion,
  153. disableL7Tracing: disableL7Tracing,
  154. disableE2ETracing: disableE2ETracing,
  155. disableStackTracing: disableStackTracing,
  156. readers: map[string]*perf.Reader{},
  157. uprobes: map[string]*ebpf.Program{},
  158. links: []link.Link{},
  159. }
  160. }
  161. func (t *Tracer) Run(events chan<- Event) error {
  162. if err := t.ebpf(events); err != nil {
  163. return err
  164. }
  165. if err := t.init(events); err != nil {
  166. return err
  167. }
  168. return nil
  169. }
  170. func (t *Tracer) Close() {
  171. for k, p := range t.uprobes {
  172. if p != nil {
  173. err := p.Close()
  174. klog.WithError(err).Infof("[close] uprobes %s", k)
  175. }
  176. }
  177. for _, l := range t.links {
  178. if l != nil {
  179. err := l.Close()
  180. klog.WithError(err).Infof("[close] links")
  181. }
  182. }
  183. for k, r := range t.readers {
  184. if r != nil {
  185. err := r.Close()
  186. klog.WithError(err).Infof("[close] readers %s", k)
  187. }
  188. }
  189. if t.collection != nil {
  190. t.collection.Close()
  191. }
  192. }
  193. func (t *Tracer) init(ch chan<- Event) error {
  194. pids, err := proc.ListPids()
  195. if err != nil {
  196. return fmt.Errorf("failed to list pids: %w", err)
  197. }
  198. for _, pid := range pids {
  199. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  200. }
  201. fds, sockets := readFds(pids)
  202. for _, fd := range fds {
  203. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  204. }
  205. listens := map[uint64]bool{}
  206. for _, s := range sockets {
  207. if s.Listen {
  208. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  209. }
  210. }
  211. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  212. timestamp := uint64(time.Now().UnixNano())
  213. for _, s := range sockets {
  214. typ := EventTypeConnectionOpen
  215. if s.Listen {
  216. typ = EventTypeListenOpen
  217. //} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  218. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] { // 存在误判
  219. continue
  220. }
  221. ch <- Event{
  222. Type: typ,
  223. Pid: s.pid,
  224. Timestamp: timestamp,
  225. Fd: s.fd,
  226. SrcAddr: s.SAddr,
  227. DstAddr: s.DAddr,
  228. }
  229. if typ == EventTypeConnectionOpen {
  230. id := ConnectionId{FD: s.fd, PID: s.pid}
  231. sip := s.SAddr.IP()
  232. sipbytes := sip.As16()
  233. dip := s.DAddr.IP()
  234. dipbytes := dip.As16()
  235. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  236. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  237. klog.Warningln(err)
  238. }
  239. }
  240. }
  241. return nil
  242. }
  243. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  244. return t.collection.Maps["active_connections"].Iterate()
  245. }
  246. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  247. return t.collection.Maps["active_accepts"].Iterate()
  248. }
// perfMap describes one perf event map for which ebpf() opens a reader.
type perfMap struct {
	name                  string      // map name in the loaded collection
	perCPUBufferSizePages int         // per-CPU buffer size in pages (multiplied by os.Getpagesize())
	typ                   perfMapType // selects the decoder used by runEventsReader
}
  254. func (t *Tracer) ebpf(ch chan<- Event) error {
  255. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  256. path, prg, err := EbpfCode(kv)
  257. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  258. if len(prg) == 0 || err != nil {
  259. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  260. }
  261. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  262. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  263. if debugFsErr != nil && traceFsErr != nil {
  264. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  265. }
  266. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  267. if err != nil {
  268. return fmt.Errorf("failed to load collection spec: %w", err)
  269. }
  270. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  271. tracer.PidFilter(collectionSpec)
  272. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  273. klog.Infof("[start] Look eBPF .maps")
  274. for _, spec := range collectionSpec.Maps {
  275. klog.Infoln(spec.Name)
  276. }
  277. klog.Infof("[end] Look eBPF .maps")
  278. tracer.MapInit(collectionSpec, opts)
  279. // TODO 多进程
  280. // tracer.SetConstants(collectionSpec)
  281. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  282. if err != nil {
  283. var verr *ebpf.VerifierError
  284. if errors.As(err, &verr) {
  285. klog.Errorf("----%+v", verr)
  286. }
  287. return fmt.Errorf("failed to load collection: %w", err)
  288. }
  289. tracer.Offset()
  290. t.collectionSpec = collectionSpec
  291. t.collection = c
  292. perfMaps := []perfMap{
  293. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  294. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  295. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  296. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  297. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  298. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  299. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  300. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  301. }
  302. tracer.MapInsert(c)
  303. if !t.DisableL7Tracing() {
  304. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  305. }
  306. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  307. klog.Infof("[start] Look eBPF perf_maps")
  308. for _, pm := range perfMaps {
  309. klog.Infoln(pm.name)
  310. m, ok := t.collection.Maps[pm.name]
  311. if ok {
  312. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  313. if err != nil {
  314. t.Close()
  315. return fmt.Errorf("failed to create ebpf reader: %w", err)
  316. }
  317. t.readers[pm.name] = r
  318. // event监听
  319. //go runEventsReader(pm.name, r, ch, pm.typ)
  320. try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
  321. }
  322. }
  323. klog.Infof("[end] Look eBPF perf_maps")
  324. klog.Infof("[start] Look eBPF specPrograms")
  325. if err = t.LinkEbpfProg(); err != nil {
  326. return err
  327. }
  328. klog.Infof("[end] Look eBPF specPrograms")
  329. return nil
  330. }
  331. func (t *Tracer) LinkEbpfProg() error {
  332. klog.Infof("[start] Look eBPF specPrograms")
  333. var (
  334. l link.Link
  335. err error
  336. lastErr error
  337. )
  338. for _, programSpec := range t.collectionSpec.Programs {
  339. program := t.collection.Programs[programSpec.Name]
  340. klog.Infof("%s:[%s][%d]", programSpec.SectionName, programSpec.Name, program.Type())
  341. if t.DisableL7Tracing() {
  342. switch programSpec.Name {
  343. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  344. continue
  345. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  346. continue
  347. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  348. continue
  349. }
  350. }
  351. switch programSpec.Type {
  352. case ebpf.SkSKB:
  353. klog.Infof("Processing SkSKB program: %s", programSpec.SectionName)
  354. // 对于sk_skb程序,我们需要先检查是否有sockhash map
  355. sockhashMap, exists := t.collection.Maps["sockhash"]
  356. if !exists {
  357. klog.Warnf("sockhash map not found, skipping sk_skb program attachment")
  358. continue
  359. }
  360. // 处理stream_verdict程序
  361. if programSpec.SectionName == "sk_skb/stream_verdict" {
  362. streamVerdictProg, exists := t.collection.Programs["http_hdr_inject"]
  363. if !exists {
  364. klog.Errorf("http_hdr_inject program not found")
  365. continue
  366. }
  367. klog.Infof("Found sockhash map (FD: %d) and stream_verdict program", sockhashMap.FD())
  368. // 尝试使用不同的附加方式
  369. // 首先尝试使用RawAttachProgram
  370. err = link.RawAttachProgram(link.RawAttachProgramOptions{
  371. Target: int(sockhashMap.FD()),
  372. Program: streamVerdictProg,
  373. Attach: ebpf.AttachSkSKBStreamVerdict,
  374. })
  375. if err != nil {
  376. klog.Errorf("Failed to attach stream_verdict program using RawAttachProgram: %v", err)
  377. // 尝试使用AttachRawLink作为备选方案
  378. l, err = link.AttachRawLink(link.RawLinkOptions{
  379. Target: int(sockhashMap.FD()),
  380. Program: streamVerdictProg,
  381. Attach: ebpf.AttachSkSKBStreamVerdict,
  382. })
  383. if err != nil {
  384. klog.Errorf("Failed to attach stream_verdict program using AttachRawLink: %v", err)
  385. continue
  386. }
  387. }
  388. klog.Infof("Successfully attached stream_verdict program to sockhash map")
  389. }
  390. // 处理stream_parser程序
  391. if programSpec.SectionName == "sk_skb/stream_parser" {
  392. streamParserProg, exists := t.collection.Programs["http_request_parser"]
  393. if !exists {
  394. klog.Errorf("http_request_parser program not found")
  395. continue
  396. }
  397. klog.Infof("Found sockhash map (FD: %d) and stream_parser program", sockhashMap.FD())
  398. // 尝试使用不同的附加方式
  399. // 首先尝试使用RawAttachProgram
  400. err = link.RawAttachProgram(link.RawAttachProgramOptions{
  401. Target: int(sockhashMap.FD()),
  402. Program: streamParserProg,
  403. Attach: ebpf.AttachSkSKBStreamParser,
  404. })
  405. if err != nil {
  406. klog.Errorf("Failed to attach stream_parser program using RawAttachProgram: %v", err)
  407. // 尝试使用AttachRawLink作为备选方案
  408. l, err = link.AttachRawLink(link.RawLinkOptions{
  409. Target: int(sockhashMap.FD()),
  410. Program: streamParserProg,
  411. Attach: ebpf.AttachSkSKBStreamParser,
  412. })
  413. if err != nil {
  414. klog.Errorf("Failed to attach stream_parser program using AttachRawLink: %v", err)
  415. continue
  416. }
  417. }
  418. klog.Infof("Successfully attached stream_parser program to sockhash map")
  419. }
  420. case ebpf.SockOps:
  421. klog.Infof("Processing SockOps program: %s", programSpec.SectionName)
  422. // 获取sockops程序
  423. sockopsProg, exists := t.collection.Programs["sockops_cb"]
  424. if !exists {
  425. klog.Errorf("sockops_cb program not found")
  426. continue
  427. }
  428. // 创建cgroup路径
  429. cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
  430. if err := os.MkdirAll(cgroupPath, 0755); err != nil {
  431. klog.Errorf("Failed to create cgroup path: %v", err)
  432. continue
  433. }
  434. // 从环境变量获取要监控的PID
  435. filterPidStr := os.Getenv("FILTER_PID")
  436. if filterPidStr == "" {
  437. klog.Warnf("FILTER_PID environment variable not set, using current process")
  438. filterPidStr = fmt.Sprint(os.Getpid())
  439. }
  440. // 将指定PID添加到cgroup
  441. if err := os.WriteFile(filepath.Join(cgroupPath, "cgroup.procs"), []byte(filterPidStr), 0644); err != nil {
  442. klog.Errorf("Failed to add process %s to cgroup: %v", filterPidStr, err)
  443. continue
  444. }
  445. klog.Infof("Added process %s to cgroup for monitoring", filterPidStr)
  446. // 附加sockops程序到cgroup
  447. l, err = link.AttachCgroup(link.CgroupOptions{
  448. Path: cgroupPath,
  449. Program: sockopsProg,
  450. Attach: ebpf.AttachCGroupSockOps,
  451. })
  452. if err != nil {
  453. klog.Errorf("Failed to attach sockops program: %v", err)
  454. continue
  455. }
  456. klog.Infof("Successfully attached sockops program to cgroup: %s", cgroupPath)
  457. case ebpf.CGroupSKB:
  458. klog.Infof("Processing CGroupSKB program: %s", programSpec.SectionName)
  459. // 处理cgroup/skb程序
  460. if programSpec.SectionName == "cgroup/skb" {
  461. cgroupSkbProg, exists := t.collection.Programs["http_request_handler"]
  462. if !exists {
  463. klog.Errorf("http_request_handler program not found")
  464. continue
  465. }
  466. // 创建cgroup路径
  467. cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
  468. if err := os.MkdirAll(cgroupPath, 0755); err != nil {
  469. klog.Errorf("Failed to create cgroup path: %v", err)
  470. continue
  471. }
  472. // 从环境变量获取要监控的PID
  473. filterPidStr := os.Getenv("FILTER_PID")
  474. if filterPidStr == "" {
  475. klog.Warnf("FILTER_PID environment variable not set, using current process")
  476. filterPidStr = fmt.Sprint(os.Getpid())
  477. }
  478. // 将指定PID添加到cgroup
  479. if err := os.WriteFile(filepath.Join(cgroupPath, "cgroup.procs"), []byte(filterPidStr), 0644); err != nil {
  480. klog.Errorf("Failed to add process %s to cgroup: %v", filterPidStr, err)
  481. continue
  482. }
  483. klog.Infof("Added process %s to cgroup for HTTP request monitoring", filterPidStr)
  484. // 附加cgroup/skb程序到cgroup
  485. l, err = link.AttachCgroup(link.CgroupOptions{
  486. Path: cgroupPath,
  487. Program: cgroupSkbProg,
  488. Attach: ebpf.AttachCGroupInetEgress,
  489. })
  490. if err != nil {
  491. klog.Errorf("Failed to attach cgroup/skb program: %v", err)
  492. continue
  493. }
  494. klog.Infof("Successfully attached cgroup/skb program to cgroup: %s", cgroupPath)
  495. }
  496. case ebpf.SkMsg:
  497. klog.Infof("Processing SkMsg program: %s", programSpec.SectionName)
  498. // 处理sk_msg程序
  499. if programSpec.SectionName == "sk_msg" {
  500. skMsgProg, exists := t.collection.Programs["sk_msg_handler"]
  501. if !exists {
  502. klog.Errorf("sk_msg_handler program not found")
  503. continue
  504. }
  505. skMsgMap, exists := t.collection.Maps["sk_msg_map"]
  506. if !exists {
  507. klog.Errorf("sk_msg_map not found")
  508. continue
  509. }
  510. klog.Infof("Found sk_msg_map (FD: %d) and sk_msg program", skMsgMap.FD())
  511. // 附加sk_msg程序到sockmap
  512. err = link.RawAttachProgram(link.RawAttachProgramOptions{
  513. Target: int(skMsgMap.FD()),
  514. Program: skMsgProg,
  515. Attach: ebpf.AttachSkMsgVerdict,
  516. })
  517. if err != nil {
  518. klog.Errorf("Failed to attach sk_msg program using RawAttachProgram: %v", err)
  519. // 尝试使用AttachRawLink作为备选方案
  520. l, err = link.AttachRawLink(link.RawLinkOptions{
  521. Target: int(skMsgMap.FD()),
  522. Program: skMsgProg,
  523. Attach: ebpf.AttachSkMsgVerdict,
  524. })
  525. if err != nil {
  526. klog.Errorf("Failed to attach sk_msg program using AttachRawLink: %v", err)
  527. continue
  528. }
  529. }
  530. klog.Infof("Successfully attached sk_msg program to sockmap")
  531. }
  532. case ebpf.TracePoint:
  533. if strings.Contains(programSpec.SectionName, "prog") {
  534. continue
  535. }
  536. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  537. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  538. case ebpf.Kprobe:
  539. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  540. t.uprobes[programSpec.Name] = program
  541. continue
  542. }
  543. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  544. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  545. if err == nil {
  546. t.links = append(t.links, l)
  547. }
  548. continue
  549. }
  550. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  551. }
  552. if err != nil {
  553. lastErr = err
  554. t.Close()
  555. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  556. //return fmt.Errorf("failed to link program: %w", err)
  557. } else {
  558. t.links = append(t.links, l)
  559. }
  560. }
  561. klog.Infof("[end] Look eBPF specPrograms")
  562. if lastErr != nil {
  563. return fmt.Errorf("failed to link program: %w", lastErr)
  564. }
  565. return nil
  566. }
  567. func (t *Tracer) UnlinkEbpfProg() error {
  568. var (
  569. lastErr error
  570. err error
  571. )
  572. /* 此处不应该处理 t.uprobes 中 ebpf-program
  573. for pName, p := range t.uprobes {
  574. if err = p.Close(); err != nil {
  575. lastErr = err
  576. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  577. }
  578. }
  579. */
  580. for _, l := range t.links {
  581. if err = l.Close(); err != nil {
  582. lastErr = err
  583. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  584. }
  585. }
  586. return lastErr
  587. }
// Int returns the numeric value of the event type.
func (t EventType) Int() int {
	return int(t)
}
  591. func (t EventType) String() string {
  592. switch t {
  593. case EventTypeProcessStart:
  594. return "process-start"
  595. case EventTypeProcessExit:
  596. return "process-exit"
  597. case EventTypeConnectionOpen:
  598. return "connection-open"
  599. case EventTypeConnectionClose:
  600. return "connection-close"
  601. case EventTypeConnectionError:
  602. return "connection-error"
  603. case EventTypeListenOpen:
  604. return "listen-open"
  605. case EventTypeListenClose:
  606. return "listen-close"
  607. case EventTypeFileOpen:
  608. return "file-open"
  609. case EventTypeTCPRetransmit:
  610. return "tcp-retransmit"
  611. case EventTypeL7Request:
  612. return "l7-request"
  613. }
  614. return "unknown: " + strconv.Itoa(int(t))
  615. }
  616. func (t EventReason) String() string {
  617. switch t {
  618. case EventReasonNone:
  619. return "none"
  620. case EventReasonOOMKill:
  621. return "oom-kill"
  622. }
  623. return "unknown: " + strconv.Itoa(int(t))
  624. }
  625. type procEvent struct {
  626. Type EventType
  627. Pid uint32
  628. Reason uint32
  629. }
  630. type tcpEvent struct {
  631. Fd uint64
  632. Timestamp uint64
  633. Duration uint64
  634. FirstReadTime uint64
  635. FirstWriteTime uint64
  636. NewReadTime uint64
  637. Type EventType
  638. Pid uint32
  639. BytesSent uint64
  640. BytesReceived uint64
  641. SPort uint16
  642. DPort uint16
  643. SAddr [16]byte
  644. DAddr [16]byte
  645. }
// fileEvent is the file event record.
// NOTE(review): presumably decoded from the file_events perf map in
// runEventsReader; keep field order and sizes in sync with the eBPF side.
type fileEvent struct {
	Type EventType
	Pid  uint32
	Fd   uint64
}
  651. // struct l7_event in l7.c
  652. type l7Event struct {
  653. Fd uint64
  654. ConnectionTimestamp uint64
  655. Pid uint32
  656. Status uint32
  657. Duration uint64
  658. Protocol uint8
  659. Method uint8
  660. Padding uint16
  661. StatementId uint32
  662. PayloadSize uint64
  663. TraceId uint64
  664. StartAt uint64 // ns
  665. EndtAt uint64 // ns
  666. TraceStart uint32
  667. TraceEnd uint32
  668. EventCount uint32
  669. Sport uint16
  670. Dport uint16
  671. SAddr [16]byte
  672. DAddr [16]byte
  673. ComponentSport uint16
  674. ComponentDport uint16
  675. ComponentSAddr [16]byte
  676. ComponentDAddr [16]byte
  677. AssumedAppId HashByte
  678. SpanId HashByte
  679. TraceIdFrom HashByte16
  680. CalledId HashByte
  681. InstanceIdFrom HashByte
  682. AppIdFrom HashByte
  683. SpanIdFrom HashByte
  684. TypeFrom [1]byte
  685. }
  686. type SocketDataBufferddd struct {
  687. EventsNum uint32
  688. Len uint32
  689. Data [32760]byte
  690. }
  691. const (
  692. TASK_COMM_LEN = 16
  693. BURST_DATA_BUF_SIZE = 8192
  694. )
  695. type Tuple struct {
  696. Daddr [16]uint8
  697. RcvSaddr [16]uint8
  698. AddrLen uint8
  699. L4Protocol uint8
  700. Dport uint16
  701. Num uint16
  702. }
  703. type SocketDatadddd struct {
  704. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  705. Tgid uint32 // 进程号
  706. CoroutineID uint64
  707. Source uint8
  708. Comm [TASK_COMM_LEN]byte
  709. SocketID uint64
  710. Tuple Tuple
  711. ExtraData uint32
  712. ExtraDataCount uint32
  713. TcpSeq uint32
  714. ThreadTraceID uint64
  715. Timestamp uint64
  716. Direction uint8
  717. MsgType uint8
  718. SyscallLen uint64
  719. DataSeq uint64
  720. DataType uint16
  721. DataLen uint16
  722. Data [BURST_DATA_BUF_SIZE]byte
  723. }
  724. type StackEvent struct {
  725. Type uint64
  726. Pid uint64
  727. TraceId uint64
  728. Goid uint64
  729. Ip uint64
  730. Bp uint64
  731. CallerIp uint64
  732. CallerBp uint64
  733. TimeNsStart uint64
  734. TimeNsEnd uint64
  735. // Nid uint64
  736. // Fpid uint64
  737. // Level uint64
  738. Location byte
  739. ClassName [100]byte
  740. MethedName [100]byte
  741. }
// StackFunEvent pairs a StackEvent with the uprobe it is associated with
// (presumably the probe that produced it — confirm at the construction site).
type StackFunEvent struct {
	StackEvent StackEvent
	Uprobe     *tracer.Uprobe
}
// pythonThreadEvent is the Python thread event record.
// NOTE(review): presumably decoded from the python_thread_events perf map
// (perfMapTypePythonThreadEvents); the unit of Duration is not visible here.
type pythonThreadEvent struct {
	Type     EventType
	Pid      uint32
	Duration uint64
}
  751. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  752. for {
  753. rec, err := r.Read()
  754. if err != nil {
  755. if errors.Is(err, perf.ErrClosed) {
  756. break
  757. }
  758. continue
  759. }
  760. if rec.LostSamples > 0 {
  761. klog.Errorln(name, "lost samples:", rec.LostSamples)
  762. continue
  763. }
  764. var event Event
  765. switch typ {
  766. case perfMapTypeSocketEvents:
  767. //fmt.Println("perfMapTypeSocketEvents")
  768. // 假设 rec.RawSample 包含数据,类型为 []byte
  769. //rawData := rec.RawSample
  770. //fmt.Println("perfMapTypeSocketEvents2")
  771. //
  772. //// 创建一个 SocketDataBuffer 结构体实例
  773. //var buffer SocketDataBuffer
  774. //
  775. //// 创建一个字节缓冲区,并将数据填充到其中
  776. //reader := bytes.NewReader(rawData)
  777. //fmt.Println("perfMapTypeSocketEvents3")
  778. //fmt.Println(len(rawData))
  779. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  780. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  781. // fmt.Println(reader.Len())
  782. // fmt.Println("Failed to read data:", err)
  783. // continue
  784. //}
  785. //fmt.Println("perfMapTypeSocketEvents4")
  786. //
  787. //// 打印解析后的数据
  788. //fmt.Println("EventsNum:", buffer.EventsNum)
  789. //fmt.Println("Len:", buffer.Len)
  790. //
  791. //// 打印 char data 的内容
  792. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  793. //socketDataBuffer := rec.RawSample
  794. // 解析 __socket_data_buffer
  795. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  796. //
  797. //// 获取 char data[32760];
  798. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  799. //// todo
  800. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  801. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  802. //
  803. //// 解析C结构体中的data字段
  804. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  805. //// 打印或处理包含的数据
  806. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  807. /*todo */
  808. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  809. //dataPtr := unsafe.Pointer(&buf.data[0])
  810. //socketData := (*SocketData)(dataPtr)
  811. //reader2 := bytes.NewBuffer(rec.RawSample)
  812. // 222222
  813. //fmt.Println("socketData.Pid:", socketData.pid)
  814. //fmt.Println("socketData.Tgid:", socketData.tgid)
  815. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  816. //fmt.Println("socketData.Source:", socketData.source)
  817. //
  818. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  819. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  820. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  821. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  822. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  823. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  824. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  825. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  826. //fmt.Println("socketData.Direction:", socketData.Direction)
  827. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  828. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  829. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  830. //socketData := &SocketData{}
  831. //reader := bytes.NewBuffer(rec.RawSample)
  832. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  833. // klog.Warningln("failed1 to read msg:", err)
  834. // continue
  835. //}
  836. //
  837. //var data []byte
  838. //payload := reader.Bytes()
  839. //switch {
  840. //case v.Len == 0:
  841. //case v.Len > 32760:
  842. // data = payload[:32760]
  843. //default:
  844. // data = payload[:v.Len]
  845. //}
  846. //////data2 := data[:v.Len]
  847. ////fmt.Println("perfMapTypeSocketEvents")
  848. //fmt.Println(v.EventsNum)
  849. //fmt.Println(v.Len)
  850. //fmt.Println(string(data))
  851. //
  852. //var data2 SocketData
  853. //reader2 := bytes.NewBuffer(data)
  854. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  855. // klog.Warningln("failed2 to read msg:", err)
  856. // continue
  857. //}
  858. //
  859. //fmt.Println(data2.Pid)
  860. //fmt.Println(data2.Tgid)
  861. //fmt.Println(string(v.Data))
  862. //continue
  863. case perfMapTypeL7Events:
  864. v := &l7Event{}
  865. reader := bytes.NewBuffer(rec.RawSample)
  866. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  867. klog.Warningln("failed to read msg:", err)
  868. continue
  869. }
  870. //fmt.Println("v.TraceIdFrom")
  871. //fmt.Println(v.TraceIdFrom)
  872. //a := hex.EncodeToString(v.TraceIdFrom[:])
  873. //for _, b := range v.AssumedAppId {
  874. // fmt.Printf("v.AssumedAppId- %02\n", b)
  875. //}
  876. //fmt.Println(a)
  877. payload := reader.Bytes()
  878. req := &l7.RequestData{
  879. Protocol: l7.Protocol(v.Protocol),
  880. Status: l7.Status(v.Status),
  881. Duration: time.Duration(v.Duration),
  882. Method: l7.Method(v.Method),
  883. StatementId: v.StatementId,
  884. TraceId: v.TraceId,
  885. TraceStart: v.TraceStart,
  886. TraceEnd: v.TraceEnd,
  887. EventCount: v.EventCount,
  888. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  889. SpanId: hex.EncodeToString(v.SpanId[:]),
  890. StartAt: v.StartAt,
  891. EndAt: v.EndtAt,
  892. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  893. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  894. }
  895. if req.Protocol == l7.ProtocolHTTP {
  896. klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  897. klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  898. }
  899. if v.TraceEnd == 1 {
  900. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  901. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  902. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  903. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  904. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  905. req.ParentSpanContext.TypeFrom = hex.EncodeToString(v.TypeFrom[:])
  906. // klog.Debugf("req.ParentSpanContext.TraceIdFrom %s", req.ParentSpanContext.TraceIdFrom)
  907. // klog.Debugf("req.ParentSpanContext.TypeFrom %s", req.ParentSpanContext.TypeFrom)
  908. req.SAddr = ipPort(v.SAddr, v.Sport)
  909. req.DAddr = ipPort(v.DAddr, v.Dport)
  910. // klog.Debugf("runEventsReader SAddr.String %s", req.SAddr.String())
  911. // klog.Debugf("runEventsReader DAddr.String %s", req.DAddr.String())
  912. }
  913. switch {
  914. case v.PayloadSize == 0:
  915. case v.PayloadSize > MaxPayloadSize:
  916. req.Payload = payload[:MaxPayloadSize]
  917. default:
  918. req.Payload = payload[:v.PayloadSize]
  919. }
  920. //fmt.Println("==========")
  921. //fmt.Println("req.Payload:", string(req.Payload))
  922. //fmt.Println("==========")
  923. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  924. case perfMapTypeFileEvents:
  925. v := &fileEvent{}
  926. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  927. klog.Warningln("failed to read msg:", err)
  928. continue
  929. }
  930. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  931. case perfMapTypeProcEvents:
  932. v := &procEvent{}
  933. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  934. klog.Warningln("failed to read msg:", err)
  935. continue
  936. }
  937. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  938. case perfMapTypeTCPEvents:
  939. v := &tcpEvent{}
  940. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  941. klog.Warningln("failed to read msg:", err)
  942. continue
  943. }
  944. event = Event{
  945. Type: v.Type,
  946. Pid: v.Pid,
  947. SrcAddr: ipPort(v.SAddr, v.SPort),
  948. DstAddr: ipPort(v.DAddr, v.DPort),
  949. Fd: v.Fd,
  950. Timestamp: v.Timestamp,
  951. Duration: time.Duration(v.Duration),
  952. }
  953. if v.Type == EventTypeConnectionClose {
  954. event.TrafficStats = &TrafficStats{
  955. BytesSent: v.BytesSent,
  956. BytesReceived: v.BytesReceived,
  957. }
  958. }
  959. event.FirstReadTime = v.FirstReadTime
  960. event.FirstWriteTime = v.FirstWriteTime
  961. event.NewReadTime = v.NewReadTime
  962. if v.Type == EventTypeAcceptClose {
  963. event.TrafficStats = &TrafficStats{
  964. BytesSent: v.BytesSent,
  965. BytesReceived: v.BytesReceived,
  966. }
  967. }
  968. case perfMapTypePythonThreadEvents:
  969. v := &pythonThreadEvent{}
  970. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  971. klog.Warningln("failed to read msg:", err)
  972. continue
  973. }
  974. event = Event{
  975. Type: v.Type,
  976. Pid: v.Pid,
  977. Duration: time.Duration(v.Duration),
  978. }
  979. case perfMapTypeEventQueue:
  980. v := &StackEvent{}
  981. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  982. klog.Warningln("failed to read msg:", err)
  983. continue
  984. }
  985. event = Event{
  986. Type: EventTypeFunEnt,
  987. StackEvent: v,
  988. }
  989. default:
  990. continue
  991. }
  992. ch <- event
  993. }
  994. }
  995. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  996. i, _ := netaddr.FromStdIP(ip[:])
  997. return netaddr.IPPortFrom(i, port)
  998. }
  999. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  1000. var err error
  1001. var info EbpfProcInfo
  1002. if appInfo.EBPFProcInfo == nil {
  1003. info = EbpfProcInfo{
  1004. InstanceId: appInfo.InstanceIdHash.HashtVal,
  1005. AppId: appInfo.AppIdHash.HashtVal,
  1006. CodeType: uint16(appInfo.CodeType),
  1007. }
  1008. } else {
  1009. info = *appInfo.EBPFProcInfo
  1010. info.AppId = appInfo.AppIdHash.HashtVal
  1011. }
  1012. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  1013. if err != nil {
  1014. klog.Error("failed to update program info", err)
  1015. }
  1016. appInfo.EBPFProcInfo = &info
  1017. return err
  1018. }
  1019. func (t *Tracer) DelKProcInfo(pid uint32) error {
  1020. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  1021. if err != nil {
  1022. klog.WithField("pid", pid).Error("failed to delete proc info", err)
  1023. }
  1024. return err
  1025. }
  1026. // TODO check language
  1027. func (t *Tracer) DisableL7Tracing() bool {
  1028. return t.disableL7Tracing
  1029. }
  1030. func (t *Tracer) DisableE2ETracing() bool {
  1031. return t.disableE2ETracing
  1032. }
  1033. func (t *Tracer) DisableStackTracing() bool {
  1034. return t.disableStackTracing
  1035. }