// Package ebpftracer loads the agent's eBPF programs, attaches them to
// tracepoints and kprobes, and converts the records read from the perf event
// maps into Event values delivered on a channel.
package ebpftracer

import (
	"bytes"
	debugelf "debug/elf"
	"encoding/binary"
	"encoding/hex"
	"errors"
	"fmt"
	"os"
	"runtime"
	"strconv"
	"strings"
	"time"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/coroot/coroot-node-agent/common"
	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
	"github.com/coroot/coroot-node-agent/proc"
	"github.com/coroot/coroot-node-agent/utils"
	"golang.org/x/mod/semver"
	"golang.org/x/sys/unix"
	"inet.af/netaddr"
	"k8s.io/klog/v2"
)

/*
#define TASK_COMM_LEN 16
#define BURST_DATA_BUF_SIZE 8192 // For burst send buffer

// NOTE(review): the header name was missing from the reviewed source;
// <linux/types.h> is assumed because it provides __u8/__u16/__u32/__u64.
#include <linux/types.h>

struct __tuple_t {
	__u8 daddr[16];
	__u8 rcv_saddr[16];
	__u8 addr_len;
	__u8 l4_protocol;
	__u16 dport;
	__u16 num;
};

struct __socket_data {
	__u32 pid;
	__u32 tgid;
	__u64 coroutine_id;
	__u8 source;
	__u8 comm[TASK_COMM_LEN];
	__u64 socket_id;
	struct __tuple_t tuple;
	__u32 extra_data;
	__u32 extra_data_count;
	__u32 tcp_seq;
	__u64 thread_trace_id;
	__u64 timestamp;
	__u8 direction: 1;
	__u8 msg_type: 7;
	__u64 syscall_len;
	__u64 data_seq;
	__u16 data_type;
	__u16 data_len;
	char data[BURST_DATA_BUF_SIZE];
} __attribute__((packed));

struct __socket_data_buffer {
	__u32 events_num;
	__u32 len;
	char data[32760];
};
*/
import "C"

// SocketData is the Go name for the C struct __socket_data declared in the
// cgo preamble.
type SocketData C.struct___socket_data

// SocketDataBuffer is the Go name for the C struct __socket_data_buffer
// declared in the cgo preamble.
type SocketDataBuffer C.struct___socket_data_buffer

// MaxPayloadSize is the upper bound, in bytes, of the L7 payload copied into
// an l7.RequestData.
const MaxPayloadSize = 1024

// EventType identifies the kind of an Event.
type EventType uint32

// EventReason carries additional detail for process events.
type EventReason uint32

const (
	EventTypeProcessStart    EventType = 1
	EventTypeProcessExit     EventType = 2
	EventTypeConnectionOpen  EventType = 3
	EventTypeConnectionClose EventType = 4
	EventTypeConnectionError EventType = 5
	EventTypeListenOpen      EventType = 6
	EventTypeListenClose     EventType = 7
	EventTypeFileOpen        EventType = 8
	EventTypeTCPRetransmit   EventType = 9
	EventTypeL7Request       EventType = 10
	EventTypeFunEnt          EventType = 11
	EventTypeFunRet          EventType = 12

	EventReasonNone    EventReason = 0
	EventReasonOOMKill EventReason = 1
)

// Event is the unit delivered to consumers of the tracer. Which fields are
// populated depends on Type: L7Request is set for EventTypeL7Request and
// StackEvent for events read from the "event_queue" perf map.
type Event struct {
	Type       EventType
	Reason     EventReason
	Pid        uint32
	SrcAddr    netaddr.IPPort
	DstAddr    netaddr.IPPort
	Fd         uint64
	Timestamp  uint64
	L7Request  *l7.RequestData
	StackEvent *StackEvent
}

// perfMapType selects the decoder runEventsReader applies to a perf map.
type perfMapType uint8

const (
	perfMapTypeProcEvents   perfMapType = 1
	perfMapTypeTCPEvents    perfMapType = 2
	perfMapTypeFileEvents   perfMapType = 3
	perfMapTypeL7Events     perfMapType = 4
	perfMapTypeSocketEvents perfMapType = 5
	perfMapTypeEventQueue   perfMapType = 6
)

// Tracer owns the loaded eBPF collection together with the perf readers and
// links created from it.
type Tracer struct {
	kernelVersion    string
	disableL7Tracing bool

	collection *ebpf.Collection
	readers    map[string]*perf.Reader
	links      []link.Link
	uprobes    map[string]*ebpf.Program

	Symbols    []debugelf.Symbol
	Uprobes    []tracer.Uprobe
	UprobesMap map[string]tracer.Uprobe
}

// NewTracer returns a Tracer for the given kernel version. Nothing is loaded
// into the kernel until Run is called.
func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
	if disableL7Tracing {
		klog.Infoln("L7 tracing is disabled")
	}
	return &Tracer{
		kernelVersion:    kernelVersion,
		disableL7Tracing: disableL7Tracing,
		readers:          map[string]*perf.Reader{},
		uprobes:          map[string]*ebpf.Program{},
	}
}

// Run loads and attaches the eBPF programs, runs the stack setup (t.stack,
// defined elsewhere in the package) and then replays the current state of
// the host onto events. It returns the first error encountered.
func (t *Tracer) Run(events chan<- Event) error {
	if err := t.ebpf(events); err != nil {
		return err
	}
	if err := t.stack(); err != nil {
		return err
	}
	return t.init(events)
}

// Close releases the uprobe programs, links, perf readers and the eBPF
// collection. It is safe to call before a collection has been loaded.
func (t *Tracer) Close() {
	for _, p := range t.uprobes {
		_ = p.Close()
	}
	for _, l := range t.links {
		_ = l.Close()
	}
	for _, r := range t.readers {
		_ = r.Close()
	}
	// The collection is only set once loading succeeded; guard against a
	// Close on a tracer whose Run failed early or was never called.
	if t.collection != nil {
		t.collection.Close()
	}
}

