Răsfoiți Sursa

discovering the FD of every outbound TCP connection

Nikolay Sivko 3 ani în urmă
părinte
comite
b49e97643e

+ 21 - 11
containers/container.go

@@ -57,6 +57,12 @@ type AddrPair struct {
 	dst netaddr.IPPort
 }
 
+// ActiveConnection is the value stored in Container.connectionsActive for a
+// successfully opened connection. It is filled in by onConnectionOpen.
+type ActiveConnection struct {
+	// ActualDest is the result of ConntrackGetActualDestination(src, dst).
+	ActualDest netaddr.IPPort
+	// Pid is the pid that onConnectionOpen was called with.
+	Pid        uint32
+	// Fd is the fd that onConnectionOpen was called with.
+	Fd         uint64
+}
+
 type Container struct {
 	cgroup   *cgroup.Cgroup
 	metadata *ContainerMetadata
@@ -76,8 +82,8 @@ type Container struct {
 	connectsSuccessful map[AddrPair]int             // dst:actual_dst -> count
 	connectsFailed     map[netaddr.IPPort]int       // dst -> count
 	connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
-	connectionsActive  map[AddrPair]netaddr.IPPort  // src:dst -> actual_dst
-	retransmits        map[AddrPair]int             // dst:actual_dst -> count
+	connectionsActive  map[AddrPair]ActiveConnection
+	retransmits        map[AddrPair]int // dst:actual_dst -> count
 
 	oomKills int
 
@@ -104,7 +110,7 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
 		connectsSuccessful: map[AddrPair]int{},
 		connectsFailed:     map[netaddr.IPPort]int{},
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
-		connectionsActive:  map[AddrPair]netaddr.IPPort{},
+		connectionsActive:  map[AddrPair]ActiveConnection{},
 		retransmits:        map[AddrPair]int{},
 
 		mountIds: map[string]struct{}{},
@@ -242,8 +248,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	}
 
 	connections := map[AddrPair]int{}
-	for c, actualDst := range c.connectionsActive {
-		connections[AddrPair{src: c.dst, dst: actualDst}]++
+	for c, conn := range c.connectionsActive {
+		connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
 	}
 	for d, count := range connections {
 		ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
@@ -316,7 +322,7 @@ func (c *Container) onProcessExit(pid uint32, oomKill bool) {
 	}
 }
 
-func (c *Container) onFileOpen(pid uint32, fd uint32) {
+func (c *Container) onFileOpen(pid uint32, fd uint64) {
 	mntId, logPath := resolveFd(pid, fd)
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -349,7 +355,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	}
 }
 
-func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed bool) {
+func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, failed bool) {
 	if dst.IP().IsLoopback() {
 		netNs, err := proc.GetNetNs(pid)
 		isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
@@ -376,7 +382,11 @@ func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed
 	} else {
 		actualDst := ConntrackGetActualDestination(src, dst)
 		c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
-		c.connectionsActive[AddrPair{src: src, dst: dst}] = actualDst
+		c.connectionsActive[AddrPair{src: src, dst: dst}] = ActiveConnection{
+			ActualDest: actualDst,
+			Pid:        pid,
+			Fd:         fd,
+		}
 	}
 	c.connectLastAttempt[dst] = time.Now()
 }
@@ -394,11 +404,11 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
 func (c *Container) onRetransmit(srcDst AddrPair) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	actualDst, ok := c.connectionsActive[srcDst]
+	conn, ok := c.connectionsActive[srcDst]
 	if !ok {
 		return false
 	}
-	c.retransmits[AddrPair{src: srcDst.dst, dst: actualDst}]++
+	c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
 	return true
 }
 
@@ -738,7 +748,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 	}
 }
 
-func resolveFd(pid uint32, fd uint32) (mntId string, logPath string) {
+func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
 	info := proc.GetFdInfo(pid, fd)
 	if info == nil {
 		return

+ 2 - 2
containers/registry.go

@@ -177,13 +177,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 
 			case ebpftracer.EventTypeConnectionOpen:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.SrcAddr, e.DstAddr, false)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, false)
 				} else {
 					klog.Infoln("TCP connection from unknown container", e)
 				}
 			case ebpftracer.EventTypeConnectionError:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.SrcAddr, e.DstAddr, true)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, true)
 				} else {
 					klog.Infoln("TCP connection error from unknown container", e)
 				}

