Bladeren bron

add `container_net_tcp_bytes_sent_total` and `container_net_tcp_bytes_received_total` metrics

Nikolay Sivko 1 jaar geleden
bovenliggende
commit
9f6909a141

+ 69 - 11
containers/container.go

@@ -77,6 +77,9 @@ type ActiveConnection struct {
 	Timestamp  uint64
 	Closed     time.Time
 
+	BytesSent     uint64
+	BytesReceived uint64
+
 	http2Parser    *l7.Http2Parser
 	postgresParser *l7.PostgresParser
 	mysqlParser    *l7.MysqlParser
@@ -93,9 +96,11 @@ type PidFd struct {
 }
 
 type ConnectionStats struct {
-	Count           int64
+	Count           uint64
 	TotalTime       time.Duration
-	Retransmissions int64
+	Retransmissions uint64
+	BytesSent       uint64
+	BytesReceived   uint64
 }
 
 type Container struct {
@@ -136,12 +141,14 @@ type Container struct {
 	nsConntrack   *Conntrack
 	lbConntracks  []*Conntrack
 
+	registry *Registry
+
 	lock sync.RWMutex
 
 	done chan struct{}
 }
 
-func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
+func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32, registry *Registry) (*Container, error) {
 	netNs, err := proc.GetNetNs(pid)
 	if err != nil {
 		return nil, err
@@ -173,6 +180,8 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 
 		hostConntrack: hostConntrack,
 
+		registry: registry,
+
 		done: make(chan struct{}),
 	}
 
@@ -228,6 +237,8 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
 }
 
 func (c *Container) Collect(ch chan<- prometheus.Metric) {
+	c.registry.updateTrafficStatsIfNecessary()
+
 	c.lock.RLock()
 	defer c.lock.RUnlock()
 
@@ -300,6 +311,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		if stats.Retransmissions > 0 {
 			ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
 		}
+		ch <- counter(metrics.NetBytesSent, float64(stats.BytesSent), d.src.String(), d.dst.String())
+		ch <- counter(metrics.NetBytesReceived, float64(stats.BytesReceived), d.src.String(), d.dst.String())
 	}
 	for dst, count := range c.connectsFailed {
 		ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
@@ -372,7 +385,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	}
 }
 
