package ebpftracer import ( "bytes" debugelf "debug/elf" "encoding/binary" "encoding/hex" "errors" "fmt" "os" "strconv" "strings" "time" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/coroot/coroot-node-agent/common" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/ebpftracer/tracer" "github.com/coroot/coroot-node-agent/proc" . "github.com/coroot/coroot-node-agent/utils/modelse" klog "github.com/sirupsen/logrus" "golang.org/x/sys/unix" "inet.af/netaddr" ) /* #define TASK_COMM_LEN 16 #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer #include struct __tuple_t { __u8 daddr[16]; __u8 rcv_saddr[16]; __u8 addr_len; __u8 l4_protocol; __u16 dport; __u16 num; }; struct __socket_data { __u32 pid; __u32 tgid; __u64 coroutine_id; __u8 source; __u8 comm[TASK_COMM_LEN]; __u64 socket_id; struct __tuple_t tuple; __u32 extra_data; __u32 extra_data_count; __u32 tcp_seq; __u64 thread_trace_id; __u64 timestamp; __u8 direction: 1; __u8 msg_type: 7; __u64 syscall_len; __u64 data_seq; __u16 data_type; __u16 data_len; char data[BURST_DATA_BUF_SIZE]; } __attribute__((packed)); struct __socket_data_buffer { __u32 events_num; __u32 len; char data[32760]; }; */ import "C" type SocketData C.struct___socket_data type SocketDataBuffer C.struct___socket_data_buffer const MaxPayloadSize = 1024 type EventType uint32 type EventReason uint32 const ( EventTypeProcessStart EventType = 1 EventTypeProcessExit EventType = 2 EventTypeConnectionOpen EventType = 3 EventTypeConnectionClose EventType = 4 EventTypeConnectionError EventType = 5 EventTypeListenOpen EventType = 6 EventTypeListenClose EventType = 7 EventTypeFileOpen EventType = 8 EventTypeTCPRetransmit EventType = 9 EventTypeL7Request EventType = 10 EventTypeFunEnt EventType = 11 EventTypeFunRet EventType = 12 EventTypeAcceptOpen EventType = 13 EventTypeAcceptClose EventType = 14 EventReasonNone EventReason = 0 EventReasonOOMKill EventReason = 1 ) type TrafficStats struct { BytesSent uint64 BytesReceived uint64 } type Event struct { StackEvent *StackEvent Type EventType Reason EventReason Pid uint32 SrcAddr netaddr.IPPort DstAddr netaddr.IPPort Fd uint64 Timestamp uint64 Duration time.Duration L7Request *l7.RequestData TrafficStats *TrafficStats FirstReadTime uint64 FirstWriteTime uint64 NewReadTime uint64 } type perfMapType uint8 const ( perfMapTypeProcEvents perfMapType = 1 perfMapTypeTCPEvents perfMapType = 2 perfMapTypeFileEvents perfMapType = 3 perfMapTypeL7Events perfMapType = 4 perfMapTypeSocketEvents perfMapType = 5 perfMapTypeEventQueue perfMapType = 6 perfMapTypePythonThreadEvents perfMapType = 7 ) type Tracer struct { kernelVersion string disableL7Tracing bool disableE2ETracing bool disableStackTracing bool collection *ebpf.Collection collectionSpec *ebpf.CollectionSpec readers map[string]*perf.Reader links []link.Link uprobes map[string]*ebpf.Program Symbols []debugelf.Symbol Uprobes []tracer.Uprobe UprobesMap map[string]tracer.Uprobe } func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer { if disableL7Tracing { klog.Infoln("L7 tracing is disabled") } if disableE2ETracing { klog.Infoln("L7 tracing is disabled") } if disableStackTracing { klog.Infoln("L7 stack is disabled") } return &Tracer{ kernelVersion: kernelVersion, disableL7Tracing: disableL7Tracing, disableE2ETracing: disableE2ETracing, disableStackTracing: disableStackTracing, readers: map[string]*perf.Reader{}, uprobes: map[string]*ebpf.Program{}, } } func (t *Tracer) Run(events chan<- Event) error { if err := t.ebpf(events); err != nil { return err } if err := t.init(events); err != nil { return err } return nil } func (t *Tracer) Close() { for k, p := range t.uprobes { err := p.Close() klog.WithError(err).Infof("[close] uprobes %s", k) } for _, l := range t.links { err := l.Close() klog.WithError(err).Infof("[close] links") } for k, r := range t.readers { err := r.Close() klog.WithError(err).Infof("[close] readers %s", k) } t.collection.Close() } func (t *Tracer) init(ch chan<- Event) error { pids, err := proc.ListPids() if err != nil { return fmt.Errorf("failed to list pids: %w", err) } for _, pid := range pids { ch <- Event{Type: EventTypeProcessStart, Pid: pid} } fds, sockets := readFds(pids) for _, fd := range fds { ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd} } listens := map[uint64]bool{} for _, s := range sockets { if s.Listen { listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true } } ebpfConnectionsMap := t.collection.Maps["active_connections"] timestamp := uint64(time.Now().UnixNano()) for _, s := range sockets { typ := EventTypeConnectionOpen if s.Listen { typ = EventTypeListenOpen } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound continue } ch <- Event{ Type: typ, Pid: s.pid, Timestamp: timestamp, Fd: s.fd, SrcAddr: s.SAddr, DstAddr: s.DAddr, } if typ == EventTypeConnectionOpen { id := ConnectionId{FD: s.fd, PID: s.pid} sip := s.SAddr.IP() sipbytes := sip.As16() dip := s.DAddr.IP() dipbytes := dip.As16() conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()} if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil { klog.Warningln(err) } } } return nil } func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator { return t.collection.Maps["active_connections"].Iterate() } func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator { return t.collection.Maps["active_accepts"].Iterate() } type perfMap struct { name string perCPUBufferSizePages int typ perfMapType } func (t *Tracer) ebpf(ch chan<- Event) error { kv := "v" + common.KernelMajorMinor(t.kernelVersion) path, prg, err := EbpfCode(kv) klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path) if len(prg) == 0 || err != nil { return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err) } _, debugFsErr := os.Stat("/sys/kernel/debug/tracing") _, traceFsErr := os.Stat("/sys/kernel/tracing") if debugFsErr != nil && traceFsErr != nil { return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted") } collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg)) if err != nil { return fmt.Errorf("failed to load collection spec: %w", err) } _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY}) tracer.PidFilter(collectionSpec) opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)} klog.Infof("[start] Look eBPF .maps") for _, spec := range collectionSpec.Maps { klog.Infoln(spec.Name) } klog.Infof("[end] Look eBPF .maps") tracer.MapInit(collectionSpec, opts) // TODO 多进程 // tracer.SetConstants(collectionSpec) c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts) if err != nil { var verr *ebpf.VerifierError if errors.As(err, &verr) { klog.Errorf("----%+v", verr) } return fmt.Errorf("failed to load collection: %w", err) } tracer.Offset() t.collectionSpec = collectionSpec t.collection = c perfMaps := []perfMap{ {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4}, {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4}, {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8}, {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8}, {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4}, {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4}, {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32}, {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4}, } tracer.MapInsert(c) if !t.DisableL7Tracing() { perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32}) } perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64}) klog.Infof("[start] Look eBPF perf_maps") for _, pm := range perfMaps { klog.Infoln(pm.name) m, ok := t.collection.Maps[pm.name] if ok { r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize()) if err != nil { t.Close() return fmt.Errorf("failed to create ebpf reader: %w", err) } t.readers[pm.name] = r // event监听 go runEventsReader(pm.name, r, ch, pm.typ) } } klog.Infof("[end] Look eBPF perf_maps") klog.Infof("[start] Look eBPF specPrograms") if err = t.LinkEbpfProg(); err != nil { return err } klog.Infof("[end] Look eBPF specPrograms") return nil } func (t *Tracer) LinkEbpfProg() error { klog.Infof("[start] Look eBPF specPrograms") var ( l link.Link err error lastErr error ) for _, programSpec := range t.collectionSpec.Programs { program := t.collection.Programs[programSpec.Name] klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name) if t.DisableL7Tracing() { switch programSpec.Name { case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg": continue case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg": continue case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg": continue } } switch programSpec.Type { case ebpf.TracePoint: if strings.Contains(programSpec.SectionName, "prog") { continue } parts := strings.SplitN(programSpec.AttachTo, "/", 2) l, err = link.Tracepoint(parts[0], parts[1], program, nil) case ebpf.Kprobe: if strings.HasPrefix(programSpec.SectionName, "uprobe/") { t.uprobes[programSpec.Name] = program continue } if strings.HasPrefix(programSpec.SectionName, "kretprobe/") { l, err = link.Kretprobe(programSpec.AttachTo, program, nil) t.links = append(t.links, l) continue } l, err = link.Kprobe(programSpec.AttachTo, program, nil) } if err != nil { lastErr = err t.Close() klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err) //return fmt.Errorf("failed to link program: %w", err) } else { t.links = append(t.links, l) } } klog.Infof("[end] Look eBPF specPrograms") if lastErr != nil { return fmt.Errorf("failed to link program: %w", lastErr) } return nil } func (t *Tracer) UnlinkEbpfProg() error { var ( lastErr error err error ) /* 此处不应该处理 t.uprobes 中 ebpf-program for pName, p := range t.uprobes { if err = p.Close(); err != nil { lastErr = err klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error()) } } */ for _, l := range t.links { if err = l.Close(); err != nil { lastErr = err klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error()) } } return lastErr } func (t EventType) Int() int { return int(t) } func (t EventType) String() string { switch t { case EventTypeProcessStart: return "process-start" case EventTypeProcessExit: return "process-exit" case EventTypeConnectionOpen: return "connection-open" case EventTypeConnectionClose: return "connection-close" case EventTypeConnectionError: return "connection-error" case EventTypeListenOpen: return "listen-open" case EventTypeListenClose: return "listen-close" case EventTypeFileOpen: return "file-open" case EventTypeTCPRetransmit: return "tcp-retransmit" case EventTypeL7Request: return "l7-request" } return "unknown: " + strconv.Itoa(int(t)) } func (t EventReason) String() string { switch t { case EventReasonNone: return "none" case EventReasonOOMKill: return "oom-kill" } return "unknown: " + strconv.Itoa(int(t)) } type procEvent struct { Type EventType Pid uint32 Reason uint32 } type tcpEvent struct { Fd uint64 Timestamp uint64 Duration uint64 FirstReadTime uint64 FirstWriteTime uint64 NewReadTime uint64 Type EventType Pid uint32 BytesSent uint64 BytesReceived uint64 SPort uint16 DPort uint16 SAddr [16]byte DAddr [16]byte } type fileEvent struct { Type EventType Pid uint32 Fd uint64 } // struct l7_event in l7.c type l7Event struct { Fd uint64 ConnectionTimestamp uint64 Pid uint32 Status uint32 Duration uint64 Protocol uint8 Method uint8 Padding uint16 StatementId uint32 PayloadSize uint64 TraceId uint64 StartAt uint64 // ns EndtAt uint64 // ns TraceStart uint32 TraceEnd uint32 EventCount uint32 Sport uint16 Dport uint16 SAddr [16]byte DAddr [16]byte ComponentSport uint16 ComponentDport uint16 ComponentSAddr [16]byte ComponentDAddr [16]byte AssumedAppId HashByte SpanId HashByte TraceIdFrom HashByte16 CalledId HashByte InstanceIdFrom HashByte AppIdFrom HashByte SpanIdFrom HashByte } type SocketDataBufferddd struct { EventsNum uint32 Len uint32 Data [32760]byte } const ( TASK_COMM_LEN = 16 BURST_DATA_BUF_SIZE = 8192 ) type Tuple struct { Daddr [16]uint8 RcvSaddr [16]uint8 AddrLen uint8 L4Protocol uint8 Dport uint16 Num uint16 } type SocketDatadddd struct { Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程 Tgid uint32 // 进程号 CoroutineID uint64 Source uint8 Comm [TASK_COMM_LEN]byte SocketID uint64 Tuple Tuple ExtraData uint32 ExtraDataCount uint32 TcpSeq uint32 ThreadTraceID uint64 Timestamp uint64 Direction uint8 MsgType uint8 SyscallLen uint64 DataSeq uint64 DataType uint16 DataLen uint16 Data [BURST_DATA_BUF_SIZE]byte } type StackEvent struct { Type uint64 Pid uint64 TraceId uint64 Goid uint64 Ip uint64 Bp uint64 CallerIp uint64 CallerBp uint64 TimeNsStart uint64 TimeNsEnd uint64 // Nid uint64 // Fpid uint64 // Level uint64 Location byte ClassName [100]byte MethedName [100]byte } type StackFunEvent struct { StackEvent StackEvent Uprobe *tracer.Uprobe } type pythonThreadEvent struct { Type EventType Pid uint32 Duration uint64 } func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) { for { rec, err := r.Read() if err != nil { if errors.Is(err, perf.ErrClosed) { break } continue } if rec.LostSamples > 0 { klog.Errorln(name, "lost samples:", rec.LostSamples) continue } var event Event switch typ { case perfMapTypeSocketEvents: //fmt.Println("perfMapTypeSocketEvents") // 假设 rec.RawSample 包含数据,类型为 []byte //rawData := rec.RawSample //fmt.Println("perfMapTypeSocketEvents2") // //// 创建一个 SocketDataBuffer 结构体实例 //var buffer SocketDataBuffer // //// 创建一个字节缓冲区,并将数据填充到其中 //reader := bytes.NewReader(rawData) //fmt.Println("perfMapTypeSocketEvents3") //fmt.Println(len(rawData)) //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例 //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil { // fmt.Println(reader.Len()) // fmt.Println("Failed to read data:", err) // continue //} //fmt.Println("perfMapTypeSocketEvents4") // //// 打印解析后的数据 //fmt.Println("EventsNum:", buffer.EventsNum) //fmt.Println("Len:", buffer.Len) // //// 打印 char data 的内容 //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据 //socketDataBuffer := rec.RawSample // 解析 __socket_data_buffer //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec // //// 获取 char data[32760]; //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec //// todo //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type)) //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len)) // //// 解析C结构体中的data字段 //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len)) //// 打印或处理包含的数据 //fmt.Printf("socketData.Payload:%v \n", string(dataSlice)) /*todo */ //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line))) //dataPtr := unsafe.Pointer(&buf.data[0]) //socketData := (*SocketData)(dataPtr) //reader2 := bytes.NewBuffer(rec.RawSample) // 222222 //fmt.Println("socketData.Pid:", socketData.pid) //fmt.Println("socketData.Tgid:", socketData.tgid) //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id) //fmt.Println("socketData.Source:", socketData.source) // //fmt.Printf("socketData.Comm: %s \n", socketData.comm) //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id) //fmt.Println("socketData.Tuple:", socketData.Tuple) //fmt.Println("socketData.ExtraData:", socketData.ExtraData) //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount) //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq) //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID) //fmt.Println("socketData.Timestamp:", socketData.Timestamp) //fmt.Println("socketData.Direction:", socketData.Direction) //fmt.Println("socketData.MsgType:", socketData.MsgType) //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen) //fmt.Println("socketData.DataSeq:", socketData.DataSeq) //socketData := &SocketData{} //reader := bytes.NewBuffer(rec.RawSample) //if err := binary.Read(reader, binary.LittleEndian, v); err != nil { // klog.Warningln("failed1 to read msg:", err) // continue //} // //var data []byte //payload := reader.Bytes() //switch { //case v.Len == 0: //case v.Len > 32760: // data = payload[:32760] //default: // data = payload[:v.Len] //} //////data2 := data[:v.Len] ////fmt.Println("perfMapTypeSocketEvents") //fmt.Println(v.EventsNum) //fmt.Println(v.Len) //fmt.Println(string(data)) // //var data2 SocketData //reader2 := bytes.NewBuffer(data) //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil { // klog.Warningln("failed2 to read msg:", err) // continue //} // //fmt.Println(data2.Pid) //fmt.Println(data2.Tgid) //fmt.Println(string(v.Data)) //continue case perfMapTypeL7Events: v := &l7Event{} reader := bytes.NewBuffer(rec.RawSample) if err := binary.Read(reader, binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } //fmt.Println("v.TraceIdFrom") //fmt.Println(v.TraceIdFrom) //a := hex.EncodeToString(v.TraceIdFrom[:]) //for _, b := range v.AssumedAppId { // fmt.Printf("v.AssumedAppId- %02\n", b) //} //fmt.Println(a) payload := reader.Bytes() req := &l7.RequestData{ Protocol: l7.Protocol(v.Protocol), Status: l7.Status(v.Status), Duration: time.Duration(v.Duration), Method: l7.Method(v.Method), StatementId: v.StatementId, TraceId: v.TraceId, TraceStart: v.TraceStart, TraceEnd: v.TraceEnd, EventCount: v.EventCount, AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]), SpanId: hex.EncodeToString(v.SpanId[:]), StartAt: v.StartAt, EndAt: v.EndtAt, ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport), ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport), } if req.Protocol == l7.ProtocolHTTP { klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String()) klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String()) } if v.TraceEnd == 1 { req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:]) req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:]) req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:]) req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:]) req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:]) req.SAddr = ipPort(v.SAddr, v.Sport) req.DAddr = ipPort(v.DAddr, v.Dport) klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String()) klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String()) } switch { case v.PayloadSize == 0: case v.PayloadSize > MaxPayloadSize: req.Payload = payload[:MaxPayloadSize] default: req.Payload = payload[:v.PayloadSize] } //fmt.Println("==========") //fmt.Println("req.Payload:", string(req.Payload)) //fmt.Println("==========") event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req} case perfMapTypeFileEvents: v := &fileEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd} case perfMapTypeProcEvents: v := &procEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid} case perfMapTypeTCPEvents: v := &tcpEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: v.Type, Pid: v.Pid, SrcAddr: ipPort(v.SAddr, v.SPort), DstAddr: ipPort(v.DAddr, v.DPort), Fd: v.Fd, Timestamp: v.Timestamp, Duration: time.Duration(v.Duration), } if v.Type == EventTypeConnectionClose { event.TrafficStats = &TrafficStats{ BytesSent: v.BytesSent, BytesReceived: v.BytesReceived, } } event.FirstReadTime = v.FirstReadTime event.FirstWriteTime = v.FirstWriteTime event.NewReadTime = v.NewReadTime if v.Type == EventTypeAcceptClose { event.TrafficStats = &TrafficStats{ BytesSent: v.BytesSent, BytesReceived: v.BytesReceived, } } case perfMapTypePythonThreadEvents: v := &pythonThreadEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: v.Type, Pid: v.Pid, Duration: time.Duration(v.Duration), } case perfMapTypeEventQueue: v := &StackEvent{} if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil { klog.Warningln("failed to read msg:", err) continue } event = Event{ Type: EventTypeFunEnt, StackEvent: v, } default: continue } ch <- event } } func ipPort(ip [16]byte, port uint16) netaddr.IPPort { i, _ := netaddr.FromStdIP(ip[:]) return netaddr.IPPortFrom(i, port) } func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error { var err error var info EbpfProcInfo if appInfo.EBPFProcInfo == nil { info = EbpfProcInfo{ InstanceId: appInfo.InstanceIdHash.HashtVal, AppId: appInfo.AppIdHash.HashtVal, CodeType: uint16(appInfo.CodeType), } } else { info = *appInfo.EBPFProcInfo info.AppId = appInfo.AppIdHash.HashtVal } _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info) if err != nil { klog.Error("failed to update program info", err) } appInfo.EBPFProcInfo = &info return err } func (t *Tracer) DelKProcInfo(pid uint32) error { _, err := tracer.DelProcInfoFromMap(t.collection, pid) if err != nil { klog.Error("failed to delete proc info", err) } return err } // TODO check language func (t *Tracer) DisableL7Tracing() bool { return t.disableL7Tracing } func (t *Tracer) DisableE2ETracing() bool { return t.disableE2ETracing } func (t *Tracer) DisableStackTracing() bool { return t.disableStackTracing }