Fișier diff suprimat deoarece este prea mare
+ 0 - 1
ebpftracer/ebpf.go


+ 1 - 1
ebpftracer/ebpf/file.c

@@ -3,7 +3,7 @@
 struct file_event {
 	__u32 type;
 	__u32 pid;
-	__u32 fd;
+	__u64 fd;
 };
 
 struct {

+ 46 - 10
ebpftracer/ebpf/tcp_state.c

@@ -1,6 +1,7 @@
 #define IPPROTO_TCP 6
 
 struct tcp_event {
+	__u64 fd;
 	__u32 type;
 	__u32 pid;
 	__u16 sport;
@@ -40,9 +41,16 @@ struct trace_event_raw_inet_sock_set_state__stub {
 	__u8 daddr_v6[16];
 };
 
+/*
+ * fd_by_pid_tgid: bpf_get_current_pid_tgid() value -> fd argument captured on
+ * entry to connect(). Written by sys_enter_connect; looked up and then deleted
+ * by inet_sock_set_state on the CLOSE -> SYN_SENT transition.
+ */
+struct {
+	__uint(type, BPF_MAP_TYPE_HASH);
+	__uint(key_size, sizeof(__u64));
+	__uint(value_size, sizeof(__u64));
+	__uint(max_entries, 10240);
+} fd_by_pid_tgid SEC(".maps");
+
 struct sk_info {
+	__u64 fd;
 	__u32 pid;
-//	__u64 ts;
 };
 struct {
 	__uint(type, BPF_MAP_TYPE_HASH);
@@ -61,14 +69,24 @@ int inet_sock_set_state(void *ctx)
 	if (args.protocol != IPPROTO_TCP) {
 		return 0;
 	}
+	__u64 id = bpf_get_current_pid_tgid();
+	__u32 pid = id >> 32;
+
 	if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_SYN_SENT) {
-		struct sk_info i = {};
-		i.pid = bpf_get_current_pid_tgid() >> 32;
+        __u64 *fdp = bpf_map_lookup_elem(&fd_by_pid_tgid, &id);
+
+        if (!fdp) {
+            return 0;
+        }
+        bpf_map_delete_elem(&fd_by_pid_tgid, &id);
+        struct sk_info i = {};
+        i.pid = pid;
+        i.fd = *fdp;
 		bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
 		return 0;
 	}
 
-	__u32 pid = bpf_get_current_pid_tgid() >> 32;
+    __u64 fd = 0;
 	__u32 type = 0;
 	void *map = &tcp_connect_events;
 	if (args.oldstate == BPF_TCP_SYN_SENT) {
@@ -84,6 +102,7 @@ int inet_sock_set_state(void *ctx)
 			return 0;
 		}
 		pid = i->pid;
+		fd = i->fd;
 		bpf_map_delete_elem(&sk_info, &args.skaddr);
 	}
 	if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
@@ -103,12 +122,12 @@ int inet_sock_set_state(void *ctx)
 		return 0;
 	}
 
-	struct tcp_event e = {
-		.type = type,
-		.pid = pid,
-		.sport = args.sport,
-		.dport = args.dport,
-	};
+	struct tcp_event e = {};
+	e.type = type;
+	e.pid = pid;
+	e.sport = args.sport;
+	e.dport = args.dport;
+    e.fd = fd;
 	__builtin_memcpy(&e.saddr, &args.saddr_v6, sizeof(e.saddr));
 	__builtin_memcpy(&e.daddr, &args.daddr_v6, sizeof(e.saddr));
 
@@ -116,3 +135,20 @@ int inet_sock_set_state(void *ctx)
 
 	return 0;
 }
