Forráskód Böngészése

Merge pull request #107 from coroot/tcp_connection_time

add the `container_net_tcp_connection_time_seconds_total` metric
Nikolay Sivko 1 éve
szülő
commit
05c1e51a16

+ 34 - 21
containers/container.go

@@ -92,6 +92,12 @@ type PidFd struct {
 	Fd  uint64
 }
 
+type ConnectionStats struct {
+	Count           int64
+	TotalTime       time.Duration
+	Retransmissions int64
+}
+
 type Container struct {
 	id       ContainerID
 	cgroup   *cgroup.Cgroup
@@ -110,12 +116,11 @@ type Container struct {
 	listens map[netaddr.IPPort]map[uint32]*ListenDetails
 	ipsByNs map[string][]netaddr.IP
 
-	connectsSuccessful map[AddrPair]int64           // dst:actual_dst -> count
-	connectsFailed     map[netaddr.IPPort]int64     // dst -> count
-	connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
+	connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> stats
+	connectsFailed     map[netaddr.IPPort]int64      // dst -> count
+	connectLastAttempt map[netaddr.IPPort]time.Time  // dst -> time
 	connectionsActive  map[AddrPair]*ActiveConnection
 	connectionsByPidFd map[PidFd]*ActiveConnection
-	retransmits        map[AddrPair]int64 // dst:actual_dst -> count
 
 	l7Stats  L7Stats
 	dnsStats *L7Metrics
@@ -153,12 +158,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
 		ipsByNs: map[string][]netaddr.IP{},
 
-		connectsSuccessful: map[AddrPair]int64{},
+		connectsSuccessful: map[AddrPair]*ConnectionStats{},
 		connectsFailed:     map[netaddr.IPPort]int64{},
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
 		connectionsActive:  map[AddrPair]*ActiveConnection{},
 		connectionsByPidFd: map[PidFd]*ActiveConnection{},
-		retransmits:        map[AddrPair]int64{},
 		l7Stats:            L7Stats{},
 		dnsStats:           &L7Metrics{},
 
@@ -289,14 +293,15 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		}
 	}
 
-	for d, count := range c.connectsSuccessful {
-		ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
+	for d, stats := range c.connectsSuccessful {
+		ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
+		if stats.Retransmissions > 0 {
+			ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
+		}
 	}
 	for dst, count := range c.connectsFailed {
-		ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
-	}
-	for d, count := range c.retransmits {
-		ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
 	}
 
 	connections := map[AddrPair]int{}
@@ -489,7 +494,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	}
 }
 
-func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
+func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
 	if common.PortFilter.ShouldBeSkipped(dst.Port()) {
 		return
 	}
@@ -521,7 +526,14 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	if failed {
 		c.connectsFailed[dst]++
 	} else {
-		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
+		key := AddrPair{src: dst, dst: *actualDst}
+		stats := c.connectsSuccessful[key]
+		if stats == nil {
+			stats = &ConnectionStats{}
+			c.connectsSuccessful[key] = stats
+		}
+		stats.Count++
+		stats.TotalTime += duration
 		connection := &ActiveConnection{
 			Dest:       dst,
 			ActualDest: *actualDst,
@@ -682,14 +694,20 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
 	return nil
 }
 
-func (c *Container) onRetransmit(srcDst AddrPair) bool {
+func (c *Container) onRetransmission(srcDst AddrPair) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	conn, ok := c.connectionsActive[srcDst]
 	if !ok {
 		return false
 	}
-	c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
+	key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
+	stats := c.connectsSuccessful[key]
+	if stats == nil {
+		stats = &ConnectionStats{}
+		c.connectsSuccessful[key] = stats
+	}
+	stats.Retransmissions++
 	return true
 }
 
@@ -987,11 +1005,6 @@ func (c *Container) gc(now time.Time) {
 					delete(c.connectsSuccessful, d)
 				}
 			}
-			for d := range c.retransmits {
-				if d.src == dst {
-					delete(c.retransmits, d)
-				}
-			}
 			c.l7Stats.delete(dst)
 		}
 	}

+ 14 - 12
containers/metrics.go

