client.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package kube
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/coroot/coroot-node-agent/kube/transfer"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. "github.com/coroot/coroot-node-agent/utils/try"
  8. "github.com/sirupsen/logrus"
  9. v1 "k8s.io/api/core/v1"
  10. "k8s.io/client-go/kubernetes"
  11. "k8s.io/client-go/rest"
  12. "strings"
  13. "time"
  14. )
  15. var CwK8sClient *transfer.CwK8sClient
  16. func GetKubeClient() (*transfer.CwK8sClient, error) {
  17. if CwK8sClient == nil {
  18. return CwK8sClient, fmt.Errorf("K8sClient is nil")
  19. }
  20. return CwK8sClient, nil
  21. }
  22. func NewKubeClient() (*transfer.CwK8sClient, error) {
  23. if CwK8sClient != nil {
  24. return CwK8sClient, nil
  25. }
  26. ctx := context.Background()
  27. config, err := rest.InClusterConfig()
  28. if err != nil {
  29. return nil, err
  30. }
  31. config.TLSClientConfig.Insecure = false
  32. // creates the clientset
  33. clientset, err := kubernetes.NewForConfig(config)
  34. if err != nil {
  35. logrus.WithError(err).Error("[kube] connect init error")
  36. return nil, err
  37. }
  38. cwk8sclient := transfer.NewCwK8sClient(ctx, clientset)
  39. //go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
  40. try.GoParams(cwk8sclient.InformerSharedfactory.Start, utils.CatchFn, make(chan struct{}, 0))
  41. CwK8sClient = cwk8sclient
  42. logrus.Info("[kube] connect init success")
  43. time.Sleep(1 * time.Second)
  44. return cwk8sclient, nil
  45. }
  46. func GetWorkload(ns, pod string) (string, error) {
  47. client, err := GetKubeClient()
  48. if err != nil {
  49. return "", err
  50. }
  51. podsItems, err := client.InformerSharedfactory.Core().V1().Pods().Lister().Pods(ns).Get(pod)
  52. if err != nil {
  53. return "", err
  54. }
  55. if len(podsItems.ObjectMeta.OwnerReferences) > 0 {
  56. ownerReferences := podsItems.ObjectMeta.OwnerReferences[0]
  57. kind := ownerReferences.Kind
  58. workload := ownerReferences.Name
  59. switch kind {
  60. case "ReplicaSet":
  61. lastDash := strings.LastIndex(workload, "-")
  62. if lastDash != -1 {
  63. workload = workload[:lastDash]
  64. }
  65. }
  66. return workload, nil
  67. }
  68. return "", fmt.Errorf("workload not found")
  69. }
  70. func GetNodeIpByCore(nodeName string) (string, error) {
  71. client, err := GetKubeClient()
  72. if err != nil {
  73. return "", err
  74. }
  75. nodeItem, err := client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
  76. if err != nil {
  77. logrus.WithError(err).Error("[kube] get node info error")
  78. for {
  79. time.Sleep(1 * time.Second)
  80. nodeItem, err = client.InformerSharedfactory.Core().V1().Nodes().Lister().Get(nodeName)
  81. if err != nil {
  82. logrus.WithError(err).Error("[kube] get node info error")
  83. } else {
  84. break
  85. }
  86. }
  87. }
  88. // 解析 Node IP
  89. var internalIP, externalIP string
  90. for _, addr := range nodeItem.Status.Addresses {
  91. if addr.Type == v1.NodeInternalIP {
  92. internalIP = addr.Address
  93. } else if addr.Type == v1.NodeExternalIP {
  94. externalIP = addr.Address
  95. }
  96. }
  97. if internalIP == "" && externalIP == "" {
  98. return "", fmt.Errorf("internalIP or externalIP not found")
  99. }
  100. if internalIP != "" {
  101. return internalIP, nil
  102. }
  103. return externalIP, nil
  104. }