| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- package containers
- import (
- "fmt"
- "github.com/coroot/coroot-node-agent/cgroup"
- "github.com/coroot/coroot-node-agent/common"
- "github.com/coroot/coroot-node-agent/ebpftracer"
- "github.com/coroot/coroot-node-agent/flags"
- "github.com/coroot/coroot-node-agent/proc"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/vishvananda/netns"
- "k8s.io/klog/v2"
- "os"
- "time"
- )
- var (
- selfNetNs = netns.None()
- hostNetNsId = netns.None().UniqueId()
- agentPid = uint32(os.Getpid())
- )
- type Registry struct {
- reg prometheus.Registerer
- tracer *ebpftracer.Tracer
- events chan ebpftracer.Event
- containersById map[ContainerID]*Container
- containersByCgroupId map[string]*Container
- containersByPid map[uint32]*Container
- }
- func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, error) {
- ns, err := proc.GetSelfNetNs()
- if err != nil {
- return nil, err
- }
- selfNetNs = ns
- hostNetNs, err := proc.GetHostNetNs()
- if err != nil {
- return nil, err
- }
- defer hostNetNs.Close()
- hostNetNsId = hostNetNs.UniqueId()
- err = proc.ExecuteInNetNs(hostNetNs, selfNetNs, func() error {
- if err := TaskstatsInit(); err != nil {
- return err
- }
- if err := ConntrackInit(); err != nil {
- return err
- }
- return nil
- })
- if err != nil {
- return nil, err
- }
- if err := cgroup.Init(); err != nil {
- return nil, err
- }
- if err := DockerdInit(); err != nil {
- klog.Warningln(err)
- }
- if err := ContainerdInit(); err != nil {
- klog.Warningln(err)
- }
- if err := JournaldInit(); err != nil {
- klog.Warningln(err)
- }
- cs := &Registry{
- reg: reg,
- events: make(chan ebpftracer.Event, 10000),
- containersById: map[ContainerID]*Container{},
- containersByCgroupId: map[string]*Container{},
- containersByPid: map[uint32]*Container{},
- }
- go cs.handleEvents(cs.events)
- t, err := ebpftracer.NewTracer(cs.events, kernelVersion, *flags.DisableL7Tracing)
- if err != nil {
- close(cs.events)
- return nil, err
- }
- cs.tracer = t
- return cs, nil
- }
- func (r *Registry) Close() {
- r.tracer.Close()
- close(r.events)
- }
- func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
- gcTicker := time.NewTicker(gcInterval)
- defer gcTicker.Stop()
- for {
- select {
- case now := <-gcTicker.C:
- for pid, c := range r.containersByPid {
- cg, err := proc.ReadCgroup(pid)
- if err != nil {
- delete(r.containersByPid, pid)
- if c != nil {
- c.onProcessExit(pid, false)
- }
- continue
- }
- if c != nil && cg.Id != c.cgroup.Id {
- delete(r.containersByPid, pid)
- c.onProcessExit(pid, false)
- }
- }
- for id, c := range r.containersById {
- if !c.Dead(now) {
- continue
- }
- for cg, cc := range r.containersByCgroupId {
- if cc == c {
- delete(r.containersByCgroupId, cg)
- }
- }
- for pid, cc := range r.containersByPid {
- if cc == c {
- delete(r.containersByPid, pid)
- }
- }
- prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Unregister(c)
- delete(r.containersById, id)
- c.Close()
- }
- case e, more := <-ch:
- if !more {
- return
- }
- switch e.Type {
- case ebpftracer.EventTypeProcessStart:
- c, seen := r.containersByPid[e.Pid]
- switch { // possible pids wraparound + missed `process-exit` event
- case c == nil && seen: // ignored
- delete(r.containersByPid, e.Pid)
- case c != nil: // revalidating by cgroup
- cg, err := proc.ReadCgroup(e.Pid)
- if err != nil || cg.Id != c.cgroup.Id {
- delete(r.containersByPid, e.Pid)
- c.onProcessExit(e.Pid, false)
- }
- }
- if c := r.getOrCreateContainer(e.Pid); c != nil {
- c.onProcessStart(e.Pid)
- }
- case ebpftracer.EventTypeProcessExit:
- if c := r.containersByPid[e.Pid]; c != nil {
- c.onProcessExit(e.Pid, e.Reason == ebpftracer.EventReasonOOMKill)
- }
- delete(r.containersByPid, e.Pid)
- case ebpftracer.EventTypeFileOpen:
- if c := r.getOrCreateContainer(e.Pid); c != nil {
- c.onFileOpen(e.Pid, e.Fd)
- }
- case ebpftracer.EventTypeListenOpen:
- if c := r.getOrCreateContainer(e.Pid); c != nil {
- c.onListenOpen(e.Pid, e.SrcAddr, false)
- } else {
- klog.Infoln("TCP listen open from unknown container", e)
- }
- case ebpftracer.EventTypeListenClose:
- if c := r.containersByPid[e.Pid]; c != nil {
- c.onListenClose(e.Pid, e.SrcAddr)
- }
- case ebpftracer.EventTypeConnectionOpen:
- if c := r.getOrCreateContainer(e.Pid); c != nil {
- c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, false)
- } else {
- klog.Infoln("TCP connection from unknown container", e)
- }
- case ebpftracer.EventTypeConnectionError:
- if c := r.getOrCreateContainer(e.Pid); c != nil {
- c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, true)
- } else {
- klog.Infoln("TCP connection error from unknown container", e)
- }
- case ebpftracer.EventTypeConnectionClose:
- srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
- for _, c := range r.containersById {
- if c.onConnectionClose(srcDst) {
- break
- }
- }
- case ebpftracer.EventTypeTCPRetransmit:
- srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
- for _, c := range r.containersById {
- if c.onRetransmit(srcDst) {
- break
- }
- }
- case ebpftracer.EventTypeL7Request:
- if e.L7Request == nil {
- continue
- }
- //klog.Infof("L7 request proto:%s pid:%d fd:%d", e.L7Request.Protocol.String(), e.Pid, e.Fd)
- if c := r.containersByPid[e.Pid]; c != nil {
- c.onL7Request(e.Pid, e.Fd, e.L7Request)
- }
- }
- }
- }
- }
- func (r *Registry) getOrCreateContainer(pid uint32) *Container {
- if c, seen := r.containersByPid[pid]; c != nil {
- return c
- } else if seen { // ignored
- return nil
- }
- cg, err := proc.ReadCgroup(pid)
- if err != nil {
- if !common.IsNotExist(err) {
- klog.Warningln("failed to read proc cgroup:", err)
- }
- return nil
- }
- if c := r.containersByCgroupId[cg.Id]; c != nil {
- r.containersByPid[pid] = c
- return c
- }
- md, err := getContainerMetadata(cg)
- if err != nil {
- klog.Warningf("failed to get container metadata for pid %d -> %s: %s", pid, cg.Id, err)
- return nil
- }
- id := calcId(cg, md)
- klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
- if id == "" {
- if cg.Id == "/init.scope" && pid != 1 {
- klog.InfoS("ignoring without persisting", "cg", cg.Id, "pid", pid)
- } else {
- klog.InfoS("ignoring", "cg", cg.Id, "pid", pid)
- r.containersByPid[pid] = nil
- }
- return nil
- }
- if c := r.containersById[id]; c != nil {
- klog.Warningln("id conflict:", id)
- if cg.CreatedAt().After(c.cgroup.CreatedAt()) {
- c.cgroup = cg
- c.metadata = md
- c.runLogParser("")
- }
- r.containersByPid[pid] = c
- r.containersByCgroupId[cg.Id] = c
- return c
- }
- c := NewContainer(cg, md)
- klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
- if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
- klog.Warningln(err)
- return nil
- }
- r.containersByPid[pid] = c
- r.containersByCgroupId[cg.Id] = c
- r.containersById[id] = c
- return c
- }
- func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
- if cg.ContainerType == cgroup.ContainerTypeSystemdService {
- return ContainerID(cg.ContainerId)
- }
- if cg.ContainerType != cgroup.ContainerTypeDocker && cg.ContainerType != cgroup.ContainerTypeContainerd {
- return ""
- }
- if md.labels["io.kubernetes.pod.name"] != "" {
- pod := md.labels["io.kubernetes.pod.name"]
- namespace := md.labels["io.kubernetes.pod.namespace"]
- name := md.labels["io.kubernetes.container.name"]
- if name == "" || name == "POD" { // skip pause|sandbox containers
- return ""
- }
- return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name))
- }
- if md.name == "" { // should be "pure" dockerd container here
- klog.Warningln("empty dockerd container name for:", cg.ContainerId)
- return ""
- }
- return ContainerID("/docker/" + md.name)
- }
- func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
- if cg.ContainerType != cgroup.ContainerTypeDocker && cg.ContainerType != cgroup.ContainerTypeContainerd {
- return &ContainerMetadata{}, nil
- }
- var dockerdErr error
- if dockerdClient != nil {
- md, err := DockerdInspect(cg.ContainerId)
- if err == nil {
- return md, nil
- }
- klog.Warningf("failed to inspect container %s: %s", cg.ContainerId, err)
- dockerdErr = err
- }
- var containerdErr error
- if containerdClient != nil {
- md, err := ContainerdInspect(cg.ContainerId)
- if err == nil {
- return md, nil
- }
- klog.Warningf("failed to inspect container %s: %s", cg.ContainerId, err)
- containerdErr = err
- }
- return nil, fmt.Errorf("failed to interact with dockerd (%s) or with containerd (%s)", dockerdErr, containerdErr)
- }
|