tracer.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "github.com/cilium/ebpf"
  8. "github.com/cilium/ebpf/link"
  9. "github.com/cilium/ebpf/perf"
  10. "github.com/coroot/coroot-node-agent/common"
  11. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  12. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  13. "github.com/coroot/coroot-node-agent/proc"
  14. "golang.org/x/mod/semver"
  15. "golang.org/x/sys/unix"
  16. "inet.af/netaddr"
  17. "k8s.io/klog/v2"
  18. "os"
  19. "runtime"
  20. "strconv"
  21. "strings"
  22. "time"
  23. )
  24. /*
  25. #define TASK_COMM_LEN 16
  26. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  27. #include <linux/types.h>
  28. struct __tuple_t {
  29. __u8 daddr[16];
  30. __u8 rcv_saddr[16];
  31. __u8 addr_len;
  32. __u8 l4_protocol;
  33. __u16 dport;
  34. __u16 num;
  35. };
  36. struct __socket_data {
  37. __u32 pid;
  38. __u32 tgid;
  39. __u64 coroutine_id;
  40. __u8 source;
  41. __u8 comm[TASK_COMM_LEN];
  42. __u64 socket_id;
  43. struct __tuple_t tuple;
  44. __u32 extra_data;
  45. __u32 extra_data_count;
  46. __u32 tcp_seq;
  47. __u64 thread_trace_id;
  48. __u64 timestamp;
  49. __u8 direction: 1;
  50. __u8 msg_type: 7;
  51. __u64 syscall_len;
  52. __u64 data_seq;
  53. __u16 data_type;
  54. __u16 data_len;
  55. char data[BURST_DATA_BUF_SIZE];
  56. } __attribute__((packed));
  57. struct __socket_data_buffer {
  58. __u32 events_num;
  59. __u32 len;
  60. char data[32760];
  61. };
  62. */
  63. import "C"
  64. type SocketData C.struct___socket_data
  65. type SocketDataBuffer C.struct___socket_data_buffer
  // MaxPayloadSize is the upper bound, in bytes, on the L7 payload that is
  // attached to an Event.
  const MaxPayloadSize = 1024

  // EventType identifies what an Event reports.
  type EventType uint32

  // EventReason qualifies an Event with the cause behind it.
  type EventReason uint32

  // Event types. The numeric values are decoded directly from perf records
  // (see procEvent, tcpEvent and fileEvent), so they must not be renumbered.
  const (
  	EventTypeProcessStart    EventType = 1
  	EventTypeProcessExit     EventType = 2
  	EventTypeConnectionOpen  EventType = 3
  	EventTypeConnectionClose EventType = 4
  	EventTypeConnectionError EventType = 5
  	EventTypeListenOpen      EventType = 6
  	EventTypeListenClose     EventType = 7
  	EventTypeFileOpen        EventType = 8
  	EventTypeTCPRetransmit   EventType = 9
  	EventTypeL7Request       EventType = 10
  )

  // Event reasons.
  const (
  	EventReasonNone    EventReason = 0
  	EventReasonOOMKill EventReason = 1
  )
  83. type Event struct {
  84. Type EventType
  85. Reason EventReason
  86. Pid uint32
  87. SrcAddr netaddr.IPPort
  88. DstAddr netaddr.IPPort
  89. Fd uint64
  90. Timestamp uint64
  91. L7Request *l7.RequestData
  92. }
  // perfMapType tells runEventsReader which record layout a perf map carries.
  type perfMapType uint8

  // Known perf map kinds, numbered from 1.
  const (
  	perfMapTypeProcEvents perfMapType = iota + 1
  	perfMapTypeTCPEvents
  	perfMapTypeFileEvents
  	perfMapTypeL7Events
  	perfMapTypeSocketEvents
  )
  101. type Tracer struct {
  102. kernelVersion string
  103. disableL7Tracing bool
  104. collection *ebpf.Collection
  105. readers map[string]*perf.Reader
  106. links []link.Link
  107. uprobes map[string]*ebpf.Program
  108. }
  109. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  110. if disableL7Tracing {
  111. klog.Infoln("L7 tracing is disabled")
  112. }
  113. return &Tracer{
  114. kernelVersion: kernelVersion,
  115. disableL7Tracing: disableL7Tracing,
  116. readers: map[string]*perf.Reader{}, // TODO: readers和uprobes有什么区别呢?
  117. uprobes: map[string]*ebpf.Program{},
  118. }
  119. }
  120. func (t *Tracer) Run(events chan<- Event) error {
  121. if err := t.ebpf(events); err != nil {
  122. return err
  123. }
  124. if err := t.init(events); err != nil {
  125. return err
  126. }
  127. return nil
  128. }
  129. func (t *Tracer) Close() {
  130. for _, p := range t.uprobes {
  131. _ = p.Close()
  132. }
  133. for _, l := range t.links {
  134. _ = l.Close()
  135. }
  136. for _, r := range t.readers {
  137. _ = r.Close()
  138. }
  139. t.collection.Close()
  140. }
  141. func (t *Tracer) init(ch chan<- Event) error { // TODO: 初始化 -> 触发handleEvent...
  142. fmt.Println("===================init start ===========================")
  143. pids, err := proc.ListPids()
  144. if err != nil {
  145. return fmt.Errorf("failed to list pids: %w", err)
  146. }
  147. for _, pid := range pids {
  148. ch <- Event{Type: EventTypeProcessStart, Pid: pid} // TODO: 获取启动中的进程 -> chan
  149. }
  150. fds, sockets := readFds(pids)
  151. for _, fd := range fds {
  152. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd} // TODO: 获取打开的文件句柄 -> chan
  153. }
  154. listens := map[uint64]bool{}
  155. for _, s := range sockets {
  156. if s.Listen {
  157. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  158. }
  159. }
  160. for _, s := range sockets { // TODO: 获取打开的socket -> chan
  161. typ := EventTypeConnectionOpen
  162. if s.Listen {
  163. typ = EventTypeListenOpen
  164. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  165. continue
  166. }
  167. ch <- Event{
  168. Type: typ,
  169. Pid: s.pid,
  170. Fd: s.fd,
  171. SrcAddr: s.SAddr,
  172. DstAddr: s.DAddr,
  173. }
  174. }
  175. fmt.Println("===================init end===========================")
  176. return nil
  177. }
  178. type perfMap struct {
  179. name string
  180. perCPUBufferSizePages int
  181. typ perfMapType
  182. }
  183. func (t *Tracer) ebpf(ch chan<- Event) error {
  184. fmt.Println("======================ebpf start===========================")
  185. if _, ok := ebpfProg[runtime.GOARCH]; !ok { // TODO: 核心方法: 返回当前架构下支持的EBPF程序??????
  186. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  187. }
  188. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  189. var prg []byte
  190. for _, p := range ebpfProg[runtime.GOARCH] {
  191. // fmt.Println(p)
  192. if semver.Compare(kv, p.v) >= 0 {
  193. prg = p.p
  194. break
  195. }
  196. }
  197. if len(prg) == 0 {
  198. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  199. }
  200. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  201. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  202. if debugFsErr != nil && traceFsErr != nil {
  203. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  204. }
  205. fmt.Println("ITEST:len(prg)", len(prg)) // ITEST:len(prg) 1933648
  206. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg)) // TODO: 核心方法...调用cilium库 -> 解析elf文件(parses an ELF file into a CollectionSpec.)
  207. fmt.Println("ITest:collectionSpec", collectionSpec) // ITest:collectionSpec &{map[.data:Array(keySize=4, valueSize=16, maxEntries=1, flags=0) .rodata:Array(keySize=4, valueSize=147, maxEntries=1, flags=128) .rodata.str1.1:Array(keySize=4, valueSize=236, maxEntries=1, flags=128) __active_read_args_map:Hash(keySize=8, valueSize=60, maxEntries=40960, flags=0) __active_write_args_map:Hash(keySize=8, valueSize=60, maxEntries=40960, flags=0) __adapt_kern_uid_map:Array(keySize=4, valueSize=8, maxEntries=1, flags=0) __allow_port_bitmap:Array(keySize=4, valueSize=8192, maxEntries=1, flags=0) __ctx_info:PerCPUArray(keySize=4, valueSize=176, maxEntries=1, flags=0) __data_buf:PerCPUArray(keySize=4, valueSize=32768, maxEntries=1, flags=0) __http2_stack:PerCPUArray(keySize=4, valueSize=8329, maxEntries=1, flags=0) __io_event_buffer:PerCPUArray(keySize=4, valueSize=80, maxEntries=1, flags=0) __members_offset:PerCPUArray(keySize=4, valueSize=80, maxEntries=1, flags=0) __progs_jmp_kp_map:ProgramArray(keySize=4, valueSize=4, maxEntries=2, flags=0) __progs_jmp_tp_map:ProgramArray(keySize=4, valueSize=4, maxEntries=3, flags=0) __protocol_filter:Array(keySize=4, valueSize=4, maxEntries=130, flags=0) __socket_data:PerfEventArray(keySize=4, valueSize=4, maxEntries=16, flags=0) __socket_info_map:Hash(keySize=8, valueSize=54, maxEntries=40960, flags=0) __trace_conf_map:PerCPUArray(keySize=4, valueSize=56, maxEntries=1, flags=0) __trace_map:Hash(keySize=16, valueSize=25, maxEntries=40960, flags=0) __trace_stats_map:Array(keySize=4, valueSize=16, maxEntries=1, flags=0) active_l7_requests:LRUHash(keySize=16, valueSize=1048, maxEntries=32768, flags=0) active_reads:Hash(keySize=8, valueSize=32, maxEntries=10240, flags=0) alloc_map:PerCPUHash(keySize=4, valueSize=8, maxEntries=50, flags=0) apm_span_context_map:LRUHash(keySize=16, valueSize=58, maxEntries=1, flags=0) connection_timestamps:LRUHash(keySize=16, valueSize=8, maxEntries=32768, flags=0) 
