package ebpftracer import ( "bytes" debugelf "debug/elf" "encoding/binary" "encoding/hex" "errors" "fmt" "os" "strconv" "strings" "time" "github.com/coroot/coroot-node-agent/utils" "github.com/coroot/coroot-node-agent/utils/try" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/ebpftracer/tracer" "github.com/coroot/coroot-node-agent/proc" . "github.com/coroot/coroot-node-agent/utils/modelse" klog "github.com/sirupsen/logrus" "golang.org/x/sys/unix" "inet.af/netaddr" ) /* #define TASK_COMM_LEN 16 #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer #include struct __tuple_t { __u8 daddr[16]; __u8 rcv_saddr[16]; __u8 addr_len; __u8 l4_protocol; __u16 dport; __u16 num; }; struct __socket_data { __u32 pid; __u32 tgid; __u64 coroutine_id; __u8 source; __u8 comm[TASK_COMM_LEN]; __u64 socket_id; struct __tuple_t tuple; __u32 extra_data; __u32 extra_data_count; __u32 tcp_seq; __u64 thread_trace_id; __u64 timestamp; __u8 direction: 1; __u8 msg_type: 7; __u64 syscall_len; __u64 data_seq; __u16 data_type; __u16 data_len; char data[BURST_DATA_BUF_SIZE]; } __attribute__((packed)); struct __socket_data_buffer { __u32 events_num; __u32 len; char data[32760]; }; */ import "C" type SocketData C.struct___socket_data type SocketDataBuffer C.struct___socket_data_buffer const MaxPayloadSize = 1024 type EventType uint32 type EventReason uint32 const ( EventTypeProcessStart EventType = 1 EventTypeProcessExit EventType = 2 EventTypeConnectionOpen EventType = 3 EventTypeConnectionClose EventType = 4 EventTypeConnectionError EventType = 5 EventTypeListenOpen EventType = 6 EventTypeListenClose EventType = 7 EventTypeFileOpen EventType = 8 EventTypeTCPRetransmit EventType = 9 EventTypeL7Request EventType = 10 EventTypeFunEnt EventType = 11 EventTypeFunRet EventType = 12 EventTypeAcceptOpen EventType = 13 EventTypeAcceptClose EventType = 14 EventReasonNone EventReason = 0 EventReasonOOMKill EventReason = 1 ) type TrafficStats struct { BytesSent uint64 BytesReceived uint64 } type Event struct { StackEvent *StackEvent Type EventType Reason EventReason Pid uint32 SrcAddr netaddr.IPPort DstAddr netaddr.IPPort Fd uint64 Timestamp uint64 Duration time.Duration L7Request *l7.RequestData TrafficStats *TrafficStats FirstReadTime uint64 FirstWriteTime uint64 NewReadTime uint64 } type perfMapType uint8 const ( perfMapTypeProcEvents perfMapType = 1 perfMapTypeTCPEvents perfMapType = 2 perfMapTypeFileEvents perfMapType = 3 perfMapTypeL7Events perfMapType = 4 perfMapTypeSocketEvents perfMapType = 5 perfMapTypeEventQueue perfMapType = 6 perfMapTypePythonThreadEvents perfMapType = 7 ) type Tracer struct { kernelVersion string disableL7Tracing bool disableE2ETracing bool disableStackTracing bool collection *ebpf.Collection collectionSpec *ebpf.CollectionSpec readers map[string]*perf.Reader links []link.Link uprobes map[string]*ebpf.Program Symbols []debugelf.Symbol Uprobes []tracer.Uprobe UprobesMap map[string]tracer.Uprobe } func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer { if disableL7Tracing { klog.Infoln("L7 tracing is disabled") } else { klog.Infoln("L7 tracing is enabled") } if disableE2ETracing { klog.Infoln("e2e is disabled") } else { klog.Infoln("e2e is enabled") } if disableStackTracing { klog.Infoln("L7 stack is disabled") } else { klog.Infoln("L7 stack is enabled") } return &Tracer{ kernelVersion: kernelVersion, disableL7Tracing: disableL7Tracing, disableE2ETracing: disableE2ETracing, disableStackTracing: disableStackTracing, readers: map[string]*perf.Reader{}, uprobes: map[string]*ebpf.Program{}, links: []link.Link{}, } } func (t *Tracer) Run(events chan<- Event) error { if err := t.ebpf(events); err != nil { return err } if err := t.init(events); err != nil { return err } return nil } func (t *Tracer) Close() { for k, p := range t.uprobes { if p != nil { err := p.Close() klog.WithError(err).Infof("[close] uprobes %s", k) } } for _, l := range t.links { if l != nil { err := l.Close() klog.WithError(err).Infof("[close] links") } } for k, r := range t.readers { if r != nil { err := r.Close() klog.WithError(err).Infof("[close] readers %s", k) } } if t.collection != nil { t.collection.Close() } } func (t *Tracer) init(ch chan<- Event) error { pids, err := proc.ListPids() if err != nil { return fmt.Errorf("failed to list pids: %w", err) } for _, pid := range pids { ch <- Event{Type: EventTypeProcessStart, Pid: pid} } fds, sockets := readFds(pids) for _, fd := range fds { ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd} } listens := map[uint64]bool{} for _, s := range sockets { if s.Listen { listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true } } ebpfConnectionsMap := t.collection.Maps["active_connections"] timestamp := uint64(time.Now().UnixNano()) for _, s := range sockets { typ := EventTypeConnectionOpen if s.Listen { typ = EventTypeListenOpen //} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] { // 存在误判 continue } ch <- Event{ Type: typ, Pid: s.pid, Timestamp: timestamp, Fd: s.fd, SrcAddr: s.SAddr, DstAddr: s.DAddr, } if typ == EventTypeConnectionOpen { id := ConnectionId{FD: s.fd, PID: s.pid} sip := s.SAddr.IP() sipbytes := sip.As16() dip := s.DAddr.IP() dipbytes := dip.As16() conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()} if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil { klog.Warningln(err) } } } return nil } func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator { return t.collection.Maps["active_connections"].Iterate() } func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator { return t.collection.Maps["active_accepts"].Iterate() } type perfMap struct { name string perCPUBufferSizePages int typ perfMapType } func (t *Tracer) ebpf(ch chan<- Event) error { kv := "v" + common.KernelMajorMinor(t.kernelVersion) path, prg, err := EbpfCode(kv) klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path) if len(prg) == 0 || err != nil { return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err) } _, debugFsErr := os.Stat("/sys/kernel/debug/tracing") _, traceFsErr := os.Stat("/sys/kernel/tracing") if debugFsErr != nil && traceFsErr != nil { return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted") } collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg)) if err != nil { return fmt.Errorf("failed to load collection spec: %w", err) } _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY}) tracer.PidFilter(collectionSpec) opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)} klog.Infof("[start] Look eBPF .maps") for _, spec := range collectionSpec.Maps { klog.Infoln(spec.Name) } klog.Infof("[end] Look eBPF .maps") tracer.MapInit(collectionSpec, opts) // TODO 多进程 // tracer.SetConstants(collectionSpec) c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts) if err != nil { var verr *ebpf.VerifierError if errors.As(err, &verr) { klog.Errorf("----%+v", verr) } return fmt.Errorf("failed to load collection: %w", err) } tracer.Offset() t.collectionSpec = collectionSpec t.collection = c perfMaps := []perfMap{ {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4}, {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4}, {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8}, {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8}, {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4}, {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4}, {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32}, {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4}, } tracer.MapInsert(c) if !t.DisableL7Tracing() { perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32}) } perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64}) klog.Infof("[start] Look eBPF perf_maps") for _, pm := range perfMaps { klog.Infoln(pm.name) m, ok := t.collection.Maps[pm.name] if ok { r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize()) if err != nil { t.Close() return fmt.Errorf("failed to create ebpf reader: %w", err) } t.readers[pm.name] = r // event监听 //go runEventsReader(pm.name, r, ch, pm.typ) try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ) } } klog.Infof("[end] Look eBPF perf_maps") klog.Infof("[start] Look eBPF specPrograms") if err = t.LinkEbpfProg(); err != nil { return err } klog.Infof("[end] Look eBPF specPrograms") return nil } func (t *Tracer) LinkEbpfProg() error { klog.Infof("[start] Look eBPF specPrograms") var ( l link.Link err error lastErr error ) for _, programSpec := range t.collectionSpec.Programs { program := t.collection.Programs[programSpec.Name] klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name) if t.DisableL7Tracing() { switch programSpec.Name { case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg": continue case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg": continue case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg": continue } } switch programSpec.Type { case ebpf.TracePoint: if strings.Contains(programSpec.SectionName, "prog") { continue } parts := strings.SplitN(programSpec.AttachTo, "/", 2) l, err = link.Tracepoint(parts[0], parts[1], program, nil) case ebpf.Kprobe: if strings.HasPrefix(programSpec.SectionName, "uprobe/") { t.uprobes[programSpec.Name] = program continue } if strings.HasPrefix(programSpec.SectionName, "kretprobe/") { l, err = link.Kretprobe(programSpec.AttachTo, program, nil) if err == nil { t.links = append(t.links, l) } continue } l, err = link.Kprobe(programSpec.AttachTo, program, nil) } if err != nil { lastErr = err t.Close() klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err) //return fmt.Errorf("failed to link program: %w", err) } else { t.links = append(t.links, l) } } klog.Infof("[end] Look eBPF specPrograms") if lastErr != nil { return fmt.Errorf("failed to link program: %w", lastErr) } return nil } func (t *Tracer) UnlinkEbpfProg() error { var ( lastErr error err error ) /* 此处不应该处理 t.uprobes 中 ebpf-program for pName, p := range t.uprobes { if err = p.Close(); err != nil { lastErr = err klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error()) } } */ for _, l := range t.links { if err = l.Close(); err != nil { lastErr = err klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error()) } } return lastErr } func (t EventType) Int() int { return int(t) } func (t EventType) String() string { switch t { case EventTypeProcessStart: return "process-start" case EventTypeProcessExit: return "process-exit" case EventTypeConnectionOpen: return "connection-open" case EventTypeConnectionClose: return "connection-close" case EventTypeConnectionError: return "connection-error" case EventTypeListenOpen: return "listen-open" case EventTypeListenClose: return "listen-close" case EventTypeFileOpen: return "file-open" case EventTypeTCPRetransmit: return "tcp-retransmit" case EventTypeL7Request: return "l7-request" } return "unknown: " + strconv.Itoa(int(t)) } func (t EventReason) String() string { switch t { case EventReasonNone: return "none" case EventReasonOOMKill: return "oom-kill" } return "unknown: " + strconv.Itoa(int(t)) } type procEvent struct { Type EventType Pid uint32 Reason uint32 } type tcpEvent struct { Fd uint64 Timestamp uint64 Duration uint64 FirstReadTime uint64 FirstWriteTime uint64 NewReadTime uint64 Type EventType Pid uint32 BytesSent uint64 BytesReceived uint64 SPort uint16 DPort uint16 SAddr [16]byte DAddr [16]byte } type fileEvent struct { Type EventType Pid uint32 Fd uint64 } // struct l7_event in l7.c type l7Event struct { Fd uint64 ConnectionTimestamp uint64 Pid uint32 Status uint32 Duration uint64 Protocol uint8 Method uint8 Padding uint16 StatementId uint32 PayloadSize uint64 TraceId uint64 StartAt uint64 // ns EndtAt uint64 // ns TraceStart uint32 TraceEnd uint32 TraceType uint32 EventCount uint32 Sport uint16 Dport uint16 SAddr HashByte16 DAddr HashByte16 ComponentSport uint16 ComponentDport uint16 IsTls uint16 ComponentSAddr HashByte16 ComponentDAddr HashByte16 AssumedAppId HashByte SpanId HashByte TraceIdFrom HashByte16 CalledId HashByte InstanceIdFrom HashByte AppIdFrom HashByte SpanIdFrom HashByte TypeFrom [1]byte SysvcFrom [76]byte // cwother header value: NN:ParentServiceName:MM:ParentServiceSysTag RPCTarget [64]byte ErrorMsg HashByte128 MQ struct { Topic [256]byte // MQ topic (e.g., Kafka topic) Key [256]byte // MQ key (e.g., Kafka message key) } } type SocketDataBufferddd struct { EventsNum uint32 Len uint32 Data [32760]byte } const ( TASK_COMM_LEN = 16 BURST_DATA_BUF_SIZE = 8192 ) type Tuple struct { Daddr [16]uint8 RcvSaddr [16]uint8 AddrLen uint8 L4Protocol uint8 Dport uint16 Num uint16 } type SocketDatadddd struct { Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程 Tgid uint32 // 进程号 CoroutineID uint64 Source uint8 Comm [TASK_COMM_LEN]byte SocketID uint64 Tuple Tuple ExtraData uint32 ExtraDataCount uint32 TcpSeq uint32 ThreadTraceID uint64 Timestamp uint64 Direction uint8 MsgType uint8 SyscallLen uint64 DataSeq uint64 DataType uint16 DataLen uint16 Data [BURST_DATA_BUF_SIZE]byte } type StackEvent struct { Type uint64 Pid uint64 TraceId uint64 Goid uint64 Ip uint64 Bp uint64 CallerIp uint64 CallerBp uint64 TimeNsStart uint64 TimeNsEnd uint64 // Nid uint64 // Fpid uint64 // Level uint64 Location byte ClassName [100]byte MethedName [100]byte } type StackFunEvent struct { StackEvent StackEvent Uprobe *tracer.Uprobe } type pythonThreadEvent struct { Type EventType Pid uint32 Duration uint64 } func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) { for { rec, err := r.Read() if err != nil { if errors.Is(err, perf.ErrClosed) { break } continue } if rec.LostSamples > 0 { klog.Errorln(name, "lost samples:", rec.LostSamples) continue } var event Event switch typ { case perfMapTypeSocketEvents: //fmt.Println("perfMapTypeSocketEvents") // 假设 rec.RawSample 包含数据,类型为 []byte //rawData := rec.RawSample //fmt.Println("perfMapTypeSocketEvents2") // //// 创建一个 SocketDataBuffer 结构体实例 //var buffer SocketDataBuffer // //// 创建一个字节缓冲区,并将数据填充到其中 //reader := bytes.NewReader(rawData) //fmt.Println("perfMapTypeSocketEvents3") //fmt.Println(len(rawData)) //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例 //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil { // fmt.Println(reader.Len()) // fmt.Println("Failed to read data:", err) // continue //} //fmt.Println("perfMapTypeSocketEvents4") // //// 打印解析后的数据 //fmt.Println("EventsNum:", buffer.EventsNum) //fmt.Println("Len:", buffer.Len) // //// 打印 char data 的内容 //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据 //socketDataBuffer := rec.RawSample // 解析 __socket_data_buffer //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec // //// 获取 char data[32760]; //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec //// todo //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type)) //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len)) // //// 解析C结构体中的data字段 //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len)) //// 打印或处理包含的数据 //fmt.Printf("socketData.Payload:%v \n", string(dataSlice)) /*todo */ //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line))) //dataPtr := unsafe.Pointer(&buf.data[0]) //socketData := (*SocketData)(dataPtr) //reader2 := bytes.NewBuffer(rec.RawSample) // 222222 //fmt.Println("socketData.Pid:", socketData.pid) //fmt.Println("socketData.Tgid:", socketData.tgid) //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id) //fmt.Println("socketData.Source:", socketData.source) // //fmt.Printf("socketData.Comm: %s \n", socketData.comm) //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id) //fmt.Println("socketData.Tuple:", socketData.Tuple) //fmt.Println("socketData.ExtraData:", socketData.ExtraData) //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount) //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq) //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID) //fmt.Println("socketData.Timestamp:", socketData.Timestamp) //fmt.Println("socketData.Direction:", socketData.Direction) //fmt.Println("socketData.MsgType:", socketData.MsgType) //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen) //fmt.Println("socketData.DataSeq:", socketData.DataSeq) //socketData := &SocketData{} //reader := bytes.NewBuffer(rec.RawSample) //if err := binary.Read(reader, binary.LittleEndian, v); err != nil { // klog.Warningln("failed1 to read msg:", err) // continue //} // //var data []byte //payload := reader.Bytes() //switch { //case v.Len == 0: //case v.Len > 32760: // data = payload[:32760] //default: // data = payload[:v.Len] //} //////data2 := data[:v.Len] ////fmt.Println("perfMapTypeSocketEvents") //fmt.Println(v.EventsNum) //fmt.Println(v.Len) //fmt.Println(string(data)) // //var data2 SocketData //reader2 := bytes.NewBuffer(data) //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil { // klog.Warningln("failed2 to read msg:", err) // continue //} // //fmt.Println(data2.Pid) //fmt.Println(data2.Tgid) //fmt.Println(string(v.Data)) //continue case perfMapTypeL7Events: v := &l7Event{} reader := bytes.NewBuffer(rec.RawSample) if err := binary.Read(reader, binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } //fmt.Println("v.TraceIdFrom") //fmt.Println(v.TraceIdFrom) //a := hex.EncodeToString(v.TraceIdFrom[:]) //for _, b := range v.AssumedAppId { // fmt.Printf("v.AssumedAppId- %02\n", b) //} //fmt.Println(a) payload := reader.Bytes() req := &l7.RequestData{ Protocol: l7.Protocol(v.Protocol), Pid: v.Pid, Status: l7.Status(v.Status), Duration: time.Duration(v.Duration), Method: l7.Method(v.Method), StatementId: v.StatementId, TraceId: v.TraceId, TraceStart: v.TraceStart, TraceEnd: v.TraceEnd, TraceType: v.TraceType, EventCount: v.EventCount, AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]), SpanId: hex.EncodeToString(v.SpanId[:]), StartAt: v.StartAt, EndAt: v.EndtAt, ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport), ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport), DestAddrString: utils.BytesToString(v.RPCTarget[:]), ErrorMsg: utils.BytesToString(v.ErrorMsg[:]), IsTls: v.IsTls > 0, MQTopic: utils.BytesToString(v.MQ.Topic[:]), MQKey: utils.BytesToString(v.MQ.Key[:]), } if req.Protocol == l7.ProtocolHTTP { klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String()) klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String()) } if v.TraceEnd == TRACE_STATUS { req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:]) req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:]) req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:]) req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:]) req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:]) req.ParentSpanContext.TypeFrom = l7.TypeFrom(hex.EncodeToString(v.TypeFrom[:])) req.SysvcFrom = utils.BytesToString(v.SysvcFrom[:]) klog.Debugf("cwother '%s'", req.SysvcFrom) // klog.Debugf("req.ParentSpanContext.TraceIdFrom %s", req.ParentSpanContext.TraceIdFrom) // klog.Debugf("req.ParentSpanContext.TypeFrom %s", req.ParentSpanContext.TypeFrom) req.SAddr = ipPort(v.SAddr, v.Sport) req.DAddr = ipPort(v.DAddr, v.Dport) // klog.Debugf("runEventsReader SAddr.String %s", req.SAddr.String()) // klog.Debugf("runEventsReader DAddr.String %s", req.DAddr.String()) } switch { case v.PayloadSize == 0: case v.PayloadSize > MaxPayloadSize: req.Payload = payload[:MaxPayloadSize] default: req.Payload = payload[:v.PayloadSize] } //fmt.Println("==========") //fmt.Println("req.Payload:", string(req.Payload)) //fmt.Println("==========") event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req} case perfMapTypeFileEvents: v := &fileEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd} case perfMapTypeProcEvents: v := &procEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid} case perfMapTypeTCPEvents: v := &tcpEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: v.Type, Pid: v.Pid, SrcAddr: ipPort(v.SAddr, v.SPort), DstAddr: ipPort(v.DAddr, v.DPort), Fd: v.Fd, Timestamp: v.Timestamp, Duration: time.Duration(v.Duration), } if v.Type == EventTypeConnectionClose { event.TrafficStats = &TrafficStats{ BytesSent: v.BytesSent, BytesReceived: v.BytesReceived, } } event.FirstReadTime = v.FirstReadTime event.FirstWriteTime = v.FirstWriteTime event.NewReadTime = v.NewReadTime if v.Type == EventTypeAcceptClose { event.TrafficStats = &TrafficStats{ BytesSent: v.BytesSent, BytesReceived: v.BytesReceived, } } case perfMapTypePythonThreadEvents: v := &pythonThreadEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: v.Type, Pid: v.Pid, Duration: time.Duration(v.Duration), } case perfMapTypeEventQueue: v := &StackEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: EventTypeFunEnt, StackEvent: v, } default: continue } ch <- event } } func ipPort(ip [16]byte, port uint16) netaddr.IPPort { i, _ := netaddr.FromStdIP(ip[:]) return netaddr.IPPortFrom(i, port) } func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error { fmt.Println("InitKProcInfo111111111111111111111111111111111") var err error var info EbpfProcInfo if appInfo.EBPFProcInfo == nil { info = EbpfProcInfo{ InstanceId: appInfo.InstanceIdHash.HashtVal, AppId: appInfo.AppIdHash.HashtVal, CodeType: uint16(appInfo.CodeType), } } else { info = *appInfo.EBPFProcInfo info.AppId = appInfo.AppIdHash.HashtVal } // 构建 sysvc 值:{app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}[:SysTag] // appServiceType 固定为 "APPLICATION" appName := appInfo.AppName nodeInfo := NODE_INFO.GetNodeInfo() sysTag := "" if nodeInfo != nil { sysTag = nodeInfo.SysTag } appServiceType := "APPLICATION" appNameLen := len(appName) appServiceTypeLen := len(appServiceType) sysTagLen := len(sysTag) // 限制长度,避免超出数组大小 if appNameLen > 32 { appNameLen = 32 appName = appName[:32] } if sysTagLen > 32 { sysTagLen = 32 sysTag = sysTag[:32] } // 格式:{app_name_len}:app_name:{appServiceType_len}:appServiceType[:{SysTagLen}:SysTag] // 如果 sysTag 为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType // 如果 sysTag 不为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}:SysTag // 长度不足2位时前面补0,例如:08:service:12:APPLICATION 或 08:service:12:APPLICATION:03:tag var sysvcStr string if sysTagLen == 0 { sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType) } else { sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType, sysTagLen, sysTag) } // 复制到字节数组,确保不超过 76 字节 sysvcBytes := []byte(sysvcStr) if len(sysvcBytes) > 76 { sysvcBytes = sysvcBytes[:76] } copy(info.Sysvc[:], sysvcBytes) _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info) if err != nil { klog.Error("failed to update program info", err) } appInfo.EBPFProcInfo = &info return err } func (t *Tracer) DelKProcInfo(pid uint32) error { _, err := tracer.DelProcInfoFromMap(t.collection, pid) if err != nil { klog.WithField("pid", pid).Error("failed to delete proc info", err) } return err } // TODO check language func (t *Tracer) DisableL7Tracing() bool { return t.disableL7Tracing } func (t *Tracer) DisableE2ETracing() bool { return t.disableE2ETracing } func (t *Tracer) DisableStackTracing() bool { return t.disableStackTracing }