Przeglądaj źródła

Merge pull request #20 from coroot/containers_with_multiple_net_namespaces

add support for containers with multiple network namespaces
Nikolay Sivko 3 lat temu
rodzic
commit
02a3aebd8e
1 zmienionych plików z 85 dodań i 49 usunięć
  1. 85 49
      containers/container.go

+ 85 - 49
containers/container.go

@@ -80,12 +80,27 @@ type L7Stats struct {
 	Latency  prometheus.Histogram
 	Latency  prometheus.Histogram
 }
 }
 
 
+type ListenDetails struct {
+	ClosedAt time.Time
+	NsIPs    []netaddr.IP
+}
+
+type Process struct {
+	Pid       uint32
+	StartedAt time.Time
+	NetNsId   string
+}
+
+func (p *Process) isHostNs() bool {
+	return p.NetNsId == hostNetNsId
+}
+
 type Container struct {
 type Container struct {
 	id       ContainerID
 	id       ContainerID
 	cgroup   *cgroup.Cgroup
 	cgroup   *cgroup.Cgroup
 	metadata *ContainerMetadata
 	metadata *ContainerMetadata
 
 
-	pids map[uint32]time.Time // pid -> start time
+	processes map[uint32]*Process
 
 
 	startedAt time.Time
 	startedAt time.Time
 	zombieAt  time.Time
 	zombieAt  time.Time
@@ -95,7 +110,7 @@ type Container struct {
 	delaysByPid map[uint32]Delays
 	delaysByPid map[uint32]Delays
 	delaysLock  sync.Mutex
 	delaysLock  sync.Mutex
 
 
-	listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
+	listens map[netaddr.IPPort]map[uint32]*ListenDetails
 
 
 	connectsSuccessful map[AddrPair]int64           // dst:actual_dst -> count
 	connectsSuccessful map[AddrPair]int64           // dst:actual_dst -> count
 	connectsFailed     map[netaddr.IPPort]int64     // dst -> count
 	connectsFailed     map[netaddr.IPPort]int64     // dst -> count
@@ -109,11 +124,8 @@ type Container struct {
 
 
 	mounts map[string]proc.MountInfo
 	mounts map[string]proc.MountInfo
 
 
-	nsIPs []netaddr.IP
-
 	logParsers map[string]*LogParser
 	logParsers map[string]*LogParser
 
 
-	isHostNs      bool
 	hostConntrack *Conntrack
 	hostConntrack *Conntrack
 	nsConntrack   *Conntrack
 	nsConntrack   *Conntrack
 	lbConntracks  []*Conntrack
 	lbConntracks  []*Conntrack
@@ -134,11 +146,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		cgroup:   cg,
 		cgroup:   cg,
 		metadata: md,
 		metadata: md,
 
 
-		pids: map[uint32]time.Time{},
+		processes: map[uint32]*Process{},
 
 
 		delaysByPid: map[uint32]Delays{},
 		delaysByPid: map[uint32]Delays{},
 
 
-		listens: map[netaddr.IPPort]map[uint32]time.Time{},
+		listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
 
 
 		connectsSuccessful: map[AddrPair]int64{},
 		connectsSuccessful: map[AddrPair]int64{},
 		connectsFailed:     map[netaddr.IPPort]int64{},
 		connectsFailed:     map[netaddr.IPPort]int64{},
@@ -151,7 +163,6 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 
 
 		logParsers: map[string]*LogParser{},
 		logParsers: map[string]*LogParser{},
 
 
-		isHostNs:      hostNetNsId == netNs.UniqueId(),
 		hostConntrack: hostConntrack,
 		hostConntrack: hostConntrack,
 
 
 		done: make(chan struct{}),
 		done: make(chan struct{}),
@@ -304,7 +315,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 
 
 	appTypes := map[string]struct{}{}
 	appTypes := map[string]struct{}{}
 	seenJvms := map[string]bool{}
 	seenJvms := map[string]bool{}
-	for pid := range c.pids {
+	for pid := range c.processes {
 		cmdline := proc.GetCmdline(pid)
 		cmdline := proc.GetCmdline(pid)
 		if len(cmdline) == 0 {
 		if len(cmdline) == 0 {
 			continue
 			continue
@@ -352,15 +363,21 @@ func (c *Container) onProcessStart(pid uint32) {
 	if err != nil {
 	if err != nil {
 		return
 		return
 	}
 	}
+	ns, err := proc.GetNetNs(pid)
+	if err != nil {
+		return
+	}
+	defer ns.Close()
 	c.zombieAt = time.Time{}
 	c.zombieAt = time.Time{}
-	c.pids[pid] = stats.BeginTime
+	c.processes[pid] = &Process{Pid: pid, StartedAt: stats.BeginTime, NetNsId: ns.UniqueId()}
+
 	if c.startedAt.IsZero() {
 	if c.startedAt.IsZero() {
 		c.startedAt = stats.BeginTime
 		c.startedAt = stats.BeginTime
 	} else {
 	} else {
 		min := stats.BeginTime
 		min := stats.BeginTime
-		for _, t := range c.pids {
-			if t.Before(min) {
-				min = t
+		for _, p := range c.processes {
+			if p.StartedAt.Before(min) {
+				min = p.StartedAt
 			}
 			}
 		}
 		}
 		if min.After(c.startedAt) {
 		if min.After(c.startedAt) {
@@ -373,8 +390,8 @@ func (c *Container) onProcessStart(pid uint32) {
 func (c *Container) onProcessExit(pid uint32, oomKill bool) {
 func (c *Container) onProcessExit(pid uint32, oomKill bool) {
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
-	delete(c.pids, pid)
-	if len(c.pids) == 0 {
+	delete(c.processes, pid)
+	if len(c.processes) == 0 {
 		c.zombieAt = time.Now()
 		c.zombieAt = time.Now()
 	}
 	}
 	delete(c.delaysByPid, pid)
 	delete(c.delaysByPid, pid)
@@ -418,9 +435,10 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
 		defer c.lock.Unlock()
 		defer c.lock.Unlock()
 	}
 	}
 	if _, ok := c.listens[addr]; !ok {
 	if _, ok := c.listens[addr]; !ok {
-		c.listens[addr] = map[uint32]time.Time{}
+		c.listens[addr] = map[uint32]*ListenDetails{}
 	}
 	}
-	c.listens[addr][pid] = time.Time{}
+	details := &ListenDetails{}
+	c.listens[addr][pid] = details
 
 
 	if addr.IP().IsUnspecified() {
 	if addr.IP().IsUnspecified() {
 		ns, err := proc.GetNetNs(pid)
 		ns, err := proc.GetNetNs(pid)
@@ -434,7 +452,7 @@ func (c *Container) onListenOpen(pid uint32, addr netaddr.IPPort, safe bool) {
 		if ips, err := proc.GetNsIps(ns); err != nil {
 		if ips, err := proc.GetNsIps(ns); err != nil {
 			klog.Warningln(err)
 			klog.Warningln(err)
 		} else {
 		} else {
-			c.nsIPs = ips
+			details.NsIPs = ips
 		}
 		}
 	}
 	}
 }
 }
@@ -444,13 +462,19 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 	if _, byAddr := c.listens[addr]; byAddr {
 	if _, byAddr := c.listens[addr]; byAddr {
 		if _, byPid := c.listens[addr][pid]; byPid {
 		if _, byPid := c.listens[addr][pid]; byPid {
-			c.listens[addr][pid] = time.Now()
+			if details := c.listens[addr][pid]; details != nil {
+				details.ClosedAt = time.Now()
+			}
 		}
 		}
 	}
 	}
 }
 }
 
 
 func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
 func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
-	if dst.IP().IsLoopback() && !c.isHostNs {
+	p := c.processes[pid]
+	if p == nil {
+		return
+	}
+	if dst.IP().IsLoopback() && !p.isHostNs() {
 		return
 		return
 	}
 	}
 	whitelisted := false
 	whitelisted := false
@@ -468,7 +492,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	if failed {
 	if failed {
 		c.connectsFailed[dst]++
 		c.connectsFailed[dst]++
 	} else {
 	} else {
-		actualDst, err := c.getActualDestination(pid, src, dst)
+		actualDst, err := c.getActualDestination(p, src, dst)
 		if err != nil {
 		if err != nil {
 			if !common.IsNotExist(err) {
 			if !common.IsNotExist(err) {
 				klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
 				klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
@@ -478,7 +502,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 		switch {
 		switch {
 		case actualDst == nil:
 		case actualDst == nil:
 			actualDst = &dst
 			actualDst = &dst
-		case actualDst.IP().IsLoopback() && !c.isHostNs:
+		case actualDst.IP().IsLoopback() && !p.isHostNs():
 			return
 			return
 		}
 		}
 		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
 		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
@@ -493,7 +517,7 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	c.connectLastAttempt[dst] = time.Now()
 	c.connectLastAttempt[dst] = time.Now()
 }
 }
 
 
-func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
+func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*netaddr.IPPort, error) {
 	if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
 	if actualDst := lookupCiliumConntrackTable(src, dst); actualDst != nil {
 		return actualDst, nil
 		return actualDst, nil
 	}
 	}
@@ -506,9 +530,9 @@ func (c *Container) getActualDestination(pid uint32, src, dst netaddr.IPPort) (*
 	if actualDst != nil {
 	if actualDst != nil {
 		return actualDst, nil
 		return actualDst, nil
 	}
 	}
-	if !c.isHostNs {
+	if !p.isHostNs() {
 		if c.nsConntrack == nil {
 		if c.nsConntrack == nil {
-			netNs, err := proc.GetNetNs(pid)
+			netNs, err := proc.GetNetNs(p.Pid)
 			if err != nil {
 			if err != nil {
 				return nil, err
 				return nil, err
 			}
 			}
@@ -608,7 +632,7 @@ func (c *Container) onRetransmit(srcDst AddrPair) bool {
 func (c *Container) updateDelays() {
 func (c *Container) updateDelays() {
 	c.delaysLock.Lock()
 	c.delaysLock.Lock()
 	defer c.delaysLock.Unlock()
 	defer c.delaysLock.Unlock()
-	for pid := range c.pids {
+	for pid := range c.processes {
 		stats, err := TaskstatsTGID(pid)
 		stats, err := TaskstatsTGID(pid)
 		if err != nil {
 		if err != nil {
 			continue
 			continue
@@ -629,7 +653,7 @@ func (c *Container) getMounts() map[string]map[string]*proc.FSStat {
 	res := map[string]map[string]*proc.FSStat{}
 	res := map[string]map[string]*proc.FSStat{}
 	for _, mi := range c.mounts {
 	for _, mi := range c.mounts {
 		var stat *proc.FSStat
 		var stat *proc.FSStat
-		for pid := range c.pids {
+		for pid := range c.processes {
 			s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
 			s, err := proc.StatFS(proc.Path(pid, "root", mi.MountPoint))
 			if err == nil {
 			if err == nil {
 				stat = &s
 				stat = &s
@@ -651,20 +675,28 @@ func (c *Container) getListens() map[netaddr.IPPort]int {
 	res := map[netaddr.IPPort]int{}
 	res := map[netaddr.IPPort]int{}
 	for addr, byPid := range c.listens {
 	for addr, byPid := range c.listens {
 		open := 0
 		open := 0
-		for _, closedAt := range byPid {
-			if closedAt.IsZero() {
+		isHostNs := false
+		ips := map[netaddr.IP]bool{}
+		for pid, details := range byPid {
+			p := c.processes[pid]
+			if p == nil {
+				continue
+			}
+			if p.isHostNs() {
+				isHostNs = true
+			}
+			if details.ClosedAt.IsZero() {
 				open = 1
 				open = 1
-				break
+			}
+			for _, ip := range details.NsIPs {
+				ips[ip] = true
 			}
 			}
 		}
 		}
-		var ips []netaddr.IP
-		if addr.IP().IsUnspecified() {
-			ips = c.nsIPs
-		} else {
-			ips = []netaddr.IP{addr.IP()}
+		if !addr.IP().IsUnspecified() {
+			ips = map[netaddr.IP]bool{addr.IP(): true}
 		}
 		}
-		for _, ip := range ips {
-			if ip.IsLoopback() && !c.isHostNs {
+		for ip := range ips {
+			if ip.IsLoopback() && !isHostNs {
 				continue
 				continue
 			}
 			}
 			res[netaddr.IPPortFrom(ip, addr.Port())] = open
 			res[netaddr.IPPortFrom(ip, addr.Port())] = open
@@ -723,7 +755,7 @@ func (c *Container) getProxiedListens() map[string]map[netaddr.IPPort]struct{} {
 
 
 func (c *Container) ping() map[netaddr.IP]float64 {
 func (c *Container) ping() map[netaddr.IP]float64 {
 	netNs := netns.None()
 	netNs := netns.None()
-	for pid := range c.pids {
+	for pid := range c.processes {
 		if pid == agentPid {
 		if pid == agentPid {
 			netNs = selfNetNs
 			netNs = selfNetNs
 			break
 			break
@@ -829,8 +861,12 @@ func (c *Container) gc(now time.Time) {
 	established := map[AddrPair]struct{}{}
 	established := map[AddrPair]struct{}{}
 	establishedDst := map[netaddr.IPPort]struct{}{}
 	establishedDst := map[netaddr.IPPort]struct{}{}
 	listens := map[netaddr.IPPort]string{}
 	listens := map[netaddr.IPPort]string{}
-	for pid := range c.pids {
-		sockets, err := proc.GetSockets(pid)
+	seenNamespaces := map[string]bool{}
+	for _, p := range c.processes {
+		if seenNamespaces[p.NetNsId] {
+			continue
+		}
+		sockets, err := proc.GetSockets(p.Pid)
 		if err != nil {
 		if err != nil {
 			continue
 			continue
 		}
 		}
@@ -842,7 +878,7 @@ func (c *Container) gc(now time.Time) {
 				establishedDst[s.DAddr] = struct{}{}
 				establishedDst[s.DAddr] = struct{}{}
 			}
 			}
 		}
 		}
-		break
+		seenNamespaces[p.NetNsId] = true
 	}
 	}
 
 
 	c.revalidateListens(now, listens)
 	c.revalidateListens(now, listens)
@@ -888,9 +924,9 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 			continue
 			continue
 		}
 		}
 		klog.Warningln("deleting the outdated listen:", addr)
 		klog.Warningln("deleting the outdated listen:", addr)
-		for pid, closedAt := range byPid {
-			if closedAt.IsZero() {
-				byPid[pid] = now
+		for _, details := range byPid {
+			if details.ClosedAt.IsZero() {
+				details.ClosedAt = now
 			}
 			}
 		}
 		}
 	}
 	}
@@ -903,8 +939,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 			continue
 			continue
 		}
 		}
 		open := false
 		open := false
-		for _, closedAt := range byPids {
-			if closedAt.IsZero() {
+		for _, details := range byPids {
+			if details.ClosedAt.IsZero() {
 				open = true
 				open = true
 				break
 				break
 			}
 			}
@@ -916,7 +952,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 
 
 	if len(missingListens) > 0 {
 	if len(missingListens) > 0 {
 		inodeToPid := map[string]uint32{}
 		inodeToPid := map[string]uint32{}
-		for pid := range c.pids {
+		for pid := range c.processes {
 			fds, err := proc.ReadFds(pid)
 			fds, err := proc.ReadFds(pid)
 			if err != nil {
 			if err != nil {
 				continue
 				continue
@@ -938,8 +974,8 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 	}
 	}
 
 
 	for addr, pids := range c.listens {
 	for addr, pids := range c.listens {
-		for pid, closedAt := range pids {
-			if !closedAt.IsZero() && now.Sub(closedAt) > gcInterval {
+		for pid, details := range pids {
+			if !details.ClosedAt.IsZero() && now.Sub(details.ClosedAt) > gcInterval {
 				delete(c.listens[addr], pid)
 				delete(c.listens[addr], pid)
 			}
 			}
 		}
 		}