-func (c *Container) onProcessStart(pid uint32, trace *ebpftracer.Tracer) *Process {
+func (c *Container) onProcessStart(pid uint32) *Process {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	stats, err := TaskstatsPID(pid)
@@ -380,7 +393,7 @@ func (c *Container) onProcessStart(pid uint32, trace *ebpftracer.Tracer) *Proces
 		return nil
 	}
 	c.zombieAt = time.Time{}
-	p := NewProcess(pid, stats, trace)
+	p := NewProcess(pid, stats, c.registry.tracer)
 
 	if p == nil {
 		return nil
@@ -583,15 +596,60 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
 	return nil, nil
 }
 
-func (c *Container) onConnectionClose(srcDst AddrPair) bool {
+func (c *Container) onConnectionClose(e ebpftracer.Event) bool {
+	srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
+	c.lock.Lock()
+	conn, ok := c.connectionsActive[srcDst]
+	c.lock.Unlock()
+	if conn != nil {
+		if conn.Closed.IsZero() {
+			if e.Pid == 0 && e.Fd == 0 {
+				stats, err := c.registry.tracer.GetAndDeleteTCPConnection(conn.Pid, conn.Fd)
+				if err != nil {
+					klog.Warningln(c.id, conn.Pid, conn.Fd, conn.ActualDest, err)
+				} else {
+					c.lock.Lock()
+					c.updateConnectionTrafficStats(conn, stats.BytesSent, stats.BytesReceived)
+					c.lock.Unlock()
+				}
+			} else if e.TrafficStats != nil {
+				c.lock.Lock()
+				c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
+				c.lock.Unlock()
+			}
+			conn.Closed = time.Now()
+		}
+	}
+	return ok
+}
+
+func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
+	if u == nil {
+		return
+	}
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	conn := c.connectionsActive[srcDst]
-	if conn == nil {
-		return false
+	c.updateConnectionTrafficStats(c.connectionsByPidFd[PidFd{Pid: u.Pid, Fd: u.FD}], u.BytesSent, u.BytesReceived)
+}
+
+func (c *Container) updateConnectionTrafficStats(ac *ActiveConnection, sent, received uint64) {
+	if ac == nil {
+		return
 	}
-	conn.Closed = time.Now()
-	return true
+	key := AddrPair{src: ac.Dest, dst: ac.ActualDest}
+	stats := c.connectsSuccessful[key]
+	if stats == nil {
+		stats = &ConnectionStats{}
+		c.connectsSuccessful[key] = stats
+	}
+	if sent > ac.BytesSent {
+		stats.BytesSent += sent - ac.BytesSent
+	}
+	if received > ac.BytesReceived {
+		stats.BytesReceived += received - ac.BytesReceived
+	}
+	ac.BytesSent = sent
+	ac.BytesReceived = received
 }
 
 func (c *Container) onDNSRequest(r *l7.RequestData) map[netaddr.IP]string {

+ 4 - 0
containers/metrics.go

@@ -35,6 +35,8 @@ var metrics = struct {
 	NetConnectionsActive     *prometheus.Desc
 	NetRetransmits           *prometheus.Desc
 	NetLatency               *prometheus.Desc
+	NetBytesSent             *prometheus.Desc
+	NetBytesReceived         *prometheus.Desc
 
 	LogMessages *prometheus.Desc
 
@@ -81,6 +83,8 @@ var metrics = struct {
 	NetConnectionsActive:     metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
 	NetRetransmits:           metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
 	NetLatency:               metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
+	NetBytesSent:             metric("container_net_tcp_bytes_sent_total", "Total number of bytes sent to the peer", "destination", "actual_destination"),
+	NetBytesReceived:         metric("container_net_tcp_bytes_received_total", "Total number of bytes received from the peer", "destination", "actual_destination"),
 
 	LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),
 

+ 58 - 6
containers/registry.go

@@ -20,6 +20,8 @@ import (
 	"k8s.io/klog/v2"
 )
 
+const MinTrafficStatsUpdateInterval = 5 * time.Second
+
 var (
 	selfNetNs         = netns.None()
 	hostNetNsId       = netns.None().UniqueId()
@@ -48,6 +50,10 @@ type Registry struct {
 	ip2fqdnLock          sync.Mutex
 
 	processInfoCh chan<- ProcessInfo
+
+	trafficStatsLastUpdated time.Time
+	trafficStatsLock        sync.Mutex
+	trafficStatsUpdateCh    chan *TrafficStatsUpdate
 }
 
 func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh chan<- ProcessInfo) (*Registry, error) {
@@ -106,6 +112,8 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
 		processInfoCh: processInfoCh,
 
 		tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
+
+		trafficStatsUpdateCh: make(chan *TrafficStatsUpdate),
 	}
 	if err = reg.Register(r); err != nil {
 		return nil, err
@@ -188,6 +196,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 			}
 			r.ip2fqdnLock.Unlock()
+		case u := <-r.trafficStatsUpdateCh:
+			if u == nil {
+				continue
+			}
+			if c := r.containersByPid[u.Pid]; c != nil {
+				c.updateTrafficStats(u)
+			}
 		case e, more := <-ch:
 			if !more {
 				return
@@ -206,7 +221,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 				}
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					p := c.onProcessStart(e.Pid, r.tracer)
+					p := c.onProcessStart(e.Pid)
 					if r.processInfoCh != nil && p != nil {
 						r.processInfoCh <- ProcessInfo{Pid: p.Pid, ContainerId: c.id, StartedAt: p.StartedAt}
 					}
@@ -247,10 +262,15 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					klog.Infoln("TCP connection error from unknown container", e)
 				}
 			case ebpftracer.EventTypeConnectionClose:
-				srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
-				for _, c := range r.containersById {
-					if c.onConnectionClose(srcDst) {
-						break
+				if e.Pid != 0 && e.Fd != 0 {
+					if c := r.containersByPid[e.Pid]; c != nil {
+						c.onConnectionClose(e)
+					}
+				} else {
+					for _, c := range r.containersById {
+						if c.onConnectionClose(e) {
+							break
+						}
 					}
 				}
 			case ebpftracer.EventTypeTCPRetransmit:
@@ -340,7 +360,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		r.containersByCgroupId[cg.Id] = c
 		return c
 	}
-	c, err := NewContainer(id, cg, md, r.hostConntrack, pid)
+	c, err := NewContainer(id, cg, md, r.hostConntrack, pid, r)
 	if err != nil {
 		klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
 		return nil
@@ -357,6 +377,31 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	return c
 }
 
+func (r *Registry) updateTrafficStatsIfNecessary() {
+	r.trafficStatsLock.Lock()
+	defer r.trafficStatsLock.Unlock()
+
+	if time.Now().Sub(r.trafficStatsLastUpdated) < MinTrafficStatsUpdateInterval {
+		return
+	}
+	iter := r.tracer.ActiveConnectionsIterator()
+	cid := ebpftracer.ConnectionId{}
+	stats := ebpftracer.Connection{}
+	for iter.Next(&cid, &stats) {
+		r.trafficStatsUpdateCh <- &TrafficStatsUpdate{
+			Pid:           cid.PID,
+			FD:            cid.FD,
+			BytesSent:     stats.BytesSent,
+			BytesReceived: stats.BytesReceived,
+		}
+	}
+	if err := iter.Err(); err != nil {
+		klog.Warningln(err)
+	}
+	r.trafficStatsUpdateCh <- nil
+	r.trafficStatsLastUpdated = time.Now()
+}
+
 func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
 	if cg.ContainerType == cgroup.ContainerTypeSystemdService {
 		if strings.HasPrefix(cg.ContainerId, "/system.slice/crio-conmon-") {
@@ -446,3 +491,10 @@ func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
 	}
 	return nil, fmt.Errorf("failed to interact with dockerd (%s) or with containerd (%s)", dockerdErr, containerdErr)
 }
+
+type TrafficStatsUpdate struct {
+	Pid           uint32
+	FD            uint64
+	BytesSent     uint64
+	BytesReceived uint64
+}

+ 5 - 5
containers/systemd.go

@@ -16,20 +16,20 @@ import (
 )
 
 var (
-	conn        *dbus.Conn
+	dbusConn    *dbus.Conn
 	dbusTimeout = time.Second
 )
 
 func init() {
 	var err error
-	conn, err = dbus.NewConnection(func() (*gdbus.Conn, error) {
+	dbusConn, err = dbus.NewConnection(func() (*gdbus.Conn, error) {
 		c, err := gdbus.Dial("unix:path=" + proc.HostPath("/run/systemd/private"))
 		if err != nil {
 			return nil, err
 		}
 		methods := []gdbus.Auth{gdbus.AuthExternal(strconv.Itoa(os.Getuid()))}
 		if err = c.Auth(methods); err != nil {
-			conn.Close()
+			dbusConn.Close()
 			return nil, err
 		}
 		return c, nil
@@ -40,14 +40,14 @@ func init() {
 }
 
 func SystemdTriggeredBy(id string) string {
-	if conn == nil {
+	if dbusConn == nil {
 		return ""
 	}
 	ctx, cancel := context.WithTimeout(context.Background(), dbusTimeout)
 	defer cancel()
 	parts := strings.Split(id, "/")
 	unit := parts[len(parts)-1]
-	if prop, _ := conn.GetUnitPropertyContext(ctx, unit, "TriggeredBy"); prop != nil {
+	if prop, _ := dbusConn.GetUnitPropertyContext(ctx, unit, "TriggeredBy"); prop != nil {
 		if values, _ := prop.Value.Value().([]string); len(values) > 0 {
 			return values[0]
 		}

File diff suppressed because it is too large
+ 0 - 0
ebpftracer/ebpf.go


+ 1 - 1
ebpftracer/ebpf/l7/gotls.c

@@ -56,7 +56,7 @@ int go_crypto_tls_read_enter(struct pt_regs *ctx) {
     __u64 goroutine_id = GOROUTINE(ctx);
     __u64 pid = pid_tgid >> 32;
     __u64 id = pid << 32 | goroutine_id | IS_TLS_READ_ID;
-    return trace_enter_read(id, fd, buf_ptr, 0, 0);
+    return trace_enter_read(id, pid, fd, buf_ptr, 0, 0);
 }
 
 SEC("uprobe/go_crypto_tls_read_exit")

+ 77 - 49
ebpftracer/ebpf/l7/l7.c

@@ -89,7 +89,7 @@ struct read_args {
 };
 
 struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(__u64));
     __uint(value_size, sizeof(struct read_args));
     __uint(max_entries, 10240);
@@ -163,26 +163,15 @@ struct user_msghdr {
 };
 
 static inline __attribute__((__always_inline__))
-void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
-    struct sk_info sk = {};
-    sk.pid = pid;
-    sk.fd = fd;
-    __u64 *timestamp = bpf_map_lookup_elem(&connection_timestamps, &sk);
-    if (timestamp) {
-        if (*timestamp == 0) {
-            return;
-        }
-        e->connection_timestamp = *timestamp;
-    } else {
-        e->connection_timestamp = 0;
-    }
-    e->fd = fd;
-    e->pid = pid;
+void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct connection *conn) {
+    e->connection_timestamp = conn->timestamp;
+    e->fd = cid.fd;
+    e->pid = cid.pid;
     bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 }
 
 static inline __attribute__((__always_inline__))
-__u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf) {
+__u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf, __u64 *total_size) {
     struct iovec iov = {};
     __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
     __u64 offset = 0;
@@ -198,15 +187,15 @@ __u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf) {
         if (iov.size <= 0) {
             continue;
         }
-        size = MIN(iov.size, max-offset);
-        TRUNCATE_PAYLOAD_SIZE(size);
-        TRUNCATE_PAYLOAD_SIZE(offset);
-        if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
-            return 0;
-        }
-        offset += size;
-        if (offset >= max) {
-            break;
+        *total_size += iov.size;
+        if (offset < max) {
+            size = MIN(iov.size, max-offset);
+            TRUNCATE_PAYLOAD_SIZE(size);
+            TRUNCATE_PAYLOAD_SIZE(offset);
+            if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
+                return 0;
+            }
+            offset += size;
         }
     }
     return offset;
@@ -216,6 +205,15 @@ static inline __attribute__((__always_inline__))
 int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
     __u64 id = bpf_get_current_pid_tgid();
     __u32 zero = 0;
+    struct connection_id cid = {};
+    cid.pid = id >> 32;
+    cid.fd = fd;
+    __u64 total_size = size;
+
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        return 0;
+    }
 
     char* payload = buf;
     if (iovlen) {
@@ -223,12 +221,17 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         if (!payload) {
             return 0;
         }
-        size = read_iovec(buf, iovlen, 0, payload);
+        total_size = 0;
+        size = read_iovec(buf, iovlen, 0, payload, &total_size);
     }
     if (!size) {
         return 0;
     }
 
+    if (!is_tls) {
+        __sync_fetch_and_add(&conn->bytes_sent, total_size);
+    }
+
     struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
     if (!req) {
         return 0;
@@ -239,8 +242,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     req->ns = 0;
     req->payload_size = size;
     struct l7_request_key k = {};
-    k.pid = id >> 32;
-    k.fd = fd;
+    k.pid = cid.pid;
+    k.fd = cid.fd;
     k.is_tls = is_tls;
     k.stream_id = -1;
 
@@ -256,7 +259,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             e->method = METHOD_STATEMENT_CLOSE;
             e->payload_size = size;
             COPY_PAYLOAD(e->payload, size, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         }
         req->protocol = PROTOCOL_POSTGRES;
@@ -274,7 +277,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
             e->method = METHOD_STATEMENT_CLOSE;
             e->payload_size = size;
             COPY_PAYLOAD(e->payload, size, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         }
         req->protocol = PROTOCOL_MYSQL;
@@ -287,7 +290,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         e->protocol = PROTOCOL_RABBITMQ;
         e->method = METHOD_PRODUCE;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (nats_method(payload, size) == METHOD_PRODUCE) {
         struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
@@ -296,7 +299,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         }
         e->protocol = PROTOCOL_NATS;
         e->method = METHOD_PRODUCE;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (is_cassandra_request(payload, size, &k.stream_id)) {
         req->protocol = PROTOCOL_CASSANDRA;
@@ -316,7 +319,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         e->duration = bpf_ktime_get_ns();
         e->payload_size = size;
         COPY_PAYLOAD(e->payload, size, payload);
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     } else if (is_dubbo2_request(payload, size)) {
         req->protocol = PROTOCOL_DUBBO2;
@@ -336,7 +339,16 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 }
 
 static inline __attribute__((__always_inline__))
-int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
+int trace_enter_read(__u64 id, __u32 pid, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
+    struct connection_id cid = {};
+    cid.pid = pid;
+    cid.fd = fd;
+
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        return 0;
+    }
+
     struct read_args args = {};
     args.fd = fd;
     args.buf = buf;
@@ -352,10 +364,17 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (!args) {
         return 0;
     }
-
+    struct connection_id cid = {};
+    cid.pid = pid;
+    cid.fd = args->fd;
+    struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
+    if (!conn) {
+        bpf_map_delete_elem(&active_reads, &id);
+        return 0;
+    }
     struct l7_request_key k = {};
-    k.pid = pid;
-    k.fd = args->fd;
+    k.pid = cid.pid;
+    k.fd = cid.fd;
     k.is_tls = is_tls;
     k.stream_id = -1;
 
@@ -372,7 +391,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             return 0;
         }
     }
-
+    __u64 total_size = ret;
     int zero = 0;
     char* payload = args->buf;
     if (args->iovlen) {
@@ -380,12 +399,17 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         if (!payload) {
             return 0;
         }
-        ret = read_iovec(args->buf, args->iovlen, ret, payload);
+        total_size = 0;
+        ret = read_iovec(args->buf, args->iovlen, ret, payload, &total_size);
         if (!ret) {
             return 0;
         }
     }
 
+    if (!is_tls) {
+        __sync_fetch_and_add(&conn->bytes_received, total_size);
+    }
+
     struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
     if (!e) {
         return 0;
@@ -399,13 +423,13 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (is_rabbitmq_consume(payload, ret)) {
         e->protocol = PROTOCOL_RABBITMQ;
         e->method = METHOD_CONSUME;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     }
     if (nats_method(payload, ret) == METHOD_CONSUME) {
         e->protocol = PROTOCOL_NATS;
         e->method = METHOD_CONSUME;
-        send_event(ctx, e, k.pid, k.fd);
+        send_event(ctx, e, cid, conn);
         return 0;
     }
 
@@ -421,7 +445,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->duration = bpf_ktime_get_ns() - req->ns;
             e->payload_size = ret;
             COPY_PAYLOAD(e->payload, ret, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             bpf_map_delete_elem(&active_l7_requests, &k);
             return 0;
         } else if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
@@ -436,7 +460,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->duration = bpf_ktime_get_ns();
             e->payload_size = ret;
             COPY_PAYLOAD(e->payload, ret, payload);
-            send_event(ctx, e, k.pid, k.fd);
+            send_event(ctx, e, cid, conn);
             return 0;
         } else {
             return 0;
@@ -489,7 +513,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         return 0;
     }
     e->duration = bpf_ktime_get_ns() - req->ns;
-    send_event(ctx, e, k.pid, k.fd);
+    send_event(ctx, e, cid, conn);
     return 0;
 }
 
@@ -543,13 +567,15 @@ int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
 SEC("tracepoint/syscalls/sys_enter_read")
 int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_enter_readv")
 int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, ctx->size);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, ctx->size);
 }
 
 SEC("tracepoint/syscalls/sys_enter_recvmsg")
@@ -559,13 +585,15 @@ int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
         return 0;
     }
-    return trace_enter_read(id, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
 }
 
 SEC("tracepoint/syscalls/sys_enter_recvfrom")
 int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
+    __u32 pid = id >> 32;
+    return trace_enter_read(id, pid, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_exit_read")

+ 17 - 15
ebpftracer/ebpf/l7/openssl.c

@@ -69,23 +69,25 @@ struct ssl_st {
     return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size, 0); \
 })
 
-#define READ_ENTER(ctx, bio_t)                      \
-({                                                  \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);            \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);      \
-    __u64 pid_tgid = bpf_get_current_pid_tgid();    \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;           \
-    return trace_enter_read(id, fd, buf_ptr, 0, 0); \
+#define READ_ENTER(ctx, bio_t)                           \
+({                                                       \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);                 \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);           \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();         \
+    __u32 pid = pid_tgid >> 32;                          \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;                \
+    return trace_enter_read(id, pid, fd, buf_ptr, 0, 0); \
 })
 