// init emits synthetic events describing the processes, open files and
// sockets that already exist: a process-start per pid, a file-open per fd,
// a listen-open per listening socket and a connection-open per socket that
// is not classified as inbound.
func (t *Tracer) init(ch chan<- Event) error {
	pids, err := proc.ListPids()
	if err != nil {
		return fmt.Errorf("failed to list pids: %w", err)
	}
	for _, pid := range pids {
		ch <- Event{Type: EventTypeProcessStart, Pid: pid}
	}

	fds, sockets := readFds(pids)
	for _, fd := range fds {
		ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
	}

	// listenKey combines a pid and a local port into a single map key.
	listenKey := func(pid uint32, port uint16) uint64 {
		return uint64(pid)<<32 | uint64(port)
	}
	listens := map[uint64]bool{}
	for _, s := range sockets {
		if s.Listen {
			listens[listenKey(s.pid, s.SAddr.Port())] = true
		}
	}

	for _, s := range sockets {
		typ := EventTypeConnectionOpen
		switch {
		case s.Listen:
			typ = EventTypeListenOpen
		case listens[listenKey(s.pid, s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port():
			// Inbound: the local port is one this pid listens on, or the
			// remote port is higher than the local one.
			continue
		}
		ch <- Event{
			Type:    typ,
			Pid:     s.pid,
			Fd:      s.fd,
			SrcAddr: s.SAddr,
			DstAddr: s.DAddr,
		}
	}
	return nil
}

// perfMap describes one perf event map to read: its name in the collection,
// the per-CPU buffer size in pages and the decoder to use for its records.
type perfMap struct {
	name                  string
	perCPUBufferSizePages int
	typ                   perfMapType
}

// ebpf selects the eBPF object for the running architecture and kernel,
// loads it, starts one reader goroutine per perf map and attaches the
// tracepoint and kprobe programs. Programs in "uprobe/" sections are not
// attached here; they are stored in t.uprobes.
//
// On failure after the collection has been loaded, everything created so
// far is released via t.Close.
func (t *Tracer) ebpf(ch chan<- Event) error {
	progs, ok := ebpfProg[runtime.GOARCH]
	if !ok {
		return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
	}

	// Pick the first program whose minimum version does not exceed the
	// running kernel.
	// NOTE(review): presumably the list is ordered newest first; confirm.
	kv := "v" + common.KernelMajorMinor(t.kernelVersion)
	var prg []byte
	for _, p := range progs {
		if semver.Compare(kv, p.v) >= 0 {
			prg = p.p
			break
		}
	}
	if len(prg) == 0 {
		return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
	}

	_, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
	_, traceFsErr := os.Stat("/sys/kernel/tracing")
	if debugFsErr != nil && traceFsErr != nil {
		return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
	}

	collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
	if err != nil {
		return fmt.Errorf("failed to load collection spec: %w", err)
	}

	// The error is ignored here.
	// NOTE(review): presumably best-effort; loading fails below if the limit matters.
	_ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})

	tracer.PidFilter(collectionSpec)
	opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
	for _, spec := range collectionSpec.Maps {
		klog.V(5).Infoln("ebpf map:", spec.Name)
	}
	tracer.MapInit(collectionSpec, opts)
	// TODO: multi-process support
	tracer.SetConstants(collectionSpec)

	c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
	if err != nil {
		var verr *ebpf.VerifierError
		if errors.As(err, &verr) {
			klog.Errorf("verifier error: %+v", verr)
		}
		return fmt.Errorf("failed to load collection: %w", err)
	}
	tracer.Offset()
	t.collection = c

	perfMaps := []perfMap{
		{name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
		{name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
		{name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
	}
	klog.V(5).Infof("ebpf programs: %d in spec, %d loaded", len(collectionSpec.Programs), len(c.Programs))
	tracer.MapInsert(c)

	if !t.disableL7Tracing {
		perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
	}
	perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})

	for _, pm := range perfMaps {
		m, ok := t.collection.Maps[pm.name]
		if !ok {
			// Maps absent from the loaded collection are skipped.
			klog.V(5).Infoln("perf map not found in collection:", pm.name)
			continue
		}
		r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
		if err != nil {
			t.Close()
			return fmt.Errorf("failed to create ebpf reader: %w", err)
		}
		t.readers[pm.name] = r
		// Start listening for events; the goroutine ends when r is closed.
		go runEventsReader(pm.name, r, ch, pm.typ)
	}

	for _, programSpec := range collectionSpec.Programs {
		program := t.collection.Programs[programSpec.Name]
		klog.V(5).Infoln("ebpf program:", programSpec.Name, programSpec.SectionName, programSpec.Type)

		if t.disableL7Tracing {
			switch programSpec.Name {
			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
				continue
			case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
				continue
			case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
				continue
			}
		}

		var l link.Link
		switch programSpec.Type {
		case ebpf.TracePoint:
			// Sections containing "prog" are not attached directly.
			// NOTE(review): presumably tail-call targets; confirm.
			if strings.Contains(programSpec.SectionName, "prog") {
				continue
			}
			parts := strings.SplitN(programSpec.AttachTo, "/", 2)
			if len(parts) != 2 {
				t.Close()
				return fmt.Errorf("failed to link program %s: invalid tracepoint target %q", programSpec.Name, programSpec.AttachTo)
			}
			l, err = link.Tracepoint(parts[0], parts[1], program, nil)
		case ebpf.Kprobe:
			if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
				// Uprobes are kept for later use instead of being attached here.
				klog.V(5).Infoln("uprobe program:", programSpec.Name, programSpec.SectionName, programSpec.Type)
				t.uprobes[programSpec.Name] = program
				continue
			}
			l, err = link.Kprobe(programSpec.AttachTo, program, nil)
		default:
			// No link is created for other program types; appending a nil
			// link would make Close panic.
			continue
		}
		if err != nil {
			t.Close()
			return fmt.Errorf("failed to link program %s: %w", programSpec.Name, err)
		}
		t.links = append(t.links, l)
	}

	return nil
}

// String returns a human-readable name for the event type, or
// "unknown: <n>" for values without a name.
func (t EventType) String() string {
	switch t {
	case EventTypeProcessStart:
		return "process-start"
	case EventTypeProcessExit:
		return "process-exit"
	case EventTypeConnectionOpen:
		return "connection-open"
	case EventTypeConnectionClose:
		return "connection-close"
	case EventTypeConnectionError:
		return "connection-error"
	case EventTypeListenOpen:
		return "listen-open"
	case EventTypeListenClose:
		return "listen-close"
	case EventTypeFileOpen:
		return "file-open"
	case EventTypeTCPRetransmit:
		return "tcp-retransmit"
	case EventTypeL7Request:
		return "l7-request"
	case EventTypeFunEnt:
		return "function-enter"
	case EventTypeFunRet:
		return "function-return"
	}
	return "unknown: " + strconv.Itoa(int(t))
}