@@ -28,12 +28,13 @@ var metrics = struct {
 	DiskWriteOps   *prometheus.Desc
 	DiskWriteBytes *prometheus.Desc
 
-	NetListenInfo         *prometheus.Desc
-	NetConnectsSuccessful *prometheus.Desc
-	NetConnectsFailed     *prometheus.Desc
-	NetConnectionsActive  *prometheus.Desc
-	NetRetransmits        *prometheus.Desc
-	NetLatency            *prometheus.Desc
+	NetListenInfo            *prometheus.Desc
+	NetConnectionsSuccessful *prometheus.Desc
+	NetConnectionsTotalTime  *prometheus.Desc
+	NetConnectionsFailed     *prometheus.Desc
+	NetConnectionsActive     *prometheus.Desc
+	NetRetransmits           *prometheus.Desc
+	NetLatency               *prometheus.Desc
 
 	LogMessages *prometheus.Desc
 
@@ -70,12 +71,13 @@ var metrics = struct {
 	DiskWriteOps:   metric("container_resources_disk_writes_total", "Total number of writes completed successfully by the container", "mount_point", "device", "volume"),
 	DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),
 
-	NetListenInfo:         metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
-	NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
-	NetConnectsFailed:     metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
-	NetConnectionsActive:  metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
-	NetRetransmits:        metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
-	NetLatency:            metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
+	NetListenInfo:            metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
+	NetConnectionsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
+	NetConnectionsTotalTime:  metric("container_net_tcp_connection_time_seconds_total", "Time spent on TCP connections", "destination", "actual_destination"),
+	NetConnectionsFailed:     metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
+	NetConnectionsActive:     metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
+	NetRetransmits:           metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
+	NetLatency:               metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
 
 	LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
 

+ 3 - 3
containers/registry.go

@@ -235,14 +235,14 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 
 			case ebpftracer.EventTypeConnectionOpen:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
 					c.attachTlsUprobes(r.tracer, e.Pid)
 				} else {
 					klog.Infoln("TCP connection from unknown container", e)
 				}
 			case ebpftracer.EventTypeConnectionError:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true, e.Duration)
 				} else {
 					klog.Infoln("TCP connection error from unknown container", e)
 				}
@@ -256,7 +256,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 			case ebpftracer.EventTypeTCPRetransmit:
 				srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
 				for _, c := range r.containersById {
-					if c.onRetransmit(srcDst) {
+					if c.onRetransmission(srcDst) {
 						break
 					}
 				}

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 0 - 0
ebpftracer/ebpf.go


+ 18 - 6
ebpftracer/ebpf/tcp/state.c

@@ -3,6 +3,7 @@
 struct tcp_event {
     __u64 fd;
     __u64 timestamp;
+    __u64 duration;
     __u32 type;
     __u32 pid;
     __u16 sport;
@@ -53,12 +54,19 @@ struct sk_info {
     __u64 fd;
     __u32 pid;
 };
+
+struct conn_info {
+    __u64 fd;
+    __u64 ts;
+    __u32 pid;
+};
+
 struct {
     __uint(type, BPF_MAP_TYPE_HASH);
     __uint(key_size, sizeof(void *));
-    __uint(value_size, sizeof(struct sk_info));
+    __uint(value_size, sizeof(struct conn_info));
     __uint(max_entries, 10240);
-} sk_info SEC(".maps");
+} conn_info SEC(".maps");
 
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
@@ -86,20 +94,22 @@ int inet_sock_set_state(void *ctx)
         if (!fdp) {
             return 0;
         }
-        struct sk_info i = {};
+        struct conn_info i = {};
         i.pid = pid;
+        i.ts = bpf_ktime_get_ns();
         i.fd = *fdp;
         bpf_map_delete_elem(&fd_by_pid_tgid, &id);
-        bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
+        bpf_map_update_elem(&conn_info, &args.skaddr, &i, BPF_ANY);
         return 0;
     }
 
     __u64 fd = 0;
     __u32 type = 0;
     __u64 timestamp = 0;
+    __u64 duration = 0;
     void *map = &tcp_connect_events;
     if (args.oldstate == BPF_TCP_SYN_SENT) {
-        struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
+        struct conn_info *i = bpf_map_lookup_elem(&conn_info, &args.skaddr);
         if (!i) {
             return 0;
         }
@@ -113,9 +123,10 @@ int inet_sock_set_state(void *ctx)
         } else if (args.newstate == BPF_TCP_CLOSE) {
             type = EVENT_TYPE_CONNECTION_ERROR;
         }
+        duration = bpf_ktime_get_ns() - i->ts;
         pid = i->pid;
         fd = i->fd;
-        bpf_map_delete_elem(&sk_info, &args.skaddr);
+        bpf_map_delete_elem(&conn_info, &args.skaddr);
     }
     if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
         pid = 0;
@@ -136,6 +147,7 @@ int inet_sock_set_state(void *ctx)
 
     struct tcp_event e = {};
     e.type = type;
+    e.duration = duration;
     e.timestamp = timestamp;
     e.pid = pid;
     e.sport = args.sport;

+ 3 - 0
ebpftracer/tracer.go

@@ -52,6 +52,7 @@ type Event struct {
 	DstAddr   netaddr.IPPort
 	Fd        uint64
 	Timestamp uint64
+	Duration  time.Duration
 	L7Request *l7.RequestData
 }
 
@@ -295,6 +296,7 @@ type procEvent struct {
 type tcpEvent struct {
 	Fd        uint64
 	Timestamp uint64
+	Duration  uint64
 	Type      EventType
 	Pid       uint32
 	SPort     uint16
@@ -388,6 +390,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				DstAddr:   ipPort(v.DAddr, v.DPort),
 				Fd:        v.Fd,
 				Timestamp: v.Timestamp,
+				Duration:  time.Duration(v.Duration),
 			}
 		default:
 			continue

Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott