containerd.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package containers
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/containerd/containerd"
  9. "github.com/containerd/containerd/oci"
  10. "github.com/containerd/containerd/pkg/cri/constants"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/proc"
  13. "github.com/coroot/logparser"
  14. klog "github.com/sirupsen/logrus"
  15. )
  16. const containerdTimeout = 30 * time.Second
  17. var (
  18. containerdClient *containerd.Client
  19. )
  20. func ContainerdInit() error {
  21. sockets := []string{
  22. "/var/snap/microk8s/common/run/containerd.sock",
  23. "/run/k0s/containerd.sock",
  24. "/run/k3s/containerd/containerd.sock",
  25. "/run/containerd/containerd.sock",
  26. }
  27. var err error
  28. for _, socket := range sockets {
  29. containerdClient, err = containerd.New(proc.HostPath(socket),
  30. containerd.WithDefaultNamespace(constants.K8sContainerdNamespace),
  31. containerd.WithTimeout(time.Second))
  32. if err == nil {
  33. klog.Infoln("using", socket)
  34. break
  35. }
  36. }
  37. if containerdClient == nil {
  38. return fmt.Errorf(
  39. "couldn't connect to containerd through the following UNIX sockets [%s]: %s",
  40. strings.Join(sockets, ","), err,
  41. )
  42. }
  43. return nil
  44. }
  45. func ContainerdInspect(containerID string) (*ContainerMetadata, error) {
  46. if containerdClient == nil {
  47. return nil, fmt.Errorf("containerd client not initialized")
  48. }
  49. ctx, cancel := context.WithTimeout(context.Background(), containerdTimeout)
  50. defer cancel()
  51. c, err := containerdClient.ContainerService().Get(ctx, containerID)
  52. if err != nil {
  53. return nil, err
  54. }
  55. res := &ContainerMetadata{
  56. labels: c.Labels,
  57. image: c.Image,
  58. volumes: map[string]string{},
  59. rootfs: fmt.Sprintf("/run/containerd/io.containerd.runtime.v2.task/k8s.io/%s/rootfs", containerID),
  60. }
  61. var spec oci.Spec
  62. if err := json.Unmarshal(c.Spec.Value, &spec); err != nil {
  63. klog.Warningln(err)
  64. } else {
  65. for _, m := range spec.Mounts {
  66. res.volumes[m.Destination] = common.ParseKubernetesVolumeSource(m.Source)
  67. }
  68. }
  69. if data, ok := c.Extensions["io.cri-containerd.container.metadata"]; ok {
  70. var md = struct { // from data.TypeUrl
  71. Metadata struct {
  72. LogPath string
  73. }
  74. }{}
  75. if err := json.Unmarshal(data.Value, &md); err != nil {
  76. klog.Warningln(err)
  77. } else {
  78. res.logPath = md.Metadata.LogPath
  79. res.logDecoder = logparser.CriDecoder{}
  80. }
  81. }
  82. return res, nil
  83. }