| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663 |
- package ebpftracer
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "fmt"
- "github.com/cilium/ebpf"
- "github.com/cilium/ebpf/link"
- "github.com/cilium/ebpf/perf"
- "github.com/coroot/coroot-node-agent/common"
- "github.com/coroot/coroot-node-agent/ebpftracer/l7"
- "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
- "github.com/coroot/coroot-node-agent/proc"
- "golang.org/x/mod/semver"
- "golang.org/x/sys/unix"
- "inet.af/netaddr"
- "k8s.io/klog/v2"
- "os"
- "runtime"
- "strconv"
- "strings"
- "time"
- )
- /*
- #define TASK_COMM_LEN 16
- #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
- #include <linux/types.h>
- struct __tuple_t {
- __u8 daddr[16];
- __u8 rcv_saddr[16];
- __u8 addr_len;
- __u8 l4_protocol;
- __u16 dport;
- __u16 num;
- };
- struct __socket_data {
- __u32 pid;
- __u32 tgid;
- __u64 coroutine_id;
- __u8 source;
- __u8 comm[TASK_COMM_LEN];
- __u64 socket_id;
- struct __tuple_t tuple;
- __u32 extra_data;
- __u32 extra_data_count;
- __u32 tcp_seq;
- __u64 thread_trace_id;
- __u64 timestamp;
- __u8 direction: 1;
- __u8 msg_type: 7;
- __u64 syscall_len;
- __u64 data_seq;
- __u16 data_type;
- __u16 data_len;
- char data[BURST_DATA_BUF_SIZE];
- } __attribute__((packed));
- struct __socket_data_buffer {
- __u32 events_num;
- __u32 len;
- char data[32760];
- };
- */
- import "C"
- type SocketData C.struct___socket_data
- type SocketDataBuffer C.struct___socket_data_buffer
// MaxPayloadSize is the upper bound, in bytes, on the L7 payload that is
// attached to a decoded request.
const MaxPayloadSize = 1024

// EventType identifies what kind of occurrence an Event describes.
type EventType uint32

// EventReason optionally qualifies an Event with a cause.
type EventReason uint32

// Known event types.
const (
	EventTypeProcessStart    EventType = 1
	EventTypeProcessExit     EventType = 2
	EventTypeConnectionOpen  EventType = 3
	EventTypeConnectionClose EventType = 4
	EventTypeConnectionError EventType = 5
	EventTypeListenOpen      EventType = 6
	EventTypeListenClose     EventType = 7
	EventTypeFileOpen        EventType = 8
	EventTypeTCPRetransmit   EventType = 9
	EventTypeL7Request       EventType = 10
)

// Known event reasons.
const (
	EventReasonNone    EventReason = 0
	EventReasonOOMKill EventReason = 1
)
- type Event struct {
- Type EventType
- Reason EventReason
- Pid uint32
- SrcAddr netaddr.IPPort
- DstAddr netaddr.IPPort
- Fd uint64
- Timestamp uint64
- L7Request *l7.RequestData
- }
// perfMapType tells the reader loop how to decode samples coming from a
// given perf event array.
type perfMapType uint8