cw_parent_span_context_storage_map:PerCPUArray(keySize=4, valueSize=58, maxEntries=1, flags=0) events:PerfEventArray(keySize=0, valueSize=0, maxEntries=0, flags=0) fd_by_pid_tgid:Hash(keySize=8, valueSize=8, maxEntries=10240, flags=0) fd_trace_info_heap:LRUHash(keySize=8, valueSize=72, maxEntries=32768, flags=0) file_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) go_ancerstor_map:LRUHash(keySize=12, valueSize=8, maxEntries=40960, flags=0) go_rw_ts_map:LRUHash(keySize=12, valueSize=8, maxEntries=40960, flags=0) golang_mapbucket_storage_map:PerCPUArray(keySize=4, valueSize=336, maxEntries=1, flags=0) goroutines_map:Hash(keySize=8, valueSize=8, maxEntries=40960, flags=0) header_range:PerCPUArray(keySize=4, valueSize=100, maxEntries=50, flags=0) http2_tcp_seq_map:LRUHash(keySize=12, valueSize=4, maxEntries=40960, flags=0) http_client_uprobe_storage_map:PerCPUArray(keySize=4, valueSize=568, maxEntries=1, flags=0) http_events:Hash(keySize=8, valueSize=568, maxEntries=50, flags=0) http_server_uprobe_storage_map:PerCPUArray(keySize=4, valueSize=736, maxEntries=1, flags=0) http_server_uprobes:Hash(keySize=8, valueSize=736, maxEntries=50, flags=0) iovec_buf_heap:PerCPUArray(keySize=4, valueSize=2048, maxEntries=1, flags=0) l7_event_heap:PerCPUArray(keySize=4, valueSize=1088, maxEntries=1, flags=0) l7_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) l7_request_heap:PerCPUArray(keySize=4, valueSize=1048, maxEntries=1, flags=0) oom_info:Hash(keySize=4, valueSize=4, maxEntries=10240, flags=0) open_file_info:Hash(keySize=8, valueSize=4, maxEntries=10240, flags=0) parent_span_context_storage_map:PerCPUArray(keySize=4, valueSize=24, maxEntries=1, flags=0) pid_tgid_callerid_map:Hash(keySize=8, valueSize=16, maxEntries=40960, flags=0) proc_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) proc_info_map:Hash(keySize=4, valueSize=58, maxEntries=40960, flags=0) sk_info:Hash(keySize=8, valueSize=16, maxEntries=10240, flags=0) 
slice_array_buff_map:PerCPUArray(keySize=4, valueSize=1024, maxEntries=1, flags=0) tcp_connect_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) tcp_listen_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) tcp_retransmit_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) test_heap:PerCPUArray(keySize=4, valueSize=4, maxEntries=1, flags=0) trace_info_heap:LRUHash(keySize=16, valueSize=72, maxEntries=32768, flags=0) tracked_spans:Hash(keySize=8, valueSize=24, maxEntries=1000, flags=0) tracked_spans_by_sc:Hash(keySize=24, valueSize=8, maxEntries=1000, flags=0)] map[bpf_func_sched_process_exit:0xc00051a990 bpf_func_sched_process_fork:0xc0006ab950 bpf_func_sys_enter_close:0xc0006ab050 bpf_func_sys_enter_getppid:0xc00051b050 bpf_func_sys_enter_read:0xc0006ab7a0 bpf_func_sys_enter_recvfrom:0xc00051aab0 bpf_func_sys_enter_sendto:0xc0006ab9e0 bpf_func_sys_enter_write:0xc00051b0e0 bpf_func_sys_exit_read:0xc0006abdd0 bpf_func_sys_exit_readv:0xc00051aea0 bpf_func_sys_exit_recvfrom:0xc0006aaea0 bpf_func_sys_exit_recvmmsg:0xc0006ab830 bpf_func_sys_exit_recvmsg:0xc0006ab440 bpf_func_sys_exit_sendmmsg:0xc00051a7e0 bpf_func_sys_exit_sendmsg:0xc0006aaab0 bpf_func_sys_exit_sendto:0xc00051a750 bpf_func_sys_exit_socket:0xc00051abd0 bpf_func_sys_exit_write:0xc0006ab320 bpf_func_sys_exit_writev:0xc0006abb90 bpf_prog_kp__data_submit:0xc00051a510 bpf_prog_kp__output_data:0xc0006ab0e0 bpf_prog_tp__data_submit:0xc0006abcb0 bpf_prog_tp__io_event:0xc0006abd40 bpf_prog_tp__output_data:0xc00051b200 enter_runtime_newproc1:0xc00051afc0 exit_runtime_newproc1:0xc00051a360 inet_sock_set_state:0xc0006ab170 kprobe____sys_recvmmsg:0xc0006abb00 kprobe____sys_recvmsg:0xc0006ab3b0 kprobe____sys_sendmmsg:0xc00051ae10 kprobe____sys_sendmsg:0xc0006aacf0 kprobe__do_readv:0xc0006aafc0 kprobe__do_writev:0xc0006ab290 oom_mark_victim:0xc00051a870 runtime_execute:0xc00051b290 sched_process_exit:0xc00051a900 sys_enter_connect:0xc00051a5a0 
sys_enter_open:0xc00051aa20 sys_enter_openat:0xc0006ab8c0 sys_enter_read:0xc0006ab710 sys_enter_readv:0xc00051acf0 sys_enter_recvfrom:0xc00051ab40 sys_enter_recvmsg:0xc0006aae10 sys_enter_sendmmsg:0xc00051a6c0 sys_enter_sendmsg:0xc0006abc20 sys_enter_sendto:0xc0006aba70 sys_enter_write:0xc00051b170 sys_enter_writev:0xc00051ac60 sys_exit_accept:0xc0006ab200 sys_exit_accept4:0xc0006aad80 sys_exit_connect:0xc0006abef0 sys_exit_open:0xc00051a3f0 sys_exit_openat:0xc0006ab680 sys_exit_read:0xc0006abe60 sys_exit_readv:0xc00051af30 sys_exit_recvfrom:0xc0006aaf30 sys_exit_recvmsg:0xc0006ab4d0 task_newtask:0xc0006aac60 tcp_retransmit_skb:0xc00051a630 uprobe_HandlerFunc_ServeHTTP:0xc0006aab40 uprobe_HandlerFunc_ServeHTTP_Returns:0xc0006aabd0 uprobe_Transport_roundTrip:0xc0006ab560 uprobe_Transport_roundTrip_Returns:0xc0006ab5f0] 0xc0007241e0 LittleEndian}
  208. if err != nil {
  209. return fmt.Errorf("failed to load collection spec: %w", err)
  210. }
  211. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  212. tracer.PidFilter(collectionSpec) // TODO: 是白名单 or 黑名单?
  213. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  214. for _, spec := range collectionSpec.Maps {
  215. fmt.Println("maps:", spec.Name) // maps: __protocol_filter|l7_request_heap
  216. }
  217. tracer.MapInit(collectionSpec, opts) // TODO: 暂时忽略???
  218. // TODO 多进程
  219. tracer.SetConstants(collectionSpec) // TODO: 暂时忽略
  220. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts) // TODO: 后面t.collection = c....
  221. if err != nil {
  222. var verr *ebpf.VerifierError
  223. if errors.As(err, &verr) {
  224. klog.Errorf("%+v", verr)
  225. }
  226. return fmt.Errorf("failed to load collection: %w", err)
  227. }
  228. tracer.Offset() // TODO: 暂时忽略???
  229. t.collection = c
  230. perfMaps := []perfMap{ // TODO: 性能事件数据初始化......
  231. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  232. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  233. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  234. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  235. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  236. }
  237. //fmt.Println(len(collectionSpec.Programs))
  238. //fmt.Println(len(c.Programs))
  239. tracer.MapInsert(c) // TODO: 暂时忽略???
  240. if !t.disableL7Tracing {
  241. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  242. }
  243. perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  244. fmt.Println("perfMaps start --")
  245. for _, pm := range perfMaps {
  246. fmt.Println(pm.name)
  247. m, ok := t.collection.Maps[pm.name]
  248. if ok {
  249. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize()) // TODO: 核心方法, 创建reader读取性能事件...
  250. if err != nil {
  251. t.Close()
  252. return fmt.Errorf("failed to create ebpf reader: %w", err)
  253. }
  254. t.readers[pm.name] = r
  255. // event监听
  256. go runEventsReader(pm.name, r, ch, pm.typ) // TODO: 核心方法, 将接收到的性能事件数据放到ch中...
  257. }
  258. }
  259. fmt.Println("perfMaps end --")
  260. for _, programSpec := range collectionSpec.Programs { // TODO: 从elf文件中->Func表或者叫programes(使用cilium类库)
  261. // ITEST............ProgramSpec: uprobe_HandlerFunc_ServeHTTP Kprobe None HandlerFunc_ServeHTTP GPL
  262. // ITEST............Program: uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe Kprobe(uprobe_HandlerFunc_ServeHTTP)#114
  263. // ITEST............ProgramSpec: uprobe_HandlerFunc_ServeHTTP_Returns Kprobe None HandlerFunc_ServeHTTP GPL
  264. // ITEST............Program: uprobe_HandlerFunc_ServeHTTP_Returns uprobe/HandlerFunc_ServeHTTP Kprobe Kprobe(uprobe_HandlerFunc_ServeHTTP_Returns)#131
  265. fmt.Println("ITEST............ProgramSpec: ", programSpec.Name, programSpec.Type, programSpec.AttachType, programSpec.AttachTo, programSpec.License)
  266. //if strings.Contains(programSpec.Name, "HandlerFunc_ServeHTTP") {
  267. // fmt.Println("ITEST............ProgramSpec: ", programSpec.Instructions)
  268. //}
  269. program := t.collection.Programs[programSpec.Name] // TODO: 核心过程, 从tracer.uprobes中获取对应的program, 在哪来放进去的呢? ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  270. fmt.Println("ITEST............Program: ", programSpec.Name, programSpec.SectionName, programSpec.Type, program) // 示例: bpf_func_sys_exit_readv tracepoint/syscalls/sys_exit_readv TracePoint | uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe
  271. if t.disableL7Tracing {
  272. switch programSpec.Name {
  273. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  274. continue
  275. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  276. continue
  277. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  278. continue
  279. }
  280. }
  281. var l link.Link // TODO: Link是什么?Link represents a Program attached to a BPF hook.
  282. switch programSpec.Type {
  283. case ebpf.TracePoint: // TODO: 默认将所有trace point和kprobe进行attach??? 是不是需要一个大的黑名单...
  284. // fmt.Println("===========【tracepoint】: ", programSpec.Name, programSpec.SectionName, programSpec.Type)
  285. if strings.Contains(programSpec.SectionName, "prog") {
  286. continue
  287. }
  288. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  289. l, err = link.Tracepoint(parts[0], parts[1], program, nil) // TODO: attaches the given eBPF program to the tracepoint with the given group and name. example: tp, err := Tracepoint("syscalls", "sys_enter_fork", prog, nil)
  290. case ebpf.Kprobe:
  291. // fmt.Println("===========【kprobe】: ", programSpec.Name, programSpec.SectionName, programSpec.Type)
  292. if strings.HasPrefix(programSpec.SectionName, "uprobe/") { // TODO: 直接过滤uprobe...
  293. // fmt.Println("==============uprobe s")
  294. // fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type) // uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe
  295. // fmt.Println("==============uprobe e")
  296. t.uprobes[programSpec.Name] = program // TODO: 重要逻辑,放到tracer的uprobes数组中
  297. continue
  298. }
  299. l, err = link.Kprobe(programSpec.AttachTo, program, nil) // TODO: attaches the given eBPF program to a perf event that fires when the given kernel symbol starts executing. example: kp, err := Kprobe("printk", prog, nil)
  300. }
  301. if err != nil {
  302. t.Close()
  303. return fmt.Errorf("failed to link program: %w", err)
  304. }
  305. t.links = append(t.links, l)
  306. }
  307. fmt.Println("======================ebpf end=================================")
  308. return nil
  309. }
  310. func (t EventType) String() string {
  311. switch t {
  312. case EventTypeProcessStart:
  313. return "process-start"
  314. case EventTypeProcessExit:
  315. return "process-exit"
  316. case EventTypeConnectionOpen:
  317. return "connection-open"
  318. case EventTypeConnectionClose:
  319. return "connection-close"
  320. case EventTypeConnectionError:
  321. return "connection-error"
  322. case EventTypeListenOpen:
  323. return "listen-open"
  324. case EventTypeListenClose:
  325. return "listen-close"
  326. case EventTypeFileOpen:
  327. return "file-open"
  328. case EventTypeTCPRetransmit:
  329. return "tcp-retransmit"
  330. case EventTypeL7Request:
  331. return "l7-request"
  332. }
  333. return "unknown: " + strconv.Itoa(int(t))
  334. }
  335. func (t EventReason) String() string {
  336. switch t {
  337. case EventReasonNone:
  338. return "none"
  339. case EventReasonOOMKill:
  340. return "oom-kill"
  341. }
  342. return "unknown: " + strconv.Itoa(int(t))
  343. }
  344. type procEvent struct {
  345. Type EventType
  346. Pid uint32
  347. Reason uint32
  348. }
  349. type tcpEvent struct {
  350. Fd uint64
  351. Timestamp uint64
  352. Type EventType
  353. Pid uint32
  354. SPort uint16
  355. DPort uint16
  356. SAddr [16]byte
  357. DAddr [16]byte
  358. }
  359. type fileEvent struct {
  360. Type EventType
  361. Pid uint32
  362. Fd uint64
  363. }
  364. type l7Event struct {
  365. Fd uint64
  366. ConnectionTimestamp uint64
  367. Pid uint32
  368. Status uint32
  369. Duration uint64
  370. Protocol uint8
  371. Method uint8
  372. Padding uint16
  373. StatementId uint32
  374. PayloadSize uint64
  375. TraceId uint64
  376. TraceStart uint32
  377. TraceEnd uint32
  378. }
  // SocketDataBufferddd is a pure-Go counterpart of the C struct
  // __socket_data_buffer from the cgo preamble: a record count, a byte length
  // and a fixed-size data area.
  type SocketDataBufferddd struct {
  	EventsNum, Len uint32
  	Data           [32760]byte
  }

  // Sizes shared with the C definitions in the cgo preamble.
  const TASK_COMM_LEN = 16
  const BURST_DATA_BUF_SIZE = 8192

  // Tuple is a pure-Go counterpart of the C struct __tuple_t.
  type Tuple struct {
  	Daddr, RcvSaddr     [16]uint8
  	AddrLen, L4Protocol uint8
  	Dport, Num          uint16
  }

  // SocketDatadddd is a pure-Go counterpart of the C struct __socket_data.
  //
  // NOTE(review): the C struct is packed and stores direction (1 bit) and
  // msg_type (7 bits) in a single byte, while this struct spends one byte on
  // each. Decoded field by field it is therefore one byte longer than the C
  // layout, and everything from MsgType onward appears shifted — confirm
  // before decoding raw samples into this type.
  type SocketDatadddd struct {
  	// Pid is the thread id and Tgid the process id; Pid == Tgid denotes the
  	// process itself rather than one of its threads.
  	Pid, Tgid                 uint32
  	CoroutineID               uint64
  	Source                    uint8
  	Comm                      [TASK_COMM_LEN]byte
  	SocketID                  uint64
  	Tuple                     Tuple
  	ExtraData, ExtraDataCount uint32
  	TcpSeq                    uint32
  	ThreadTraceID, Timestamp  uint64
  	Direction, MsgType        uint8
  	SyscallLen, DataSeq       uint64
  	DataType, DataLen         uint16
  	Data                      [BURST_DATA_BUF_SIZE]byte
  }
  417. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  418. for {
  419. rec, err := r.Read() // TODO: 阻塞读取来自EBPF程序的性能事件数据...
  420. if err != nil {
  421. if errors.Is(err, perf.ErrClosed) {
  422. break
  423. }
  424. continue
  425. }
  426. if rec.LostSamples > 0 {
  427. klog.Errorln(name, "lost samples:", rec.LostSamples)
  428. continue
  429. }
  430. var event Event
  431. fmt.Println("---------......runEventsReader read.......-----------rec.typ", typ)
  432. // fmt.Println(rec)
  433. switch typ {
  434. case perfMapTypeSocketEvents:
  435. //fmt.Println("perfMapTypeSocketEvents")
  436. //// 假设 rec.RawSample 包含数据,类型为 []byte
  437. //rawData := rec.RawSample
  438. //fmt.Println("perfMapTypeSocketEvents2")
  439. //
  440. //// 创建一个 SocketDataBuffer 结构体实例
  441. //var buffer SocketDataBuffer
  442. //
  443. //// 创建一个字节缓冲区,并将数据填充到其中
  444. //reader := bytes.NewReader(rawData)
  445. //fmt.Println("perfMapTypeSocketEvents3")
  446. //fmt.Println(len(rawData))
  447. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  448. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  449. // fmt.Println(reader.Len())
  450. // fmt.Println("Failed to read data:", err)
  451. // continue
  452. //}
  453. //fmt.Println("perfMapTypeSocketEvents4")
  454. //
  455. //// 打印解析后的数据
  456. //fmt.Println("EventsNum:", buffer.EventsNum)
  457. //fmt.Println("Len:", buffer.Len)
  458. //
  459. //// 打印 char data 的内容
  460. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  461. //socketDataBuffer := rec.RawSample
  462. /*todo */
  463. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  464. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  465. /*todo */
  466. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  467. //dataPtr := unsafe.Pointer(&buf.data[0])
  468. //socketData := (*SocketData)(dataPtr)
  469. //reader2 := bytes.NewBuffer(rec.RawSample)
  470. // 222222
  471. //fmt.Println("socketData.Pid:", socketData.pid)
  472. //fmt.Println("socketData.Tgid:", socketData.tgid)
  473. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  474. //fmt.Println("socketData.Source:", socketData.source)
  475. //
  476. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  477. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  478. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  479. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  480. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  481. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  482. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  483. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  484. //fmt.Println("socketData.Direction:", socketData.Direction)
  485. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  486. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  487. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  488. // todo
  489. // fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  490. // fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  491. //fmt.Println("socketData.Data:", len(socketData.Data))
  492. //socketData := &SocketData{}
  493. //reader := bytes.NewBuffer(rec.RawSample)
  494. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  495. // klog.Warningln("failed1 to read msg:", err)
  496. // continue
  497. //}
  498. //
  499. //var data []byte
  500. //payload := reader.Bytes()
  501. //switch {
  502. //case v.Len == 0:
  503. //case v.Len > 32760:
  504. // data = payload[:32760]
  505. //default:
  506. // data = payload[:v.Len]
  507. //}
  508. //////data2 := data[:v.Len]
  509. ////fmt.Println("perfMapTypeSocketEvents")
  510. //fmt.Println(v.EventsNum)
  511. //fmt.Println(v.Len)
  512. //fmt.Println(string(data))
  513. //
  514. //var data2 SocketData
  515. //reader2 := bytes.NewBuffer(data)
  516. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  517. // klog.Warningln("failed2 to read msg:", err)
  518. // continue
  519. //}
  520. //
  521. //fmt.Println(data2.Pid)
  522. //fmt.Println(data2.Tgid)
  523. //fmt.Println(string(v.Data))
  524. //continue
  525. case perfMapTypeL7Events:
  526. fmt.Println("----------perfMapTypeL7Events--------") //----------perfMapTypeL7Events--------
  527. v := &l7Event{}
  528. reader := bytes.NewBuffer(rec.RawSample)
  529. if err := binary.Read(reader, binary.LittleEndian, v); err != nil { // TODO: binary.Read: 将byte -> l7Event......
  530. klog.Warningln("failed to read msg:", err)
  531. continue
  532. }
  533. payload := reader.Bytes()
  534. req := &l7.RequestData{
  535. Protocol: l7.Protocol(v.Protocol),
  536. Status: l7.Status(v.Status),
  537. Duration: time.Duration(v.Duration),
  538. Method: l7.Method(v.Method),
  539. StatementId: v.StatementId,
  540. TraceId: v.TraceId,
  541. TraceStart: v.TraceStart,
  542. TraceEnd: v.TraceEnd,
  543. }
  544. switch {
  545. case v.PayloadSize == 0:
  546. case v.PayloadSize > MaxPayloadSize:
  547. req.Payload = payload[:MaxPayloadSize]
  548. default:
  549. req.Payload = payload[:v.PayloadSize]
  550. }
  551. fmt.Println("==========")
  552. // 界面上访问: http://10.0.6.102:1010/
  553. //req.Payload: GET / HTTP/1.1
  554. //Host: 10.0.6.102:1010
  555. //Connection: keep-alive
  556. //Cache-Control: max-age=0
  557. //Upgrade-Insecure-Requests: 1
  558. //User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36
  559. //Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
  560. //Accept-Encoding: gzip, deflate
  561. //Accept-Language: zh-CN,zh;q=0.9
  562. fmt.Println("req.Payload:", string(req.Payload))
  563. fmt.Println("==========")
  564. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  565. case perfMapTypeFileEvents:
  566. fmt.Println("----------perfMapTypeFileEvents--------")
  567. v := &fileEvent{}
  568. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  569. klog.Warningln("failed to read msg:", err)
  570. continue
  571. }
  572. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  573. case perfMapTypeProcEvents:
  574. fmt.Println("----------perfMapTypeProcEvents--------")
  575. v := &procEvent{}
  576. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  577. klog.Warningln("failed to read msg:", err)
  578. continue
  579. }
  580. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  581. case perfMapTypeTCPEvents:
  582. v := &tcpEvent{}
  583. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  584. klog.Warningln("failed to read msg:", err)
  585. continue
  586. }
  587. fmt.Println("----------perfMapTypeTCPEvents--------")
  588. event = Event{
  589. Type: v.Type,
  590. Pid: v.Pid,
  591. SrcAddr: ipPort(v.SAddr, v.SPort),
  592. DstAddr: ipPort(v.DAddr, v.DPort),
  593. Fd: v.Fd,
  594. Timestamp: v.Timestamp,
  595. }
  596. default:
  597. continue
  598. }
  599. ch <- event
  600. }
  601. }
  602. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  603. i, _ := netaddr.FromStdIP(ip[:])
  604. return netaddr.IPPortFrom(i, port)
  605. }