Quellcode durchsuchen

Fixed #TASK_QT-9810 Network

rock vor 1 Jahr
Ursprung
Commit
96d9a4e98c
4 geänderte Dateien mit 115 neuen und 1 gelöschten Zeilen
  1. 83 0
      containers/container.go
  2. 5 1
      containers/metrics.go
  3. 21 0
      containers/registry.go
  4. 6 0
      ebpftracer/tracer.go

+ 83 - 0
containers/container.go

@@ -92,6 +92,18 @@ type ActiveConnection struct {
 	dmParser       *l7.DmParser
 }
 
+type ActiveAccept struct { // ActiveAccept is the per-connection record that onAcceptOpen stores in acceptsActive and acceptsByPidFd.
+	Dest       netaddr.IPPort // dst address exactly as passed to onAcceptOpen
+	ActualDest netaddr.IPPort // result of getActualDestination, or a copy of Dest when that lookup returns nil
+	Pid        uint32 // pid from the event; together with Fd it forms the acceptsByPidFd key
+	Fd         uint64 // file descriptor from the event
+	Timestamp  uint64 // event timestamp, stored unchanged; unit is not visible here - TODO confirm
+	Closed     time.Time // not set by onAcceptOpen; presumably the close time - verify against the close handler
+
+	BytesSent     uint64 // not written by onAcceptOpen; presumably filled from TrafficStats on close - TODO confirm
+	BytesReceived uint64 // not written by onAcceptOpen; presumably filled from TrafficStats on close - TODO confirm
+}
+
 type ListenDetails struct {
 	ClosedAt time.Time
 	NsIPs    []netaddr.IP
@@ -118,6 +130,11 @@ type ConnectionStats struct {
 	BytesReceived   uint64
 }
 
+type AcceptStats struct { // AcceptStats is the per-AddrPair aggregate kept in acceptsSuccessful and read by Collect. NOTE(review): unlike the value Collect needs for NetAcceptsSuccessful, there is no count field here.
+	BytesSent       uint64 // exported by Collect through metrics.NetBytesSent
+	BytesReceived   uint64 // exported by Collect through metrics.NetBytesReceived
+}
+
 type Container struct {
 	id       ContainerID
 	cgroup   *cgroup.Cgroup
@@ -142,6 +159,11 @@ type Container struct {
 	connectLastAttempt map[netaddr.IPPort]time.Time  // dst -> time
 	connectionsActive  map[AddrPair]*ActiveConnection
 	connectionsByPidFd map[PidFd]*ActiveConnection
+	
+	acceptsSuccessful  map[AddrPair]*AcceptStats
+	acceptLastAttempt 	map[netaddr.IPPort]time.Time  // dst -> time
+	acceptsActive  		map[AddrPair]*ActiveAccept
+	acceptsByPidFd 		map[PidFd]*ActiveAccept
 
 	l7Stats  L7Stats
 	dnsStats *L7Metrics
@@ -198,6 +220,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
 		connectionsActive:  map[AddrPair]*ActiveConnection{},
 		connectionsByPidFd: map[PidFd]*ActiveConnection{},
+		acceptsSuccessful: 	map[AddrPair]*AcceptStats{},
+		acceptLastAttempt: map[netaddr.IPPort]time.Time{},
+		acceptsActive:  	map[AddrPair]*ActiveAccept{},
+		acceptsByPidFd: 	map[PidFd]*ActiveAccept{},
+		
 		l7Stats:            L7Stats{},
 		dnsStats:           &L7Metrics{},
 
@@ -341,6 +368,12 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
 		ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
 	}
+
+	for d, stats := range c.acceptsSuccessful {
+		ch <- counter(metrics.NetAcceptsSuccessful, float64(0), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
+	}
 	for dst, count := range c.connectsFailed {
 		ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
 	}
@@ -548,6 +581,56 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	}
 }
 
+func (c *Container) onAcceptOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) { // onAcceptOpen registers an accepted TCP connection for this container; failed gates the bookkeeping below, duration is not read in this body.
+	klog.Infof("accept pid=%d id=%s addr=%s", pid, c.id, dst.IP()) // emitted for every event, before any filter is applied
+	if common.PortFilter.ShouldBeSkipped(dst.Port()) { // ports rejected by the port filter are ignored
+		return
+	}
+	p := c.processes[pid]
+	if p == nil { // pid is not tracked by this container: nothing to record
+		return
+	}
+	if dst.IP().IsLoopback() && !p.isHostNs() { // loopback destinations are only kept for processes for which isHostNs() is true
+		return
+	}
+	actualDst, err := c.getActualDestination(p, src, dst)
+	if err != nil {
+		if !common.IsNotExist(err) { // not-exist errors are dropped silently; anything else is logged
+			klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
+		}
+		return
+	}
+	switch {
+	case actualDst == nil: // no resolved destination returned: fall back to dst itself
+		actualDst = &dst
+	case actualDst.IP().IsLoopback() && !p.isHostNs(): // same loopback rule, applied to the resolved destination
+		return
+	}
+	if common.ConnectionFilter.ShouldBeSkipped(dst.IP(), actualDst.IP()) {
+		return
+	}
+	c.lock.Lock() // held until return; all map writes below happen under it
+	defer c.lock.Unlock()
+	if !failed {
+		key := AddrPair{src: dst, dst: *actualDst} // aggregate key: the src slot holds dst, the dst slot holds the resolved destination
+		stats := c.acceptsSuccessful[key]
+		if stats == nil {
+			stats = &AcceptStats{} // NOTE(review): the entry is created here but no field of it is updated in this function
+			c.acceptsSuccessful[key] = stats
+		}
+		acceptCon := &ActiveAccept{
+			Dest:       dst,
+			ActualDest: *actualDst,
+			Pid:        pid,
+			Fd:         fd,
+			Timestamp:  timestamp,
+		}
+		c.acceptsActive[AddrPair{src: src, dst: dst}] = acceptCon // the same record is indexed by (src, dst) ...
+		c.acceptsByPidFd[PidFd{Pid: pid, Fd: fd}] = acceptCon // ... and by (pid, fd); NOTE(review): no removal path is visible in this diff
+	}
+	c.acceptLastAttempt[dst] = time.Now() // updated whether or not failed is set
+}
+
 func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
 	if common.PortFilter.ShouldBeSkipped(dst.Port()) {
 		return

+ 5 - 1
containers/metrics.go

@@ -38,6 +38,9 @@ var metrics = struct {
 	NetBytesSent             *prometheus.Desc
 	NetBytesReceived         *prometheus.Desc
 
+	NetAcceptsSuccessful 	 *prometheus.Desc
+	NetAcceptsActive     	 *prometheus.Desc
+
 	LogMessages *prometheus.Desc
 
 	ApplicationType *prometheus.Desc
@@ -85,7 +88,8 @@ var metrics = struct {
 	NetLatency:               metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
 	NetBytesSent:             metric("container_net_tcp_bytes_sent_total", "Total number of bytes sent to the peer", "destination", "actual_destination"),
 	NetBytesReceived:         metric("container_net_tcp_bytes_received_total", "Total number of bytes received from the peer", "destination", "actual_destination"),
-
+	NetAcceptsSuccessful: 	  metric("container_net_tcp_successful_accept_total", "Total number of successful TCP accepts", "destination", "actual_destination"),
+	
 	LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
 
 	ApplicationType: metric("container_application_type", "Type of the application running in the container (e.g. memcached, postgres, mysql)", "application_type"),

+ 21 - 0
containers/registry.go

@@ -364,6 +364,27 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				} else {
 					klog.Infoln("TCP listen open from unknown container", e)
 				}
+			case ebpftracer.EventTypeAcceptOpen:
+				fmt.Println("ebpftracer.EventTypeAcceptOpen==================", e.Pid)
+				if c := r.getOrCreateContainer(e.Pid); c != nil {
+					c.onAcceptOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
+					c.eventReady()
+					// if common.IsOpenFilter() && common.IsFilterPid(e.Pid) {
+					// 	c.WhiteSettingInfo.AppName = enums.TestApp
+					// 	err := c.RegisterAppInfo(r, e.Pid)
+					// 	if err != nil {
+					// 		klog.WithError(err).Errorf("[registry] Failed registerAppInfo. pid is %d", e.Pid)
+					// 		continue
+					// 	}
+					// 	c.attachUprobes(r.tracer, e.Pid)
+					// 	err = c.StackTrace(r.tracer, e.Pid)
+					// 	if err != nil {
+					// 		klog.Errorf("Stack trace error", err)
+					// 	}
+					// }
+				} else {
+					klog.Infoln("TCP connection from unknown container", e)
+				}
 			case ebpftracer.EventTypeConnectionOpen:
 				//fmt.Println("ebpftracer.EventTypeConnectionOpen==================", e.Pid)
 				if c := r.getOrCreateContainer(e.Pid); c != nil {

+ 6 - 0
ebpftracer/tracer.go

@@ -748,6 +748,12 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 					BytesReceived: v.BytesReceived,
 				}
 			}
+			if v.Type == EventTypeAcceptClose {
+				event.TrafficStats = &TrafficStats{
+					BytesSent:     v.BytesSent,
+					BytesReceived: v.BytesReceived,
+				}
+			}
 		case perfMapTypePythonThreadEvents:
 			v := &pythonThreadEvent{}
 			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {