registry.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. package containers
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/cgroup"
  5. "github.com/coroot/coroot-node-agent/common"
  6. "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/flags"
  8. "github.com/coroot/coroot-node-agent/proc"
  9. "github.com/prometheus/client_golang/prometheus"
  10. "github.com/vishvananda/netns"
  11. "k8s.io/klog/v2"
  12. "os"
  13. "time"
  14. )
// Package-level state shared by the containers package.
var (
	// selfNetNs starts as netns.None() and is replaced in NewRegistry with
	// the handle returned by proc.GetSelfNetNs().
	selfNetNs = netns.None()
	// hostNetNsId starts as the unique id of netns.None() and is replaced in
	// NewRegistry with the unique id of the handle returned by
	// proc.GetHostNetNs().
	hostNetNsId = netns.None().UniqueId()
	// agentPid is the pid of the current process, captured once at package
	// initialisation.
	agentPid = uint32(os.Getpid())
)
// Registry tracks the containers discovered from tracer events and keeps
// three indexes onto the same *Container values. The maps are read and
// written from the handleEvents goroutine.
type Registry struct {
	// reg is the Prometheus registerer that per-container collectors are
	// registered with (wrapped with a "container_id" label).
	reg prometheus.Registerer

	// tracer is the eBPF tracer created in NewRegistry; it is closed in Close.
	tracer *ebpftracer.Tracer
	// events is the channel handed to the tracer and consumed by handleEvents.
	events chan ebpftracer.Event

	// containersById maps a calculated container id to its container.
	containersById map[ContainerID]*Container
	// containersByCgroupId maps a cgroup id to the container it belongs to.
	containersByCgroupId map[string]*Container
	// containersByPid maps a pid to its container. A present key with a nil
	// value marks a pid that was looked at and deliberately ignored.
	containersByPid map[uint32]*Container
}
  28. func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, error) {
  29. ns, err := proc.GetSelfNetNs()
  30. if err != nil {
  31. return nil, err
  32. }
  33. selfNetNs = ns
  34. hostNetNs, err := proc.GetHostNetNs()
  35. if err != nil {
  36. return nil, err
  37. }
  38. defer hostNetNs.Close()
  39. hostNetNsId = hostNetNs.UniqueId()
  40. err = proc.ExecuteInNetNs(hostNetNs, selfNetNs, func() error {
  41. if err := TaskstatsInit(); err != nil {
  42. return err
  43. }
  44. if err := ConntrackInit(); err != nil {
  45. return err
  46. }
  47. return nil
  48. })
  49. if err != nil {
  50. return nil, err
  51. }
  52. if err := cgroup.Init(); err != nil {
  53. return nil, err
  54. }
  55. if err := DockerdInit(); err != nil {
  56. klog.Warningln(err)
  57. }
  58. if err := ContainerdInit(); err != nil {
  59. klog.Warningln(err)
  60. }
  61. if err := JournaldInit(); err != nil {
  62. klog.Warningln(err)
  63. }
  64. cs := &Registry{
  65. reg: reg,
  66. events: make(chan ebpftracer.Event, 10000),
  67. containersById: map[ContainerID]*Container{},
  68. containersByCgroupId: map[string]*Container{},
  69. containersByPid: map[uint32]*Container{},
  70. }
  71. go cs.handleEvents(cs.events)
  72. t, err := ebpftracer.NewTracer(cs.events, kernelVersion, *flags.DisableL7Tracing)
  73. if err != nil {
  74. close(cs.events)
  75. return nil, err
  76. }
  77. cs.tracer = t
  78. return cs, nil
  79. }
  80. func (r *Registry) Close() {
  81. r.tracer.Close()
  82. close(r.events)
  83. }
  84. func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
  85. gcTicker := time.NewTicker(gcInterval)
  86. defer gcTicker.Stop()
  87. for {
  88. select {
  89. case now := <-gcTicker.C:
  90. for pid, c := range r.containersByPid {
  91. cg, err := proc.ReadCgroup(pid)
  92. if err != nil {
  93. delete(r.containersByPid, pid)
  94. if c != nil {
  95. c.onProcessExit(pid, false)
  96. }
  97. continue
  98. }
  99. if c != nil && cg.Id != c.cgroup.Id {
  100. delete(r.containersByPid, pid)
  101. c.onProcessExit(pid, false)
  102. }
  103. }
  104. for id, c := range r.containersById {
  105. if !c.Dead(now) {
  106. continue
  107. }
  108. for cg, cc := range r.containersByCgroupId {
  109. if cc == c {
  110. delete(r.containersByCgroupId, cg)
  111. }
  112. }
  113. for pid, cc := range r.containersByPid {
  114. if cc == c {
  115. delete(r.containersByPid, pid)
  116. }
  117. }
  118. prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Unregister(c)
  119. delete(r.containersById, id)
  120. c.Close()
  121. }
  122. case e, more := <-ch:
  123. if !more {
  124. return
  125. }
  126. switch e.Type {
  127. case ebpftracer.EventTypeProcessStart:
  128. c, seen := r.containersByPid[e.Pid]
  129. switch { // possible pids wraparound + missed `process-exit` event
  130. case c == nil && seen: // ignored
  131. delete(r.containersByPid, e.Pid)
  132. case c != nil: // revalidating by cgroup
  133. cg, err := proc.ReadCgroup(e.Pid)
  134. if err != nil || cg.Id != c.cgroup.Id {
  135. delete(r.containersByPid, e.Pid)
  136. c.onProcessExit(e.Pid, false)
  137. }
  138. }
  139. if c := r.getOrCreateContainer(e.Pid); c != nil {
  140. c.onProcessStart(e.Pid)
  141. }
  142. case ebpftracer.EventTypeProcessExit:
  143. if c := r.containersByPid[e.Pid]; c != nil {
  144. c.onProcessExit(e.Pid, e.Reason == ebpftracer.EventReasonOOMKill)
  145. }
  146. delete(r.containersByPid, e.Pid)
  147. case ebpftracer.EventTypeFileOpen:
  148. if c := r.getOrCreateContainer(e.Pid); c != nil {
  149. c.onFileOpen(e.Pid, e.Fd)
  150. }
  151. case ebpftracer.EventTypeListenOpen:
  152. if c := r.getOrCreateContainer(e.Pid); c != nil {
  153. c.onListenOpen(e.Pid, e.SrcAddr, false)
  154. } else {
  155. klog.Infoln("TCP listen open from unknown container", e)
  156. }
  157. case ebpftracer.EventTypeListenClose:
  158. if c := r.containersByPid[e.Pid]; c != nil {
  159. c.onListenClose(e.Pid, e.SrcAddr)
  160. }
  161. case ebpftracer.EventTypeConnectionOpen:
  162. if c := r.getOrCreateContainer(e.Pid); c != nil {
  163. c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, false)
  164. } else {
  165. klog.Infoln("TCP connection from unknown container", e)
  166. }
  167. case ebpftracer.EventTypeConnectionError:
  168. if c := r.getOrCreateContainer(e.Pid); c != nil {
  169. c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, true)
  170. } else {
  171. klog.Infoln("TCP connection error from unknown container", e)
  172. }
  173. case ebpftracer.EventTypeConnectionClose:
  174. srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
  175. for _, c := range r.containersById {
  176. if c.onConnectionClose(srcDst) {
  177. break
  178. }
  179. }
  180. case ebpftracer.EventTypeTCPRetransmit:
  181. srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
  182. for _, c := range r.containersById {
  183. if c.onRetransmit(srcDst) {
  184. break
  185. }
  186. }
  187. case ebpftracer.EventTypeL7Request:
  188. if e.L7Request == nil {
  189. continue
  190. }
  191. //klog.Infof("L7 request proto:%s pid:%d fd:%d", e.L7Request.Protocol.String(), e.Pid, e.Fd)
  192. if c := r.containersByPid[e.Pid]; c != nil {
  193. c.onL7Request(e.Pid, e.Fd, e.L7Request)
  194. }
  195. }
  196. }
  197. }
  198. }
  199. func (r *Registry) getOrCreateContainer(pid uint32) *Container {
  200. if c, seen := r.containersByPid[pid]; c != nil {
  201. return c
  202. } else if seen { // ignored
  203. return nil
  204. }
  205. cg, err := proc.ReadCgroup(pid)
  206. if err != nil {
  207. if !common.IsNotExist(err) {
  208. klog.Warningln("failed to read proc cgroup:", err)
  209. }
  210. return nil
  211. }
  212. if c := r.containersByCgroupId[cg.Id]; c != nil {
  213. r.containersByPid[pid] = c
  214. return c
  215. }
  216. md, err := getContainerMetadata(cg)
  217. if err != nil {
  218. klog.Warningf("failed to get container metadata for pid %d -> %s: %s", pid, cg.Id, err)
  219. return nil
  220. }
  221. id := calcId(cg, md)
  222. klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
  223. if id == "" {
  224. if cg.Id == "/init.scope" && pid != 1 {
  225. klog.InfoS("ignoring without persisting", "cg", cg.Id, "pid", pid)
  226. } else {
  227. klog.InfoS("ignoring", "cg", cg.Id, "pid", pid)
  228. r.containersByPid[pid] = nil
  229. }
  230. return nil
  231. }
  232. if c := r.containersById[id]; c != nil {
  233. klog.Warningln("id conflict:", id)
  234. if cg.CreatedAt().After(c.cgroup.CreatedAt()) {
  235. c.cgroup = cg
  236. c.metadata = md
  237. c.runLogParser("")
  238. }
  239. r.containersByPid[pid] = c
  240. r.containersByCgroupId[cg.Id] = c
  241. return c
  242. }
  243. c := NewContainer(cg, md)
  244. klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
  245. if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
  246. klog.Warningln(err)
  247. return nil
  248. }
  249. r.containersByPid[pid] = c
  250. r.containersByCgroupId[cg.Id] = c
  251. r.containersById[id] = c
  252. return c
  253. }
  254. func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
  255. if cg.ContainerType == cgroup.ContainerTypeSystemdService {
  256. return ContainerID(cg.ContainerId)
  257. }
  258. if cg.ContainerType != cgroup.ContainerTypeDocker && cg.ContainerType != cgroup.ContainerTypeContainerd {
  259. return ""
  260. }
  261. if md.labels["io.kubernetes.pod.name"] != "" {
  262. pod := md.labels["io.kubernetes.pod.name"]
  263. namespace := md.labels["io.kubernetes.pod.namespace"]
  264. name := md.labels["io.kubernetes.container.name"]
  265. if name == "" || name == "POD" { // skip pause|sandbox containers
  266. return ""
  267. }
  268. return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name))
  269. }
  270. if md.name == "" { // should be "pure" dockerd container here
  271. klog.Warningln("empty dockerd container name for:", cg.ContainerId)
  272. return ""
  273. }
  274. return ContainerID("/docker/" + md.name)
  275. }
  276. func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {
  277. if cg.ContainerType != cgroup.ContainerTypeDocker && cg.ContainerType != cgroup.ContainerTypeContainerd {
  278. return &ContainerMetadata{}, nil
  279. }
  280. var dockerdErr error
  281. if dockerdClient != nil {
  282. md, err := DockerdInspect(cg.ContainerId)
  283. if err == nil {
  284. return md, nil
  285. }
  286. klog.Warningf("failed to inspect container %s: %s", cg.ContainerId, err)
  287. dockerdErr = err
  288. }
  289. var containerdErr error
  290. if containerdClient != nil {
  291. md, err := ContainerdInspect(cg.ContainerId)
  292. if err == nil {
  293. return md, nil
  294. }
  295. klog.Warningf("failed to inspect container %s: %s", cg.ContainerId, err)
  296. containerdErr = err
  297. }
  298. return nil, fmt.Errorf("failed to interact with dockerd (%s) or with containerd (%s)", dockerdErr, containerdErr)
  299. }