Procházet zdrojové kódy

optimize deserialization of messages received from eBPF perfmaps

Nikolay Sivko před 2 roky
rodič
revize
559868c12c
1 změnil soubory, kde provedl 77 přidání a 57 odebrání
  1. 77 57
      ebpftracer/tracer.go

+ 77 - 57
ebpftracer/tracer.go

@@ -54,6 +54,15 @@ type Event struct {
 	L7Request *l7.RequestData
 }
 
+type perfMapType uint8
+
+const (
+	perfMapTypeProcEvents perfMapType = 1
+	perfMapTypeTCPEvents  perfMapType = 2
+	perfMapTypeFileEvents perfMapType = 3
+	perfMapTypeL7Events   perfMapType = 4
+)
+
 type Tracer struct {
 	kernelVersion    string
 	disableL7Tracing bool
@@ -142,7 +151,7 @@ func (t *Tracer) init(ch chan<- Event) error {
 type perfMap struct {
 	name                  string
 	perCPUBufferSizePages int
-	event                 rawEvent
+	typ                   perfMapType
 }
 
 func (t *Tracer) ebpf(ch chan<- Event) error {
@@ -183,15 +192,15 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	t.collection = c
 
 	perfMaps := []perfMap{
-		{name: "proc_events", event: &procEvent{}, perCPUBufferSizePages: 4},
-		{name: "tcp_listen_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
-		{name: "tcp_connect_events", event: &tcpEvent{}, perCPUBufferSizePages: 8},
-		{name: "tcp_retransmit_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
-		{name: "file_events", event: &fileEvent{}, perCPUBufferSizePages: 4},
+		{name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
+		{name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
+		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
+		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
+		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
 	}
 
 	if !t.disableL7Tracing {
-		perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 32})
+		perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
 	}
 
 	for _, pm := range perfMaps {
@@ -201,7 +210,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 			return fmt.Errorf("failed to create ebpf reader: %w", err)
 		}
 		t.readers[pm.name] = r
-		go runEventsReader(pm.name, r, ch, pm.event)
+		go runEventsReader(pm.name, r, ch, pm.typ)
 	}
 
 	for _, programSpec := range collectionSpec.Programs {
@@ -274,24 +283,16 @@ func (t EventReason) String() string {
 	return "unknown: " + strconv.Itoa(int(t))
 }
 
-type rawEvent interface {
-	Event() Event
-}
-
 type procEvent struct {
-	Type   uint32
+	Type   EventType
 	Pid    uint32
 	Reason uint32
 }
 
-func (e procEvent) Event() Event {
-	return Event{Type: EventType(e.Type), Reason: EventReason(e.Reason), Pid: e.Pid}
-}
-
 type tcpEvent struct {
 	Fd        uint64
 	Timestamp uint64
-	Type      uint32
+	Type      EventType
 	Pid       uint32
 	SPort     uint16
 	DPort     uint16
@@ -299,27 +300,12 @@ type tcpEvent struct {
 	DAddr     [16]byte
 }
 
-func (e tcpEvent) Event() Event {
-	return Event{
-		Type:      EventType(e.Type),
-		Pid:       e.Pid,
-		SrcAddr:   ipPort(e.SAddr, e.SPort),
-		DstAddr:   ipPort(e.DAddr, e.DPort),
-		Fd:        e.Fd,
-		Timestamp: e.Timestamp,
-	}
-}
-
 type fileEvent struct {
-	Type uint32
+	Type EventType
 	Pid  uint32
 	Fd   uint64
 }
 
-func (e fileEvent) Event() Event {
-	return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
-}
-
 type l7Event struct {
 	Fd                  uint64
 	ConnectionTimestamp uint64
@@ -331,28 +317,9 @@ type l7Event struct {
 	Padding             uint16
 	StatementId         uint32
 	PayloadSize         uint64
-	Payload             [MaxPayloadSize]byte
 }
 
-func (e l7Event) Event() Event {
-	r := &l7.RequestData{
-		Protocol:    l7.Protocol(e.Protocol),
-		Status:      l7.Status(e.Status),
-		Duration:    time.Duration(e.Duration),
-		Method:      l7.Method(e.Method),
-		StatementId: e.StatementId,
-	}
-	switch {
-	case e.PayloadSize == 0:
-	case e.PayloadSize > MaxPayloadSize:
-		r.Payload = e.Payload[:MaxPayloadSize]
-	default:
-		r.Payload = e.Payload[:e.PayloadSize]
-	}
-	return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: r}
-}
-
-func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
+func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
 	for {
 		rec, err := r.Read()
 		if err != nil {
@@ -365,11 +332,64 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
 			klog.Errorln(name, "lost samples:", rec.LostSamples)
 			continue
 		}
-		if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, e); err != nil {
-			klog.Warningln("failed to read msg:", err)
+		var event Event
+
+		switch typ {
+		case perfMapTypeL7Events:
+			v := &l7Event{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			payload := rec.RawSample[len(rec.RawSample)-MaxPayloadSize:]
+			req := &l7.RequestData{
+				Protocol:    l7.Protocol(v.Protocol),
+				Status:      l7.Status(v.Status),
+				Duration:    time.Duration(v.Duration),
+				Method:      l7.Method(v.Method),
+				StatementId: v.StatementId,
+			}
+			switch {
+			case v.PayloadSize == 0:
+			case v.PayloadSize > MaxPayloadSize:
+				req.Payload = payload[:MaxPayloadSize]
+			default:
+				req.Payload = payload[:v.PayloadSize]
+			}
+			event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
+		case perfMapTypeFileEvents:
+			v := &fileEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
+		case perfMapTypeProcEvents:
+			v := &procEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
+		case perfMapTypeTCPEvents:
+			v := &tcpEvent{}
+			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
+				klog.Warningln("failed to read msg:", err)
+				continue
+			}
+			event = Event{
+				Type:      v.Type,
+				Pid:       v.Pid,
+				SrcAddr:   ipPort(v.SAddr, v.SPort),
+				DstAddr:   ipPort(v.DAddr, v.DPort),
+				Fd:        v.Fd,
+				Timestamp: v.Timestamp,
+			}
+		default:
 			continue
 		}
-		ch <- e.Event()
+
+		ch <- event
 	}
 }