Quellcode durchsuchen

Feature #TASK_QT-15938 容器化workload属性采集

Carl vor 1 Jahr
Ursprung
Commit
3aac371910
9 geänderte Dateien mit 144 neuen und 13 gelöschten Zeilen
  1. 1 1
      Dockerfile
  2. 1 1
      cloudwise-apm-euspace.yaml
  3. 6 1
      containers/registry.go
  4. 10 7
      containers/registry_apm.go
  5. 13 2
      go.mod
  6. 7 1
      go.sum
  7. 68 0
      kube/client.go
  8. 30 0
      kube/transfer/transfer.go
  9. 8 0
      main.go

+ 1 - 1
Dockerfile

@@ -8,7 +8,7 @@ ADD ./dist/package_dir/  $EUSPACE_BASE_PATH/
 
 ARG EUSPACE_BIN_PATH=$EUSPACE_BASE_PATH/bin
 
-# 拷贝euspace可执行文件
+# 拷贝euspace可执行文件,make直接生成到dist/package_dir/bin/euspace
 #COPY ./euspace $EUSPACE_BIN_PATH/
 
 # 设置工作目录

+ 1 - 1
cloudwise-apm-euspace.yaml

@@ -27,7 +27,7 @@ spec:
         kubernetes.io/arch: amd64
       containers:
         - name: cloudwise-apm-euspace
-          image: harbor.cloudwise.com/apm/euspace_dev:1.1
+          image: harbor.cloudwise.com/apm/euspace_dev:1.2
           imagePullPolicy: Always
         # imagePullPolicy: IfNotPresent  
           args: ["--listen", "0.0.0.0:8123", "--cgroupfs-root", "/host/sys/fs/cgroup","--run-in-container"]

+ 6 - 1
containers/registry.go

@@ -3,6 +3,7 @@ package containers
 import (
 	"bytes"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/kube"
 	. "github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
@@ -279,6 +280,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 				if ok := prometheus.WrapRegistererWith(setLabels(string(id),
 					c.K8sContainer.ns,
+					c.K8sContainer.workload,
 					c.K8sContainer.podName,
 					c.K8sContainer.containerName,
 					c.K8sContainer.pid), r.reg).Unregister(c); !ok {
@@ -519,6 +521,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	setK8sTag(c, extensionTag, pid)
 	if err := prometheus.WrapRegistererWith(setLabels(string(id),
 		extensionTag[Namespace],
+		extensionTag[Workload],
 		extensionTag[PodName],
 		extensionTag[ProcessName],
 		fmt.Sprintf("%d", pid)), r.reg).Register(c); err != nil {
@@ -563,7 +566,9 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 			return "", extensionTag
 		}
 		extensionTag[Namespace] = namespace
-		extensionTag[Workload] = ""
+		if *flags.RunInContainer {
+			extensionTag[Workload], _ = kube.GetWorkload(namespace, pod)
+		}
 		extensionTag[PodName] = pod
 		//extensionTag[ProcessName] = name
 		return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name)), extensionTag

+ 10 - 7
containers/registry_apm.go

@@ -12,19 +12,21 @@ import (
 )
 
 const (
-	Namespace   = "namespace"
+	Namespace   = "ns"
 	PodName     = "pod_name"
 	Workload    = "workload"
 	ProcessName = "process_name"
+	ContainerId = "container_id"
 )
 