// String returns a human-readable name for the reason, or "unknown: <n>"
// for values without a name.
func (t EventReason) String() string {
	switch t {
	case EventReasonNone:
		return "none"
	case EventReasonOOMKill:
		return "oom-kill"
	}
	return "unknown: " + strconv.Itoa(int(t))
}

// procEvent is the little-endian wire layout of records in "proc_events".
type procEvent struct {
	Type   EventType
	Pid    uint32
	Reason uint32
}

// tcpEvent is the little-endian wire layout of records in the tcp_* perf maps.
type tcpEvent struct {
	Fd        uint64
	Timestamp uint64
	Type      EventType
	Pid       uint32
	SPort     uint16
	DPort     uint16
	SAddr     [16]byte
	DAddr     [16]byte
}

// fileEvent is the little-endian wire layout of records in "file_events".
type fileEvent struct {
	Type EventType
	Pid  uint32
	Fd   uint64
}

// struct l7_event in l7.c
//
// l7Event is the fixed-size header of records in "l7_events"; the request
// payload follows it in the raw sample.
type l7Event struct {
	Fd                  uint64
	ConnectionTimestamp uint64
	Pid                 uint32
	Status              uint32
	Duration            uint64
	Protocol            uint8
	Method              uint8
	Padding             uint16
	StatementId         uint32
	PayloadSize         uint64
	TraceId             uint64
	TraceStart          uint32
	TraceEnd            uint32
	EventCount          uint32
	AssumedAppId        utils.HashByte
	SpanId              utils.HashByte
	TraceIdFrom         utils.HashByte16
	CalledId            utils.HashByte
	InstanceIdFrom      utils.HashByte
	AppIdFrom           utils.HashByte
	SpanIdFrom          utils.HashByte
}

// SocketDataBufferddd is a pure-Go counterpart of the C struct
// __socket_data_buffer. It is not referenced in this file.
type SocketDataBufferddd struct {
	EventsNum uint32
	Len       uint32
	Data      [32760]byte
}

// Go counterparts of the TASK_COMM_LEN and BURST_DATA_BUF_SIZE macros in the
// cgo preamble.
const (
	TASK_COMM_LEN       = 16
	BURST_DATA_BUF_SIZE = 8192
)

// Tuple is a pure-Go counterpart of the C struct __tuple_t.
type Tuple struct {
	Daddr      [16]uint8
	RcvSaddr   [16]uint8
	AddrLen    uint8
	L4Protocol uint8
	Dport      uint16
	Num        uint16
}

// SocketDatadddd is a pure-Go counterpart of the C struct __socket_data. The
// C struct packs direction and msg_type into a single byte as bit fields,
// whereas this type gives each its own byte. It is not referenced in this file.
type SocketDatadddd struct {
	Pid            uint32 // thread ID; pid == tgid denotes a process, otherwise a thread
	Tgid           uint32 // process ID
	CoroutineID    uint64
	Source         uint8
	Comm           [TASK_COMM_LEN]byte
	SocketID       uint64
	Tuple          Tuple
	ExtraData      uint32
	ExtraDataCount uint32
	TcpSeq         uint32
	ThreadTraceID  uint64
	Timestamp      uint64
	Direction      uint8
	MsgType        uint8
	SyscallLen     uint64
	DataSeq        uint64
	DataType       uint16
	DataLen        uint16
	Data           [BURST_DATA_BUF_SIZE]byte
}

// StackEvent is the little-endian wire layout of records in "event_queue".
type StackEvent struct {
	Pid         uint64
	TraceId     uint64
	Goid        uint64
	Ip          uint64
	Bp          uint64
	CallerIp    uint64
	CallerBp    uint64
	TimeNsStart uint64
	TimeNsEnd   uint64
	Nid         uint64
	Fpid        uint64
	Level       uint64
	Location    byte
}

// StackFunEvent pairs a StackEvent with the uprobe it relates to.
type StackFunEvent struct {
	StackEvent StackEvent
	Uprobe     *tracer.Uprobe
	Level      int
	Pid        int
	Nid        int
}