-#define READ_EX_ENTER(ctx, bio_t)                           \
-({                                                          \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);                    \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);              \
-    __u64 pid_tgid = bpf_get_current_pid_tgid();            \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;                   \
-    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);            \
-    return trace_enter_read(id, fd, buf_ptr, ret_ptr, 0);   \
+#define READ_EX_ENTER(ctx, bio_t)                              \
+({                                                             \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);                       \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);                 \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();               \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;                      \
+    __u32 pid = pid_tgid >> 32;                                \
+    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);               \
+    return trace_enter_read(id, pid, fd, buf_ptr, ret_ptr, 0); \
 })
 
 SEC("uprobe/openssl_SSL_write_enter")

+ 58 - 39
ebpftracer/ebpf/tcp/state.c

@@ -1,4 +1,5 @@
 #define IPPROTO_TCP 6
+#define MAX_CONNECTIONS 1000000
 
 struct tcp_event {
     __u64 fd;
@@ -6,6 +7,8 @@ struct tcp_event {
     __u64 duration;
     __u32 type;
     __u32 pid;
+    __u64 bytes_sent;
+    __u64 bytes_received;
     __u16 sport;
     __u16 dport;
     __u8 saddr[16];
@@ -50,30 +53,31 @@ struct {
     __uint(max_entries, 10240);
 } fd_by_pid_tgid SEC(".maps");
 
-struct sk_info {
+struct connection_id {
     __u64 fd;
     __u32 pid;
 };
 
-struct conn_info {
-    __u64 fd;
-    __u64 ts;
-    __u32 pid;
-};
-
 struct {
-    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
     __uint(key_size, sizeof(void *));
-    __uint(value_size, sizeof(struct conn_info));
-    __uint(max_entries, 10240);
-} conn_info SEC(".maps");
+    __uint(value_size, sizeof(struct connection_id));
+    __uint(max_entries, MAX_CONNECTIONS);
+} connection_id_by_socket SEC(".maps");
+
+struct connection {
+    __u64 timestamp;
+    __u64 bytes_sent;
+    __u64 bytes_received;
+};
 
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
-    __uint(key_size, sizeof(struct sk_info));
-    __uint(value_size, sizeof(__u64));
-    __uint(max_entries, 32768);
-} connection_timestamps SEC(".maps");
+    __uint(key_size, sizeof(struct connection_id));
+    __uint(value_size, sizeof(struct connection));
+    __uint(max_entries, MAX_CONNECTIONS);
+} active_connections SEC(".maps");
+
 
 SEC("tracepoint/sock/inet_sock_set_state")
 int inet_sock_set_state(void *ctx)
