Forráskód Böngészése

Try to determine the actual connection destination using the conntrack table in the container's network namespace (used as a fallback when the host conntrack table has no matching entry).

Nikolay Sivko 3 éve
szülő
commit
c96452b9d6
4 módosított fájl, 88 hozzáadás és 44 törlés
  1. 18 17
      containers/conntrack.go
  2. 55 22
      containers/container.go
  3. 14 4
      containers/registry.go
  4. 1 1
      logs/tail_reader_test.go

+ 18 - 17
containers/conntrack.go

@@ -3,29 +3,25 @@ package containers
 import (
 import (
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/florianl/go-conntrack"
 	"github.com/florianl/go-conntrack"
+	"github.com/vishvananda/netns"
 	"inet.af/netaddr"
 	"inet.af/netaddr"
 	"k8s.io/klog/v2"
 	"k8s.io/klog/v2"
 	"syscall"
 	"syscall"
 )
 )
 
 
-var (
-	conntrackClient *conntrack.Nfct
-)
+type Conntrack struct {
+	client *conntrack.Nfct
+}
 
 
-func ConntrackInit() error {
-	c, err := conntrack.Open(&conntrack.Config{})
+func NewConntrack(netNs netns.NsHandle) (*Conntrack, error) {
+	c, err := conntrack.Open(&conntrack.Config{NetNS: int(netNs)})
 	if err != nil {
 	if err != nil {
-		return err
+		return nil, err
 	}
 	}
-	conntrackClient = c
-	return nil
+	return &Conntrack{client: c}, nil
 }
 }
 
 
-func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
-	if conntrackClient == nil {
-		return dst
-	}
-
+func (c *Conntrack) GetActualDestination(src, dst netaddr.IPPort) *netaddr.IPPort {
 	tcp := uint8(syscall.IPPROTO_TCP)
 	tcp := uint8(syscall.IPPROTO_TCP)
 	sip := src.IP().IPAddr().IP
 	sip := src.IP().IPAddr().IP
 	dip := dst.IP().IPAddr().IP
 	dip := dst.IP().IPAddr().IP
@@ -47,12 +43,12 @@ func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
 	if dst.IP().Is6() {
 	if dst.IP().Is6() {
 		family = conntrack.IPv6
 		family = conntrack.IPv6
 	}
 	}
-	sessions, err := conntrackClient.Get(conntrack.Conntrack, family, req)
+	sessions, err := c.client.Get(conntrack.Conntrack, family, req)
 	if err != nil {
 	if err != nil {
 		if !common.IsNotExist(err) {
 		if !common.IsNotExist(err) {
 			klog.Errorf("failed to resolve actual destination for %s->%s: %s", src, dst, err)
 			klog.Errorf("failed to resolve actual destination for %s->%s: %s", src, dst, err)
 		}
 		}
-		return dst
+		return nil
 	}
 	}
 	for _, s := range sessions {
 	for _, s := range sessions {
 		if !ipTupleValid(s.Origin) || !ipTupleValid(s.Reply) {
 		if !ipTupleValid(s.Origin) || !ipTupleValid(s.Reply) {
@@ -71,9 +67,14 @@ func ConntrackGetActualDestination(src, dst netaddr.IPPort) netaddr.IPPort {
 		if !ok {
 		if !ok {
 			continue
 			continue
 		}
 		}
-		return netaddr.IPPortFrom(ip, *reply.Proto.SrcPort)
+		res := netaddr.IPPortFrom(ip, *reply.Proto.SrcPort)
+		return &res
 	}
 	}
-	return dst
+	return nil
+}
+
+func (c *Conntrack) Close() error {
+	return c.client.Close()
 }
 }
 
 
 func ipTuplesEqual(a, b *conntrack.IPTuple) bool {
 func ipTuplesEqual(a, b *conntrack.IPTuple) bool {

+ 55 - 22
containers/container.go

@@ -102,12 +102,21 @@ type Container struct {
 
 
 	logParsers map[string]*LogParser
 	logParsers map[string]*LogParser
 
 
+	isHostNs      bool
+	hostConntrack *Conntrack
+	nsConntrack   *Conntrack
+
 	lock sync.RWMutex
 	lock sync.RWMutex
 
 
 	done chan struct{}
 	done chan struct{}
 }
 }
 
 
-func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
+func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata, hostConntrack *Conntrack, pid uint32) (*Container, error) {
+	netNs, err := proc.GetNetNs(pid)
+	if err != nil {
+		return nil, err
+	}
+	defer netNs.Close()
 	c := &Container{
 	c := &Container{
 		cgroup:   cg,
 		cgroup:   cg,
 		metadata: md,
 		metadata: md,
@@ -129,6 +138,9 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
 
 
 		logParsers: map[string]*LogParser{},
 		logParsers: map[string]*LogParser{},
 
 
+		isHostNs:      hostNetNsId == netNs.UniqueId(),
+		hostConntrack: hostConntrack,
+
 		done: make(chan struct{}),
 		done: make(chan struct{}),
 	}
 	}
 
 
@@ -147,13 +159,16 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
 		}
 		}
 	}()
 	}()
 
 
-	return c
+	return c, nil
 }
 }
 
 
 func (c *Container) Close() {
 func (c *Container) Close() {
 	for _, p := range c.logParsers {
 	for _, p := range c.logParsers {
 		p.Stop()
 		p.Stop()
 	}
 	}
+	if c.nsConntrack != nil {
+		_ = c.nsConntrack.Close()
+	}
 	close(c.done)
 	close(c.done)
 }
 }
 
 