-func setLabels(container_id, ns, pod_name, process_name, pid string) prometheus.Labels {
+func setLabels(container_id, ns, workload, pod_name, process_name, pid string) prometheus.Labels {
 	return map[string]string{
-		"container_id": container_id,
-		"ns":           ns,
-		"pod_name":     pod_name,
-		"process_name": process_name,
-		"pid":          pid,
+		ContainerId: container_id,
+		Namespace:   ns,
+		Workload:    workload,
+		PodName:     pod_name,
+		ProcessName: process_name,
+		"pid":       pid,
 	}
 }
 
@@ -33,6 +35,7 @@ func setK8sTag(c *Container, tag map[string]string, pid uint32) {
 	c.K8sContainer.ns = tag[Namespace]
 	c.K8sContainer.podName = tag[PodName]
 	c.K8sContainer.containerName = tag[ProcessName]
+	c.K8sContainer.workload = tag[Workload]
 	c.K8sContainer.pid = sPid
 }
 

+ 13 - 2
go.mod

@@ -49,6 +49,8 @@ require (
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 	gopkg.in/yaml.v2 v2.4.0
 	inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a
+	k8s.io/apimachinery v0.28.6
+	k8s.io/client-go v0.28.6
 )
 
 require (
@@ -82,6 +84,7 @@ require (
 	github.com/docker/go-connections v0.4.0 // indirect
 	github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
 	github.com/docker/go-units v0.5.0 // indirect
+	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/go-logfmt/logfmt v0.6.0 // indirect
@@ -104,7 +107,9 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/go-cmp v0.6.0 // indirect
+	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/pprof v0.0.0-20240117000934-35fc243c5815 // indirect
 	github.com/google/uuid v1.5.0 // indirect
 	github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
@@ -134,6 +139,7 @@ require (
 	github.com/moby/term v0.5.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
 	github.com/oklog/ulid v1.3.1 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
@@ -177,6 +183,7 @@ require (
 	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
 	golang.org/x/oauth2 v0.16.0 // indirect
 	golang.org/x/sync v0.6.0 // indirect
+	golang.org/x/term v0.18.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
 	golang.org/x/tools v0.19.0 // indirect
 	google.golang.org/appengine v1.6.8 // indirect
@@ -184,12 +191,16 @@ require (
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
 	google.golang.org/grpc v1.61.0 // indirect
 	google.golang.org/protobuf v1.32.0 // indirect
+	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/apimachinery v0.28.6 // indirect
-	k8s.io/client-go v0.28.6 // indirect
+	k8s.io/api v0.28.6 // indirect
 	k8s.io/klog/v2 v2.120.1 // indirect
+	k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
 	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
+	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
+	sigs.k8s.io/yaml v1.3.0 // indirect
 )
 
 replace (

+ 7 - 1
go.sum

@@ -356,7 +356,6 @@ github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8E
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
-github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epTksy9HQ1MyrbDXSdYhM=
 github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
 github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -468,6 +467,8 @@ github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5
 github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
 github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
+github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
 github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
 github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
@@ -863,6 +864,9 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
 github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE=
+github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM=
 github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@@ -870,6 +874,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
 github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
+github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
+github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
 github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
 github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
 github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=

+ 68 - 0
kube/client.go

@@ -0,0 +1,68 @@
+package kube
+
+import (
+	"context"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/kube/transfer"
+	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"strings"
+)
+
+var CwK8sClient *transfer.CwK8sClient
+
+func GetKubeClient() (*transfer.CwK8sClient, error) {
+	if CwK8sClient == nil {
+		return CwK8sClient, fmt.Errorf("K8sClient is nil")
+	}
+	return CwK8sClient, nil
+}
+
+func NewKubeClient() (*transfer.CwK8sClient, error) {
+	if CwK8sClient != nil {
+		return CwK8sClient, nil
+	}
+	ctx := context.Background()
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	config.TLSClientConfig.Insecure = false
+	// creates the clientset
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		logrus.WithError(err).Error("[kube] connect init error")
+		return nil, err
+	}
+	cwk8sclient := transfer.NewCwK8sClient(ctx, clientset)
+	go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
+	CwK8sClient = cwk8sclient
+	logrus.Info("[kube] connect init success")
+	return cwk8sclient, nil
+}
+
+func GetWorkload(ns, pod string) (string, error) {
+	client, err := GetKubeClient()
+	if err != nil {
+		return "", err
+	}
+	podsItems, err := client.InformerSharedfactory.Core().V1().Pods().Lister().Pods(ns).Get(pod)
+	if err != nil {
+		return "", err
+	}
+	if len(podsItems.ObjectMeta.OwnerReferences) > 0 {
+		ownerReferences := podsItems.ObjectMeta.OwnerReferences[0]
+		kind := ownerReferences.Kind
+		workload := ownerReferences.Name
+		switch kind {
+		case "ReplicaSet":
+			lastDash := strings.LastIndex(workload, "-")
+			if lastDash != -1 {
+				workload = workload[:lastDash]
+			}
+		}
+		return workload, nil
+	}
+	return "", fmt.Errorf("workload not found")
+}

+ 30 - 0
kube/transfer/transfer.go

@@ -0,0 +1,30 @@
+package transfer
+
+import (
+	"context"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"time"
+)
+
+type CwK8sClient struct {
+	K8sClient             *kubernetes.Clientset
+	InformerSharedfactory informers.SharedInformerFactory
+	ctx                   context.Context
+}
+
+func NewCwK8sClient(ctx context.Context, client *kubernetes.Clientset) *CwK8sClient {
+	sharedInformer := informers.NewSharedInformerFactory(client, 30*time.Second)
+	sharedInformer.Core().V1().Pods().Informer()
+	sharedInformer.Core().V1().Nodes().Informer()
+	sharedInformer.Core().V1().Namespaces().Informer()
+	sharedInformer.Apps().V1().StatefulSets().Informer()
+	sharedInformer.Apps().V1().DaemonSets().Informer()
+	sharedInformer.Apps().V1().Deployments().Informer()
+	sharedInformer.Apps().V1().ReplicaSets().Informer()
+	return &CwK8sClient{
+		K8sClient:             client,
+		ctx:                   ctx,
+		InformerSharedfactory: sharedInformer,
+	}
+}

+ 8 - 0
main.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"github.com/coroot/coroot-node-agent/kube"
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
 	log "github.com/sirupsen/logrus"
@@ -153,6 +154,13 @@ func main() {
 	tracing.Init(machineId, hostname, version)
 	logs.Init(machineId, hostname, version)
 
+	if *flags.RunInContainer {
+		_, err = kube.NewKubeClient()
+		if err != nil {
+			log.WithError(err).Errorf("Failed to init kube client.")
+		}
+	}
+
 	registry := prometheus.NewRegistry()
 	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)