+
+/*
+ * Buffer layout that sys_enter_connect copies the tracepoint context into
+ * with bpf_probe_read(). Only the fd field is read by the program.
+ */
+struct trace_event_raw_sys_enter_connect__stub {
+	/* NOTE(review): presumably the common tracepoint header - confirm against the tracepoint format. */
+	__u64 unused;
+	long int id;
+	__u64 fd;
+};
+
+/*
+ * sys_enter_connect runs on entry to the connect() syscall and records the
+ * fd argument in fd_by_pid_tgid, keyed by the caller's pid_tgid, so that
+ * inet_sock_set_state can attach it to the socket on CLOSE -> SYN_SENT.
+ *
+ * Always returns 0.
+ */
+SEC("tracepoint/syscalls/sys_enter_connect")
+int sys_enter_connect(void *ctx) {
+    struct trace_event_raw_sys_enter_connect__stub args = {};
+    if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
+        return 0;
+    }
+    __u64 id = bpf_get_current_pid_tgid();
+    /* Result ignored on purpose: if the update fails, the later lookup in
+     * inet_sock_set_state finds no entry and returns early. */
+    bpf_map_update_elem(&fd_by_pid_tgid, &id, &args.fd, BPF_ANY);
+    /* The function is declared int; return explicitly instead of falling
+     * off the end with an unspecified value. */
+    return 0;
+}
+

+ 3 - 1
ebpftracer/init.go

@@ -8,11 +8,12 @@ import (
 
 type file struct {
 	pid uint32
-	fd  uint32
+	fd  uint64
 }
 
 type sock struct {
 	pid uint32
+	fd  uint64
 	proc.Sock
 }
 
@@ -46,6 +47,7 @@ func readFds(pids []uint32) (files []file, socks []sock) {
 			switch {
 			case fd.SocketInode != "":
 				if s, ok := sockets[fd.SocketInode]; ok {
+					s.fd = fd.Fd
 					s.pid = pid
 					socks = append(socks, s)
 				}

+ 6 - 3
ebpftracer/tracer.go

@@ -17,6 +17,7 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
+	"time"
 )
 
 type EventType uint32
@@ -43,7 +44,7 @@ type Event struct {
 	Pid     uint32
 	SrcAddr netaddr.IPPort
 	DstAddr netaddr.IPPort
-	Fd      uint32
+	Fd      uint64
 }
 
 type Tracer struct {
@@ -104,6 +105,7 @@ func (t *Tracer) init(ch chan<- Event) error {
 		ch <- Event{
 			Type:    typ,
 			Pid:     s.pid,
+			Fd:      s.fd,
 			SrcAddr: s.SAddr,
 			DstAddr: s.DAddr,
 		}
@@ -229,6 +231,7 @@ func (e procEvent) Event() Event {
 }
 
 type tcpEvent struct {
+	Fd    uint64
 	Type  uint32
 	Pid   uint32
 	SPort uint16
@@ -238,13 +241,13 @@ type tcpEvent struct {
 }
 
 func (e tcpEvent) Event() Event {
-	return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort)}
+	return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort), Fd: e.Fd}
 }
 
 type fileEvent struct {
 	Type uint32
 	Pid  uint32
-	Fd   uint32
+	Fd   uint64
 }
 
 func (e fileEvent) Event() Event {

+ 5 - 5
proc/fd.go

@@ -8,7 +8,7 @@ import (
 )
 
 type Fd struct {
-	Fd   uint32
+	Fd   uint64
 	Dest string
 
 	SocketInode string
@@ -22,7 +22,7 @@ func ReadFds(pid uint32) ([]Fd, error) {
 	}
 	res := make([]Fd, 0, len(entries))
 	for _, entry := range entries {
-		fd, err := strconv.Atoi(entry.Name())
+		fd, err := strconv.ParseUint(entry.Name(), 10, 64)
 		if err != nil {
 			continue
 		}
@@ -34,7 +34,7 @@ func ReadFds(pid uint32) ([]Fd, error) {
 		if strings.HasPrefix(dest, "socket:[") && strings.HasSuffix(dest, "]") {
 			socketInode = dest[len("socket:[") : len(dest)-1]
 		}
-		res = append(res, Fd{Fd: uint32(fd), Dest: dest, SocketInode: socketInode})
+		res = append(res, Fd{Fd: fd, Dest: dest, SocketInode: socketInode})
 	}
 	return res, nil
 }
@@ -45,8 +45,8 @@ type FdInfo struct {
 	Dest  string
 }
 
-func GetFdInfo(pid uint32, fd uint32) *FdInfo {
-	fds := strconv.Itoa(int(fd))
+func GetFdInfo(pid uint32, fd uint64) *FdInfo {
+	fds := strconv.FormatUint(fd, 10)
 	data, err := os.ReadFile(Path(pid, "fdinfo", fds))
 	if err != nil {
 		return nil

Unele fișiere nu au fost afișate deoarece prea multe fișiere au fost modificate în acest diff