@@ -384,34 +399,52 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 }
 }
 
 
 func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
 func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
-	if dst.IP().IsLoopback() {
-		netNs, err := proc.GetNetNs(pid)
-		isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
-		netNs.Close()
-		if !isHostNs {
-			return
-		}
-	} else {
-		whitelisted := false
-		for _, prefix := range flags.ExternalNetworksWhitelist {
-			if prefix.Contains(dst.IP()) {
-				whitelisted = true
-				break
-			}
-		}
-		if !whitelisted && !common.IsIpPrivate(dst.IP()) {
-			return
+	if dst.IP().IsLoopback() && !c.isHostNs {
+		return
+	}
+	whitelisted := false
+	for _, prefix := range flags.ExternalNetworksWhitelist {
+		if prefix.Contains(dst.IP()) {
+			whitelisted = true
+			break
 		}
 		}
 	}
 	}
+	if !whitelisted && !common.IsIpPrivate(dst.IP()) {
+		return
+	}
 	c.lock.Lock()
 	c.lock.Lock()
 	defer c.lock.Unlock()
 	defer c.lock.Unlock()
 	if failed {
 	if failed {
 		c.connectsFailed[dst]++
 		c.connectsFailed[dst]++
 	} else {
 	} else {
-		actualDst := ConntrackGetActualDestination(src, dst)
-		c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
+		actualDst := c.hostConntrack.GetActualDestination(src, dst)
+		if actualDst == nil && !c.isHostNs {
+			if c.nsConntrack == nil {
+				netNs, err := proc.GetNetNs(pid)
+				if err != nil {
+					if !common.IsNotExist(err) {
+						klog.Warningf("cannot open NetNs for pid %d: %s", pid, err)
+					}
+					return
+				}
+				defer netNs.Close()
+				c.nsConntrack, err = NewConntrack(netNs)
+				if err != nil {
+					klog.Warningln(err)
+					return
+				}
+			}
+			actualDst = c.nsConntrack.GetActualDestination(src, dst)
+		}
+		switch {
+		case actualDst == nil:
+			actualDst = &dst
+		case actualDst.IP().IsLoopback() && !c.isHostNs:
+			return
+		}
+		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
 		c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
 		c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
-			ActualDest: actualDst,
+			ActualDest: *actualDst,
 			Pid:        pid,
 			Pid:        pid,
 			Fd:         fd,
 			Fd:         fd,
 			Timestamp:  timestamp,
 			Timestamp:  timestamp,

+ 14 - 4
containers/registry.go

@@ -26,6 +26,8 @@ type Registry struct {
 	tracer *ebpftracer.Tracer
 	tracer *ebpftracer.Tracer
 	events chan ebpftracer.Event
 	events chan ebpftracer.Event
 
 
+	hostConntrack *Conntrack
+
 	containersById       map[ContainerID]*Container
 	containersById       map[ContainerID]*Container
 	containersByCgroupId map[string]*Container
 	containersByCgroupId map[string]*Container
 	containersByPid      map[uint32]*Container
 	containersByPid      map[uint32]*Container
@@ -48,9 +50,6 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, er
 		if err := TaskstatsInit(); err != nil {
 		if err := TaskstatsInit(); err != nil {
 			return err
 			return err
 		}
 		}
-		if err := ConntrackInit(); err != nil {
-			return err
-		}
 		return nil
 		return nil
 	})
 	})
 	if err != nil {
 	if err != nil {
@@ -68,11 +67,17 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, er
 	if err := JournaldInit(); err != nil {
 	if err := JournaldInit(); err != nil {
 		klog.Warningln(err)
 		klog.Warningln(err)
 	}
 	}
+	ct, err := NewConntrack(hostNetNs)
+	if err != nil {
+		return nil, err
+	}
 
 
 	cs := &Registry{
 	cs := &Registry{
 		reg:    reg,
 		reg:    reg,
 		events: make(chan ebpftracer.Event, 10000),
 		events: make(chan ebpftracer.Event, 10000),
 
 
+		hostConntrack: ct,
+
 		containersById:       map[ContainerID]*Container{},
 		containersById:       map[ContainerID]*Container{},
 		containersByCgroupId: map[string]*Container{},
 		containersByCgroupId: map[string]*Container{},
 		containersByPid:      map[uint32]*Container{},
 		containersByPid:      map[uint32]*Container{},
@@ -258,7 +263,12 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		r.containersByCgroupId[cg.Id] = c
 		r.containersByCgroupId[cg.Id] = c
 		return c
 		return c
 	}
 	}
-	c := NewContainer(cg, md)
+	c, err := NewContainer(cg, md, r.hostConntrack, pid)
+	if err != nil {
+		klog.Warningf("failed to create container pid=%d cg=%s id=%s: %s", pid, cg.Id, id, err)
+		return nil
+	}
+
 	klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
 	klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
 	if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
 	if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
 		klog.Warningln(err)
 		klog.Warningln(err)

+ 1 - 1
logs/tail_reader_test.go

@@ -26,7 +26,7 @@ func TestTailReader(t *testing.T) {
 	}
 	}
 
 
 	wait := func() {
 	wait := func() {
-		time.Sleep(3 * tailPollInterval)
+		time.Sleep(time.Second)
 	}
 	}
 
 
 	get := func(expected string) {
 	get := func(expected string) {