@@ -94,12 +98,16 @@ int inet_sock_set_state(void *ctx)
         if (!fdp) {
             return 0;
         }
-        struct conn_info i = {};
-        i.pid = pid;
-        i.ts = bpf_ktime_get_ns();
-        i.fd = *fdp;
+        struct connection_id cid = {};
+        cid.pid = pid;
+        cid.fd = *fdp;
+
+        struct connection conn = {};
+        conn.timestamp = bpf_ktime_get_ns();
+
         bpf_map_delete_elem(&fd_by_pid_tgid, &id);
-        bpf_map_update_elem(&conn_info, &args.skaddr, &i, BPF_ANY);
+        bpf_map_update_elem(&connection_id_by_socket, &args.skaddr, &cid, BPF_ANY);
+        bpf_map_update_elem(&active_connections, &cid, &conn, BPF_ANY);
         return 0;
     }
 
@@ -108,28 +116,42 @@ int inet_sock_set_state(void *ctx)
     __u64 timestamp = 0;
     __u64 duration = 0;
     void *map = &tcp_connect_events;
+
+    struct tcp_event e = {};
+
     if (args.oldstate == BPF_TCP_SYN_SENT) {
-        struct conn_info *i = bpf_map_lookup_elem(&conn_info, &args.skaddr);
-        if (!i) {
+        struct connection_id *cid = bpf_map_lookup_elem(&connection_id_by_socket, &args.skaddr);
+        if (!cid) {
+            return 0;
+        }
+        struct connection *conn = bpf_map_lookup_elem(&active_connections, cid);
+        if (!conn) {
             return 0;
         }
         if (args.newstate == BPF_TCP_ESTABLISHED) {
-            timestamp = bpf_ktime_get_ns();
-            struct sk_info k = {};
-            k.pid = i->pid;
-            k.fd = i->fd;
-            bpf_map_update_elem(&connection_timestamps, &k, &timestamp, BPF_ANY);
+            timestamp = conn->timestamp;
             type = EVENT_TYPE_CONNECTION_OPEN;
         } else if (args.newstate == BPF_TCP_CLOSE) {
+            bpf_map_delete_elem(&active_connections, cid);
             type = EVENT_TYPE_CONNECTION_ERROR;
         }
-        duration = bpf_ktime_get_ns() - i->ts;
-        pid = i->pid;
-        fd = i->fd;
-        bpf_map_delete_elem(&conn_info, &args.skaddr);
+        duration = bpf_ktime_get_ns() - conn->timestamp;
+        pid = cid->pid;
+        fd = cid->fd;
     }
     if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
-        pid = 0;
+        struct connection_id *cid = bpf_map_lookup_elem(&connection_id_by_socket, &args.skaddr);
+        if (cid) {
+            pid = cid->pid;
+            fd = cid->fd;
+            struct connection *conn = bpf_map_lookup_elem(&active_connections, cid);
+            if (conn) {
+                e.bytes_sent = conn->bytes_sent;
+                e.bytes_received = conn->bytes_received;
+                bpf_map_delete_elem(&active_connections, cid);
+            }
+            bpf_map_delete_elem(&connection_id_by_socket, &args.skaddr);
+        }
         type = EVENT_TYPE_CONNECTION_CLOSE;
     }
     if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_LISTEN) {
@@ -144,8 +166,6 @@ int inet_sock_set_state(void *ctx)
     if (type == 0) {
         return 0;
     }
-
-    struct tcp_event e = {};
     e.type = type;
     e.duration = duration;
     e.timestamp = timestamp;
@@ -191,11 +211,10 @@ int trace_exit_accept(struct trace_event_raw_sys_exit__stub* ctx) {
         return 0;
     }
     __u64 id = bpf_get_current_pid_tgid();
-    struct sk_info k = {};
-    k.pid = id >> 32;
-    k.fd = ctx->ret;
-    __u64 invalid_timestamp = 0;
-    bpf_map_update_elem(&connection_timestamps, &k, &invalid_timestamp, BPF_ANY);
+    struct connection_id cid = {};
+    cid.pid = id >> 32;
+    cid.fd = ctx->ret;
+    bpf_map_delete_elem(&active_connections, &cid);
     return 0;
 }
 