// Decoder selectors, one per family of perf maps.
const (
	perfMapTypeProcEvents   perfMapType = 1 // process events
	perfMapTypeTCPEvents    perfMapType = 2 // listen/connect/retransmit events
	perfMapTypeFileEvents   perfMapType = 3 // file events
	perfMapTypeL7Events     perfMapType = 4 // L7 request events
	perfMapTypeSocketEvents perfMapType = 5 // raw socket data records
)
- type Tracer struct {
- kernelVersion string
- disableL7Tracing bool
- collection *ebpf.Collection
- readers map[string]*perf.Reader
- links []link.Link
- uprobes map[string]*ebpf.Program
- }
- func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
- if disableL7Tracing {
- klog.Infoln("L7 tracing is disabled")
- }
- return &Tracer{
- kernelVersion: kernelVersion,
- disableL7Tracing: disableL7Tracing,
- readers: map[string]*perf.Reader{}, // TODO: readers和uprobes有什么区别呢?
- uprobes: map[string]*ebpf.Program{},
- }
- }
- func (t *Tracer) Run(events chan<- Event) error {
- if err := t.ebpf(events); err != nil {
- return err
- }
- if err := t.init(events); err != nil {
- return err
- }
- return nil
- }
- func (t *Tracer) Close() {
- for _, p := range t.uprobes {
- _ = p.Close()
- }
- for _, l := range t.links {
- _ = l.Close()
- }
- for _, r := range t.readers {
- _ = r.Close()
- }
- t.collection.Close()
- }
- func (t *Tracer) init(ch chan<- Event) error { // TODO: 初始化 -> 触发handleEvent...
- fmt.Println("===================init start ===========================")
- pids, err := proc.ListPids()
- if err != nil {
- return fmt.Errorf("failed to list pids: %w", err)
- }
- for _, pid := range pids {
- ch <- Event{Type: EventTypeProcessStart, Pid: pid} // TODO: 获取启动中的进程 -> chan
- }
- fds, sockets := readFds(pids)
- for _, fd := range fds {
- ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd} // TODO: 获取打开的文件句柄 -> chan
- }
- listens := map[uint64]bool{}
- for _, s := range sockets {
- if s.Listen {
- listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
- }
- }
- for _, s := range sockets { // TODO: 获取打开的socket -> chan
- typ := EventTypeConnectionOpen
- if s.Listen {
- typ = EventTypeListenOpen
- } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
- continue
- }
- ch <- Event{
- Type: typ,
- Pid: s.pid,
- Fd: s.fd,
- SrcAddr: s.SAddr,
- DstAddr: s.DAddr,
- }
- }
- fmt.Println("===================init end===========================")
- return nil
- }
- type perfMap struct {
- name string
- perCPUBufferSizePages int
- typ perfMapType
- }
- func (t *Tracer) ebpf(ch chan<- Event) error {
- fmt.Println("======================ebpf start===========================")
- if _, ok := ebpfProg[runtime.GOARCH]; !ok { // TODO: 核心方法: 返回当前架构下支持的EBPF程序??????
- return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
- }
- kv := "v" + common.KernelMajorMinor(t.kernelVersion)
- var prg []byte
- for _, p := range ebpfProg[runtime.GOARCH] {
- // fmt.Println(p)
- if semver.Compare(kv, p.v) >= 0 {
- prg = p.p
- break
- }
- }
- if len(prg) == 0 {
- return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
- }
- _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
- _, traceFsErr := os.Stat("/sys/kernel/tracing")
- if debugFsErr != nil && traceFsErr != nil {
- return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
- }
- fmt.Println("ITEST:len(prg)", len(prg)) // ITEST:len(prg) 1933648
- collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg)) // TODO: 核心方法...调用cilium库 -> 解析elf文件(parses an ELF file into a CollectionSpec.)
- fmt.Println("ITest:collectionSpec", collectionSpec) // ITest:collectionSpec &{map[.data:Array(keySize=4, valueSize=16, maxEntries=1, flags=0) .rodata:Array(keySize=4, valueSize=147, maxEntries=1, flags=128) .rodata.str1.1:Array(keySize=4, valueSize=236, maxEntries=1, flags=128) __active_read_args_map:Hash(keySize=8, valueSize=60, maxEntries=40960, flags=0) __active_write_args_map:Hash(keySize=8, valueSize=60, maxEntries=40960, flags=0) __adapt_kern_uid_map:Array(keySize=4, valueSize=8, maxEntries=1, flags=0) __allow_port_bitmap:Array(keySize=4, valueSize=8192, maxEntries=1, flags=0) __ctx_info:PerCPUArray(keySize=4, valueSize=176, maxEntries=1, flags=0) __data_buf:PerCPUArray(keySize=4, valueSize=32768, maxEntries=1, flags=0) __http2_stack:PerCPUArray(keySize=4, valueSize=8329, maxEntries=1, flags=0) __io_event_buffer:PerCPUArray(keySize=4, valueSize=80, maxEntries=1, flags=0) __members_offset:PerCPUArray(keySize=4, valueSize=80, maxEntries=1, flags=0) __progs_jmp_kp_map:ProgramArray(keySize=4, valueSize=4, maxEntries=2, flags=0) __progs_jmp_tp_map:ProgramArray(keySize=4, valueSize=4, maxEntries=3, flags=0) __protocol_filter:Array(keySize=4, valueSize=4, maxEntries=130, flags=0) __socket_data:PerfEventArray(keySize=4, valueSize=4, maxEntries=16, flags=0) __socket_info_map:Hash(keySize=8, valueSize=54, maxEntries=40960, flags=0) __trace_conf_map:PerCPUArray(keySize=4, valueSize=56, maxEntries=1, flags=0) __trace_map:Hash(keySize=16, valueSize=25, maxEntries=40960, flags=0) __trace_stats_map:Array(keySize=4, valueSize=16, maxEntries=1, flags=0) active_l7_requests:LRUHash(keySize=16, valueSize=1048, maxEntries=32768, flags=0) active_reads:Hash(keySize=8, valueSize=32, maxEntries=10240, flags=0) alloc_map:PerCPUHash(keySize=4, valueSize=8, maxEntries=50, flags=0) apm_span_context_map:LRUHash(keySize=16, valueSize=58, maxEntries=1, flags=0) connection_timestamps:LRUHash(keySize=16, valueSize=8, maxEntries=32768, flags=0) 
cw_parent_span_context_storage_map:PerCPUArray(keySize=4, valueSize=58, maxEntries=1, flags=0) events:PerfEventArray(keySize=0, valueSize=0, maxEntries=0, flags=0) fd_by_pid_tgid:Hash(keySize=8, valueSize=8, maxEntries=10240, flags=0) fd_trace_info_heap:LRUHash(keySize=8, valueSize=72, maxEntries=32768, flags=0) file_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) go_ancerstor_map:LRUHash(keySize=12, valueSize=8, maxEntries=40960, flags=0) go_rw_ts_map:LRUHash(keySize=12, valueSize=8, maxEntries=40960, flags=0) golang_mapbucket_storage_map:PerCPUArray(keySize=4, valueSize=336, maxEntries=1, flags=0) goroutines_map:Hash(keySize=8, valueSize=8, maxEntries=40960, flags=0) header_range:PerCPUArray(keySize=4, valueSize=100, maxEntries=50, flags=0) http2_tcp_seq_map:LRUHash(keySize=12, valueSize=4, maxEntries=40960, flags=0) http_client_uprobe_storage_map:PerCPUArray(keySize=4, valueSize=568, maxEntries=1, flags=0) http_events:Hash(keySize=8, valueSize=568, maxEntries=50, flags=0) http_server_uprobe_storage_map:PerCPUArray(keySize=4, valueSize=736, maxEntries=1, flags=0) http_server_uprobes:Hash(keySize=8, valueSize=736, maxEntries=50, flags=0) iovec_buf_heap:PerCPUArray(keySize=4, valueSize=2048, maxEntries=1, flags=0) l7_event_heap:PerCPUArray(keySize=4, valueSize=1088, maxEntries=1, flags=0) l7_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) l7_request_heap:PerCPUArray(keySize=4, valueSize=1048, maxEntries=1, flags=0) oom_info:Hash(keySize=4, valueSize=4, maxEntries=10240, flags=0) open_file_info:Hash(keySize=8, valueSize=4, maxEntries=10240, flags=0) parent_span_context_storage_map:PerCPUArray(keySize=4, valueSize=24, maxEntries=1, flags=0) pid_tgid_callerid_map:Hash(keySize=8, valueSize=16, maxEntries=40960, flags=0) proc_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) proc_info_map:Hash(keySize=4, valueSize=58, maxEntries=40960, flags=0) sk_info:Hash(keySize=8, valueSize=16, maxEntries=10240, flags=0) 
slice_array_buff_map:PerCPUArray(keySize=4, valueSize=1024, maxEntries=1, flags=0) tcp_connect_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) tcp_listen_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) tcp_retransmit_events:PerfEventArray(keySize=4, valueSize=4, maxEntries=0, flags=0) test_heap:PerCPUArray(keySize=4, valueSize=4, maxEntries=1, flags=0) trace_info_heap:LRUHash(keySize=16, valueSize=72, maxEntries=32768, flags=0) tracked_spans:Hash(keySize=8, valueSize=24, maxEntries=1000, flags=0) tracked_spans_by_sc:Hash(keySize=24, valueSize=8, maxEntries=1000, flags=0)] map[bpf_func_sched_process_exit:0xc00051a990 bpf_func_sched_process_fork:0xc0006ab950 bpf_func_sys_enter_close:0xc0006ab050 bpf_func_sys_enter_getppid:0xc00051b050 bpf_func_sys_enter_read:0xc0006ab7a0 bpf_func_sys_enter_recvfrom:0xc00051aab0 bpf_func_sys_enter_sendto:0xc0006ab9e0 bpf_func_sys_enter_write:0xc00051b0e0 bpf_func_sys_exit_read:0xc0006abdd0 bpf_func_sys_exit_readv:0xc00051aea0 bpf_func_sys_exit_recvfrom:0xc0006aaea0 bpf_func_sys_exit_recvmmsg:0xc0006ab830 bpf_func_sys_exit_recvmsg:0xc0006ab440 bpf_func_sys_exit_sendmmsg:0xc00051a7e0 bpf_func_sys_exit_sendmsg:0xc0006aaab0 bpf_func_sys_exit_sendto:0xc00051a750 bpf_func_sys_exit_socket:0xc00051abd0 bpf_func_sys_exit_write:0xc0006ab320 bpf_func_sys_exit_writev:0xc0006abb90 bpf_prog_kp__data_submit:0xc00051a510 bpf_prog_kp__output_data:0xc0006ab0e0 bpf_prog_tp__data_submit:0xc0006abcb0 bpf_prog_tp__io_event:0xc0006abd40 bpf_prog_tp__output_data:0xc00051b200 enter_runtime_newproc1:0xc00051afc0 exit_runtime_newproc1:0xc00051a360 inet_sock_set_state:0xc0006ab170 kprobe____sys_recvmmsg:0xc0006abb00 kprobe____sys_recvmsg:0xc0006ab3b0 kprobe____sys_sendmmsg:0xc00051ae10 kprobe____sys_sendmsg:0xc0006aacf0 kprobe__do_readv:0xc0006aafc0 kprobe__do_writev:0xc0006ab290 oom_mark_victim:0xc00051a870 runtime_execute:0xc00051b290 sched_process_exit:0xc00051a900 sys_enter_connect:0xc00051a5a0 
sys_enter_open:0xc00051aa20 sys_enter_openat:0xc0006ab8c0 sys_enter_read:0xc0006ab710 sys_enter_readv:0xc00051acf0 sys_enter_recvfrom:0xc00051ab40 sys_enter_recvmsg:0xc0006aae10 sys_enter_sendmmsg:0xc00051a6c0 sys_enter_sendmsg:0xc0006abc20 sys_enter_sendto:0xc0006aba70 sys_enter_write:0xc00051b170 sys_enter_writev:0xc00051ac60 sys_exit_accept:0xc0006ab200 sys_exit_accept4:0xc0006aad80 sys_exit_connect:0xc0006abef0 sys_exit_open:0xc00051a3f0 sys_exit_openat:0xc0006ab680 sys_exit_read:0xc0006abe60 sys_exit_readv:0xc00051af30 sys_exit_recvfrom:0xc0006aaf30 sys_exit_recvmsg:0xc0006ab4d0 task_newtask:0xc0006aac60 tcp_retransmit_skb:0xc00051a630 uprobe_HandlerFunc_ServeHTTP:0xc0006aab40 uprobe_HandlerFunc_ServeHTTP_Returns:0xc0006aabd0 uprobe_Transport_roundTrip:0xc0006ab560 uprobe_Transport_roundTrip_Returns:0xc0006ab5f0] 0xc0007241e0 LittleEndian}
- if err != nil {
- return fmt.Errorf("failed to load collection spec: %w", err)
- }
- _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
- tracer.PidFilter(collectionSpec) // TODO: 是白名单 or 黑名单?
- opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
- for _, spec := range collectionSpec.Maps {
- fmt.Println("maps:", spec.Name) // maps: __protocol_filter|l7_request_heap
- }
- tracer.MapInit(collectionSpec, opts) // TODO: 暂时忽略???
- // TODO 多进程
- tracer.SetConstants(collectionSpec) // TODO: 暂时忽略
- c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts) // TODO: 后面t.collection = c....
- if err != nil {
- var verr *ebpf.VerifierError
- if errors.As(err, &verr) {
- klog.Errorf("%+v", verr)
- }
- return fmt.Errorf("failed to load collection: %w", err)
- }
- tracer.Offset() // TODO: 暂时忽略???
- t.collection = c
- perfMaps := []perfMap{ // TODO: 性能事件数据初始化......
- {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
- {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
- {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
- {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
- {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
- }
- //fmt.Println(len(collectionSpec.Programs))
- //fmt.Println(len(c.Programs))
- tracer.MapInsert(c) // TODO: 暂时忽略???
- if !t.disableL7Tracing {
- perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
- }
- perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
- fmt.Println("perfMaps start --")
- for _, pm := range perfMaps {
- fmt.Println(pm.name)
- m, ok := t.collection.Maps[pm.name]
- if ok {
- r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize()) // TODO: 核心方法, 创建reader读取性能事件...
- if err != nil {
- t.Close()
- return fmt.Errorf("failed to create ebpf reader: %w", err)
- }
- t.readers[pm.name] = r
- // event监听
- go runEventsReader(pm.name, r, ch, pm.typ) // TODO: 核心方法, 将接收到的性能事件数据放到ch中...
- }
- }
- fmt.Println("perfMaps end --")
- for _, programSpec := range collectionSpec.Programs { // TODO: 从elf文件中->Func表或者叫programes(使用cilium类库)
- // ITEST............ProgramSpec: uprobe_HandlerFunc_ServeHTTP Kprobe None HandlerFunc_ServeHTTP GPL
- // ITEST............Program: uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe Kprobe(uprobe_HandlerFunc_ServeHTTP)#114
- // ITEST............ProgramSpec: uprobe_HandlerFunc_ServeHTTP_Returns Kprobe None HandlerFunc_ServeHTTP GPL
- // ITEST............Program: uprobe_HandlerFunc_ServeHTTP_Returns uprobe/HandlerFunc_ServeHTTP Kprobe Kprobe(uprobe_HandlerFunc_ServeHTTP_Returns)#131
- fmt.Println("ITEST............ProgramSpec: ", programSpec.Name, programSpec.Type, programSpec.AttachType, programSpec.AttachTo, programSpec.License)
- //if strings.Contains(programSpec.Name, "HandlerFunc_ServeHTTP") {
- // fmt.Println("ITEST............ProgramSpec: ", programSpec.Instructions)
- //}
- program := t.collection.Programs[programSpec.Name] // TODO: 核心过程, 从tracer.uprobes中获取对应的program, 在哪来放进去的呢? ebpf.NewCollectionWithOptions(collectionSpec, *opts)
- fmt.Println("ITEST............Program: ", programSpec.Name, programSpec.SectionName, programSpec.Type, program) // 示例: bpf_func_sys_exit_readv tracepoint/syscalls/sys_exit_readv TracePoint | uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe
- if t.disableL7Tracing {
- switch programSpec.Name {
- case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
- continue
- case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
- continue
- case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
- continue
- }
- }
- var l link.Link // TODO: Link是什么?Link represents a Program attached to a BPF hook.
- switch programSpec.Type {
- case ebpf.TracePoint: // TODO: 默认将所有trace point和kprobe进行attach??? 是不是需要一个大的黑名单...
- // fmt.Println("===========【tracepoint】: ", programSpec.Name, programSpec.SectionName, programSpec.Type)
- if strings.Contains(programSpec.SectionName, "prog") {
- continue
- }
- parts := strings.SplitN(programSpec.AttachTo, "/", 2)
- l, err = link.Tracepoint(parts[0], parts[1], program, nil) // TODO: attaches the given eBPF program to the tracepoint with the given group and name. example: tp, err := Tracepoint("syscalls", "sys_enter_fork", prog, nil)
- case ebpf.Kprobe:
- // fmt.Println("===========【kprobe】: ", programSpec.Name, programSpec.SectionName, programSpec.Type)
- if strings.HasPrefix(programSpec.SectionName, "uprobe/") { // TODO: 直接过滤uprobe...
- // fmt.Println("==============uprobe s")
- // fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type) // uprobe_HandlerFunc_ServeHTTP uprobe/HandlerFunc_ServeHTTP Kprobe
- // fmt.Println("==============uprobe e")
- t.uprobes[programSpec.Name] = program // TODO: 重要逻辑,放到tracer的uprobes数组中
- continue
- }
- l, err = link.Kprobe(programSpec.AttachTo, program, nil) // TODO: attaches the given eBPF program to a perf event that fires when the given kernel symbol starts executing. example: kp, err := Kprobe("printk", prog, nil)
- }
- if err != nil {
- t.Close()
- return fmt.Errorf("failed to link program: %w", err)
- }
- t.links = append(t.links, l)
- }
- fmt.Println("======================ebpf end=================================")
- return nil
- }
- func (t EventType) String() string {
- switch t {
- case EventTypeProcessStart:
- return "process-start"
- case EventTypeProcessExit:
- return "process-exit"
- case EventTypeConnectionOpen:
- return "connection-open"
- case EventTypeConnectionClose:
- return "connection-close"
- case EventTypeConnectionError:
- return "connection-error"
- case EventTypeListenOpen:
- return "listen-open"
- case EventTypeListenClose:
- return "listen-close"
- case EventTypeFileOpen:
- return "file-open"
- case EventTypeTCPRetransmit:
- return "tcp-retransmit"
- case EventTypeL7Request:
- return "l7-request"
- }
- return "unknown: " + strconv.Itoa(int(t))
- }
- func (t EventReason) String() string {
- switch t {
- case EventReasonNone:
- return "none"
- case EventReasonOOMKill:
- return "oom-kill"
- }
- return "unknown: " + strconv.Itoa(int(t))
- }
- type procEvent struct {
- Type EventType
- Pid uint32
- Reason uint32
- }
- type tcpEvent struct {
- Fd uint64
- Timestamp uint64
- Type EventType
- Pid uint32
- SPort uint16
- DPort uint16
- SAddr [16]byte
- DAddr [16]byte
- }
- type fileEvent struct {
- Type EventType
- Pid uint32
- Fd uint64
- }
// l7Event is the fixed-size header decoded from samples of the L7 perf map;
// the request payload follows it in the raw sample. Field order is the wire
// order and must not change.
type l7Event struct {
	Fd, ConnectionTimestamp uint64
	Pid, Status             uint32
	Duration                uint64
	Protocol, Method        uint8
	Padding                 uint16
	StatementId             uint32
	// PayloadSize is the payload length announced by the producer.
	PayloadSize, TraceId uint64
	TraceStart, TraceEnd uint32
}
// SocketDataBufferddd is a pure-Go counterpart of the C struct
// __socket_data_buffer from the cgo preamble: a counter, a length and a
// fixed 32760-byte data area.
type SocketDataBufferddd struct {
	EventsNum, Len uint32
	Data           [32760]byte
}
// TASK_COMM_LEN is the size of the comm array, matching the macro of the
// same name in the cgo preamble.
const TASK_COMM_LEN = 16

// BURST_DATA_BUF_SIZE is the size of the data array, matching the macro of
// the same name in the cgo preamble.
const BURST_DATA_BUF_SIZE = 8192
// Tuple is a pure-Go counterpart of the C struct __tuple_t from the cgo
// preamble. Field order follows the C declaration.
type Tuple struct {
	Daddr, RcvSaddr     [16]uint8
	AddrLen, L4Protocol uint8
	Dport, Num          uint16
}
- type SocketDatadddd struct {
- Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
- Tgid uint32 // 进程号
- CoroutineID uint64
- Source uint8
- Comm [TASK_COMM_LEN]byte
- SocketID uint64
- Tuple Tuple
- ExtraData uint32
- ExtraDataCount uint32
- TcpSeq uint32
- ThreadTraceID uint64
- Timestamp uint64
- Direction uint8
- MsgType uint8
- SyscallLen uint64
- DataSeq uint64
- DataType uint16
- DataLen uint16
- Data [BURST_DATA_BUF_SIZE]byte
- }
- func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
- for {
- rec, err := r.Read() // TODO: 阻塞读取来自EBPF程序的性能事件数据...
- if err != nil {
- if errors.Is(err, perf.ErrClosed) {
- break
- }
- continue
- }
- if rec.LostSamples > 0 {
- klog.Errorln(name, "lost samples:", rec.LostSamples)
- continue
- }
- var event Event
- fmt.Println("---------......runEventsReader read.......-----------rec.typ", typ)
- // fmt.Println(rec)
- switch typ {
- case perfMapTypeSocketEvents:
- //fmt.Println("perfMapTypeSocketEvents")
- //// 假设 rec.RawSample 包含数据,类型为 []byte
- //rawData := rec.RawSample
- //fmt.Println("perfMapTypeSocketEvents2")
- //
- //// 创建一个 SocketDataBuffer 结构体实例
- //var buffer SocketDataBuffer
- //
- //// 创建一个字节缓冲区,并将数据填充到其中
- //reader := bytes.NewReader(rawData)
- //fmt.Println("perfMapTypeSocketEvents3")
- //fmt.Println(len(rawData))
- //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
- //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
- // fmt.Println(reader.Len())
- // fmt.Println("Failed to read data:", err)
- // continue
- //}
- //fmt.Println("perfMapTypeSocketEvents4")
- //
- //// 打印解析后的数据
- //fmt.Println("EventsNum:", buffer.EventsNum)
- //fmt.Println("Len:", buffer.Len)
- //
- //// 打印 char data 的内容
- //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
- //socketDataBuffer := rec.RawSample
- /*todo */
- //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
- //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
- /*todo */
- //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
- //dataPtr := unsafe.Pointer(&buf.data[0])
- //socketData := (*SocketData)(dataPtr)
- //reader2 := bytes.NewBuffer(rec.RawSample)
- // 222222
- //fmt.Println("socketData.Pid:", socketData.pid)
- //fmt.Println("socketData.Tgid:", socketData.tgid)
- //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
- //fmt.Println("socketData.Source:", socketData.source)
- //
- //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
- //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
- //fmt.Println("socketData.Tuple:", socketData.Tuple)
- //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
- //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
- //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
- //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
- //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
- //fmt.Println("socketData.Direction:", socketData.Direction)
- //fmt.Println("socketData.MsgType:", socketData.MsgType)
- //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
- //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
- // todo
- // fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
- // fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
- //fmt.Println("socketData.Data:", len(socketData.Data))
- //socketData := &SocketData{}
- //reader := bytes.NewBuffer(rec.RawSample)
- //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
- // klog.Warningln("failed1 to read msg:", err)
- // continue
- //}
- //
- //var data []byte
- //payload := reader.Bytes()
- //switch {
- //case v.Len == 0:
- //case v.Len > 32760:
- // data = payload[:32760]
- //default:
- // data = payload[:v.Len]
- //}
- //////data2 := data[:v.Len]
- ////fmt.Println("perfMapTypeSocketEvents")
- //fmt.Println(v.EventsNum)
- //fmt.Println(v.Len)
- //fmt.Println(string(data))
- //
- //var data2 SocketData
- //reader2 := bytes.NewBuffer(data)
- //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
- // klog.Warningln("failed2 to read msg:", err)
- // continue
- //}
- //
- //fmt.Println(data2.Pid)
- //fmt.Println(data2.Tgid)
- //fmt.Println(string(v.Data))
- //continue
- case perfMapTypeL7Events:
- fmt.Println("----------perfMapTypeL7Events--------") //----------perfMapTypeL7Events--------
- v := &l7Event{}
- reader := bytes.NewBuffer(rec.RawSample)
- if err := binary.Read(reader, binary.LittleEndian, v); err != nil { // TODO: binary.Read: 将byte -> l7Event......
- klog.Warningln("failed to read msg:", err)
- continue
- }
- payload := reader.Bytes()
- req := &l7.RequestData{
- Protocol: l7.Protocol(v.Protocol),
- Status: l7.Status(v.Status),
- Duration: time.Duration(v.Duration),
- Method: l7.Method(v.Method),
- StatementId: v.StatementId,
- TraceId: v.TraceId,
- TraceStart: v.TraceStart,
- TraceEnd: v.TraceEnd,
- }
- switch {
- case v.PayloadSize == 0:
- case v.PayloadSize > MaxPayloadSize:
- req.Payload = payload[:MaxPayloadSize]
- default:
- req.Payload = payload[:v.PayloadSize]
- }
- fmt.Println("==========")
- // 界面上访问: http://10.0.6.102:1010/
- //req.Payload: GET / HTTP/1.1
- //Host: 10.0.6.102:1010
- //Connection: keep-alive
- //Cache-Control: max-age=0
- //Upgrade-Insecure-Requests: 1
- //User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36
- //Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
- //Accept-Encoding: gzip, deflate
- //Accept-Language: zh-CN,zh;q=0.9
- fmt.Println("req.Payload:", string(req.Payload))
- fmt.Println("==========")
- event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
- case perfMapTypeFileEvents:
- fmt.Println("----------perfMapTypeFileEvents--------")
- v := &fileEvent{}
- if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
- klog.Warningln("failed to read msg:", err)
- continue
- }
- event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
- case perfMapTypeProcEvents:
- fmt.Println("----------perfMapTypeProcEvents--------")
- v := &procEvent{}
- if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
- klog.Warningln("failed to read msg:", err)
- continue
- }
- event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
- case perfMapTypeTCPEvents:
- v := &tcpEvent{}
- if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
- klog.Warningln("failed to read msg:", err)
- continue
- }
- fmt.Println("----------perfMapTypeTCPEvents--------")
- event = Event{
- Type: v.Type,
- Pid: v.Pid,
- SrcAddr: ipPort(v.SAddr, v.SPort),
- DstAddr: ipPort(v.DAddr, v.DPort),
- Fd: v.Fd,
- Timestamp: v.Timestamp,
- }
- default:
- continue
- }
- ch <- event
- }
- }
- func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
- i, _ := netaddr.FromStdIP(ip[:])
- return netaddr.IPPortFrom(i, port)
- }
|