// runEventsReader reads records from r until the reader is closed, decodes
// each according to typ and sends the resulting Event on ch. Records that
// report lost samples or fail to decode are logged and dropped; other read
// errors are retried.
func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
	for {
		rec, err := r.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				break
			}
			continue
		}
		if rec.LostSamples > 0 {
			klog.Errorln(name, "lost samples:", rec.LostSamples)
			continue
		}

		var event Event
		switch typ {
		case perfMapTypeSocketEvents:
			// TODO: decode the record as a SocketDataBuffer holding
			// SocketData entries. Until that exists, drop the record rather
			// than forwarding a zero-valued Event.
			continue

		case perfMapTypeL7Events:
			v := &l7Event{}
			reader := bytes.NewBuffer(rec.RawSample)
			if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
				klog.Warningln("failed to read msg:", err)
				continue
			}
			// Whatever follows the fixed-size header is the payload.
			payload := reader.Bytes()
			req := &l7.RequestData{
				Protocol:     l7.Protocol(v.Protocol),
				Status:       l7.Status(v.Status),
				Duration:     time.Duration(v.Duration),
				Method:       l7.Method(v.Method),
				StatementId:  v.StatementId,
				TraceId:      v.TraceId,
				TraceStart:   v.TraceStart,
				TraceEnd:     v.TraceEnd,
				EventCount:   v.EventCount,
				AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
				SpanId:       hex.EncodeToString(v.SpanId[:]),
			}
			if v.TraceEnd == 1 {
				req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
				req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
				req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
				req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
				req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
			}
			// Cap the payload at MaxPayloadSize and at the bytes actually
			// present, so a short sample cannot cause an out-of-range slice.
			size := v.PayloadSize
			if size > MaxPayloadSize {
				size = MaxPayloadSize
			}
			if avail := uint64(len(payload)); size > avail {
				size = avail
			}
			if size > 0 {
				req.Payload = payload[:size]
			}
			event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}

		case perfMapTypeFileEvents:
			v := &fileEvent{}
			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
				klog.Warningln("failed to read msg:", err)
				continue
			}
			event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}

		case perfMapTypeProcEvents:
			v := &procEvent{}
			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
				klog.Warningln("failed to read msg:", err)
				continue
			}
			event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}

		case perfMapTypeTCPEvents:
			v := &tcpEvent{}
			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
				klog.Warningln("failed to read msg:", err)
				continue
			}
			event = Event{
				Type:      v.Type,
				Pid:       v.Pid,
				SrcAddr:   ipPort(v.SAddr, v.SPort),
				DstAddr:   ipPort(v.DAddr, v.DPort),
				Fd:        v.Fd,
				Timestamp: v.Timestamp,
			}

		case perfMapTypeEventQueue:
			v := &StackEvent{}
			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
				klog.Warningln("failed to read msg:", err)
				continue
			}
			event = Event{Type: EventTypeFunEnt, StackEvent: v}

		default:
			continue
		}
		ch <- event
	}
}

// ipPort builds a netaddr.IPPort from a 16-byte address and a port. The
// conversion status is ignored because the input is always 16 bytes long.
func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
	i, _ := netaddr.FromStdIP(ip[:])
	return netaddr.IPPortFrom(i, port)
}

// InitKProcInfo stores per-process information for pid in the eBPF
// collection: the instance hash taken from insID and, when
// tracer.Allocate succeeds, the start and end addresses it reports. It
// returns the error from tracer.UpdateProcInfoToMap, if any.
func (t *Tracer) InitKProcInfo(pid uint32, insID utils.ID) error {
	info := tracer.EbpfProcInfo{}
	info.InstanceId = insID.HashtVal

	// Look up the address range; failure is not fatal and leaves the
	// range at its zero value.
	allocDetails, err := tracer.Allocate(int(pid))
	if err != nil {
		klog.Warningln("failed to get address range for pid", pid, err)
	} else if allocDetails != nil {
		info.StartAddr = allocDetails.StartAddr
		info.EndAddr = allocDetails.EndAddr
	}

	_, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
	if err != nil {
		klog.Errorln("failed to update program info", err)
	}
	return err
}