+ 69 - 23
ebpftracer/tracer.go

@@ -45,16 +45,22 @@ const (
 	EventReasonOOMKill EventReason = 1
 )
 
+type TrafficStats struct {
+	BytesSent     uint64
+	BytesReceived uint64
+}
+
 type Event struct {
-	Type      EventType
-	Reason    EventReason
-	Pid       uint32
-	SrcAddr   netaddr.IPPort
-	DstAddr   netaddr.IPPort
-	Fd        uint64
-	Timestamp uint64
-	Duration  time.Duration
-	L7Request *l7.RequestData
+	Type         EventType
+	Reason       EventReason
+	Pid          uint32
+	SrcAddr      netaddr.IPPort
+	DstAddr      netaddr.IPPort
+	Fd           uint64
+	Timestamp    uint64
+	Duration     time.Duration
+	L7Request    *l7.RequestData
+	TrafficStats *TrafficStats
 }
 
 type perfMapType uint8
@@ -134,6 +140,8 @@ func (t *Tracer) init(ch chan<- Event) error {
 		}
 	}
 
+	ebpfConnectionsMap := t.collection.Maps["active_connections"]
+	timestamp := uint64(time.Now().UnixNano())
 	for _, s := range sockets {
 		typ := EventTypeConnectionOpen
 		if s.Listen {
@@ -142,16 +150,46 @@ func (t *Tracer) init(ch chan<- Event) error {
 			continue
 		}
 		ch <- Event{
-			Type:    typ,
-			Pid:     s.pid,
-			Fd:      s.fd,
-			SrcAddr: s.SAddr,
-			DstAddr: s.DAddr,
+			Type:      typ,
+			Pid:       s.pid,
+			Timestamp: timestamp,
+			Fd:        s.fd,
+			SrcAddr:   s.SAddr,
+			DstAddr:   s.DAddr,
+		}
+		if typ == EventTypeConnectionOpen {
+			id := ConnectionId{FD: s.fd, PID: s.pid}
+			conn := Connection{Timestamp: timestamp}
+			if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
+				klog.Warningln(err)
+			}
 		}
 	}
 	return nil
 }
 
