client.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package kube
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/kube/transfer"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. "github.com/coroot/coroot-node-agent/utils/try"
  8. "github.com/sirupsen/logrus"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/client-go/kubernetes"
  11. "k8s.io/client-go/rest"
  12. "k8s.io/klog/v2"
  13. "os"
  14. "strings"
  15. "time"
  16. )
  17. var CwK8sClient *transfer.CwK8sClient
  18. func GetKubeClient() (*transfer.CwK8sClient, error) {
  19. if CwK8sClient == nil {
  20. return CwK8sClient, fmt.Errorf("K8sClient is nil")
  21. }
  22. return CwK8sClient, nil
  23. }
  24. func NewKubeClient() (*transfer.CwK8sClient, error) {
  25. if CwK8sClient != nil {
  26. return CwK8sClient, nil
  27. }
  28. ctx := context.Background()
  29. config, err := rest.InClusterConfig()
  30. if err != nil {
  31. return nil, err
  32. }
  33. config.TLSClientConfig.Insecure = false
  34. // creates the clientset
  35. clientset, err := kubernetes.NewForConfig(config)
  36. if err != nil {
  37. logrus.WithError(err).Error("[kube] connect init error")
  38. return nil, err
  39. }
  40. cwk8sclient := transfer.NewCwK8sClient(ctx, clientset)
  41. //go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
  42. try.GoParams(cwk8sclient.InformerSharedfactory.Start, utils.CatchFn, make(chan struct{}, 0))
  43. CwK8sClient = cwk8sclient
  44. logrus.Info("[kube] connect init success")
  45. time.Sleep(1 * time.Second)
  46. return cwk8sclient, nil
  47. }
  48. func GetWorkload(ns, pod string) (string, error) {
  49. client, err := GetKubeClient()
  50. if err != nil {
  51. return "", err
  52. }
  53. podsItems, err := client.InformerSharedfactory.Core().V1().Pods().Lister().Pods(ns).Get(pod)
  54. if err != nil {
  55. return "", err
  56. }
  57. if len(podsItems.ObjectMeta.OwnerReferences) > 0 {
  58. ownerReferences := podsItems.ObjectMeta.OwnerReferences[0]
  59. kind := ownerReferences.Kind
  60. workload := ownerReferences.Name
  61. switch kind {
  62. case "ReplicaSet":
  63. lastDash := strings.LastIndex(workload, "-")
  64. if lastDash != -1 {
  65. workload = workload[:lastDash]
  66. }
  67. }
  68. return workload, nil
  69. }
  70. return "", fmt.Errorf("workload not found")
  71. }
  72. func GetNodeIpByCore(nodeName string) (string, error) {
  73. ip := os.Getenv("node_ip")
  74. if ip != "" {
  75. klog.Info("[kube] get node ip by core:]", ip)
  76. return ip, nil
  77. }
  78. client, err := GetKubeClient()
  79. if err != nil {
  80. return "", err
  81. }
  82. nodeItem, err := client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
  83. if err != nil {
  84. logrus.WithError(err).Error("[kube] get node info error")
  85. for {
  86. time.Sleep(1 * time.Second)
  87. nodeItem, err = client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
  88. if err != nil {
  89. logrus.WithError(err).Error("[kube] get node info error")
  90. } else {
  91. break
  92. }
  93. }
  94. }
  95. // 解析 Node IP
  96. var internalIP, externalIP string
  97. for _, addr := range nodeItem.Status.Addresses {
  98. if addr.Type == v1.NodeInternalIP {
  99. internalIP = addr.Address
  100. } else if addr.Type == v1.NodeExternalIP {
  101. externalIP = addr.Address
  102. }
  103. }
  104. if internalIP == "" && externalIP == "" {
  105. return "", fmt.Errorf("internalIP or externalIP not found")
  106. }
  107. if internalIP != "" {
  108. return internalIP, nil
  109. }
  110. return externalIP, nil
  111. }