+func (t *Tracer) GetAndDeleteTCPConnection(pid uint32, fd uint64) (*Connection, error) {
+	id := ConnectionId{FD: fd, PID: pid}
+	conn := &Connection{}
+	return conn, t.collection.Maps["active_connections"].LookupAndDelete(id, conn)
+}
+
+func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
+	return t.collection.Maps["active_connections"].Iterate()
+}
+
+type ConnectionId struct {
+	FD  uint64
+	PID uint32
+	_   uint32
+}
+
+type Connection struct {
+	Timestamp     uint64
+	BytesSent     uint64
+	BytesReceived uint64
+}
+
 type perfMap struct {
 	name                  string
 	perCPUBufferSizePages int
@@ -297,15 +335,17 @@ type procEvent struct {
 }
 
 type tcpEvent struct {
-	Fd        uint64
-	Timestamp uint64
-	Duration  uint64
-	Type      EventType
-	Pid       uint32
-	SPort     uint16
-	DPort     uint16
-	SAddr     [16]byte
-	DAddr     [16]byte
+	Fd            uint64
+	Timestamp     uint64
+	Duration      uint64
+	Type          EventType
+	Pid           uint32
+	BytesSent     uint64
+	BytesReceived uint64
+	SPort         uint16
+	DPort         uint16
+	SAddr         [16]byte
+	DAddr         [16]byte
 }
 
 type fileEvent struct {
@@ -401,6 +441,12 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				Timestamp: v.Timestamp,
 				Duration:  time.Duration(v.Duration),
 			}
+			if v.Type == EventTypeConnectionClose {
+				event.TrafficStats = &TrafficStats{
+					BytesSent:     v.BytesSent,
+					BytesReceived: v.BytesReceived,
+				}
+			}
 		case perfMapTypePythonThreadEvents:
 			v := &pythonThreadEvent{}
 			if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {

Some files were not shown because too many files changed in this diff