Переглянути джерело

Fixed #TSB-24556 新增ns等tag

Fixed #TASK_GK-2944 pod销毁时删除标签问题
Carl 2 роки тому
батько
коміт
8d899789e3
5 змінених файлів з 159 додано та 16 видалено
  1. 13 2
      containers/container.go
  2. 1 1
      containers/metrics.go
  3. 32 13
      containers/registry.go
  4. 31 0
      containers/registry_apm.go
  5. 82 0
      main.go

+ 13 - 2
containers/container.go

@@ -24,7 +24,7 @@ import (
 )
 
 var (
-	gcInterval  = 10 * time.Minute
+	gcInterval  = 1 * time.Minute
 	pingTimeout = 300 * time.Millisecond
 )
 
@@ -90,11 +90,22 @@ type PidFd struct {
 	Fd  uint64
 }
 
+type K8sContainer struct {
+	ns            string
+	podName       string
+	podId         string
+	workload      string
+	containerName string
+	pid           string
+}
+
 type Container struct {
 	id       ContainerID
 	cgroup   *cgroup.Cgroup
 	metadata *ContainerMetadata
 
+	K8sContainer
+
 	processes map[uint32]*Process
 
 	startedAt time.Time
@@ -277,7 +288,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	}
 
 	for addr, open := range c.getListens() {
-		ch <- gauge(metrics.NetListenInfo, float64(open), addr.String(), "")
+		ch <- gauge(metrics.NetListenInfo, float64(open), addr.String())
 	}
 	for proxy, addrs := range c.getProxiedListens() {
 		for addr := range addrs {

+ 1 - 1
containers/metrics.go

@@ -69,7 +69,7 @@ var metrics = struct {
 	DiskWriteOps:   metric("container_resources_disk_writes_total", "Total number of writes completed successfully by the container", "mount_point", "device", "volume"),
 	DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),
 
-	NetListenInfo:         metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
+	NetListenInfo:         metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr"),
 	NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
 	NetConnectsFailed:     metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
 	NetConnectionsActive:  metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),

+ 32 - 13
containers/registry.go

@@ -153,7 +153,11 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 						delete(r.containersByPid, pid)
 					}
 				}
-				if ok := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Unregister(c); !ok {
+				if ok := prometheus.WrapRegistererWith(setLabels(string(id),
+					c.K8sContainer.ns,
+					c.K8sContainer.podName,
+					c.K8sContainer.containerName,
+					c.K8sContainer.pid), r.reg).Unregister(c); !ok {
 					klog.Warningln("failed to unregister container:", id)
 				}
 				delete(r.containersById, id)
@@ -277,7 +281,8 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 		klog.Warningf("failed to get container metadata for pid %d -> %s: %s", pid, cg.Id, err)
 		return nil
 	}
-	id := calcId(cg, md)
+	// add ns/workload/podname
+	id, extensionTag := calcId(cg, md)
 	klog.Infof("calculated container id %d -> %s -> %s", pid, cg.Id, id)
 	if id == "" {
 		if cg.Id == "/init.scope" && pid != 1 {
@@ -299,6 +304,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 				c.nsConntrack = nil
 			}
 		}
+		setK8sTag(c, extensionTag, pid)
 		r.containersByPid[pid] = c
 		r.containersByCgroupId[cg.Id] = c
 		return c
@@ -310,7 +316,15 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	}
 
 	klog.InfoS("detected a new container", "pid", pid, "cg", cg.Id, "id", id)
-	if err := prometheus.WrapRegistererWith(prometheus.Labels{"container_id": string(id)}, r.reg).Register(c); err != nil {
+	// add ns/workload/podname/pid/ctype
+	//sType := fmt.Sprintf("%d", cg.ContainerType)
+
+	setK8sTag(c, extensionTag, pid)
+	if err := prometheus.WrapRegistererWith(setLabels(string(id),
+		extensionTag[Namespace],
+		extensionTag[PodName],
+		extensionTag[ProcessName],
+		fmt.Sprintf("%d", pid)), r.reg).Register(c); err != nil {
 		klog.Warningln("failed to register container:", err)
 		return nil
 	}
@@ -320,20 +334,21 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	return c
 }
 
-func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
+func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) (ContainerID, map[string]string) {
+	extensionTag := map[string]string{Namespace: "", Workload: "", PodName: "", ProcessName: ""}
 	if cg.ContainerType == cgroup.ContainerTypeSystemdService {
 		if strings.HasPrefix(cg.ContainerId, "/system.slice/crio-conmon-") {
-			return ""
+			return "", extensionTag
 		}
-		return ContainerID(cg.ContainerId)
+		return ContainerID(cg.ContainerId), extensionTag
 	}
 	switch cg.ContainerType {
 	case cgroup.ContainerTypeDocker, cgroup.ContainerTypeContainerd, cgroup.ContainerTypeSandbox, cgroup.ContainerTypeCrio:
 	default:
-		return ""
+		return "", extensionTag
 	}
 	if cg.ContainerId == "" {
-		return ""
+		return "", extensionTag
 	}
 	if md.labels["io.kubernetes.pod.name"] != "" {
 		pod := md.labels["io.kubernetes.pod.name"]
@@ -343,9 +358,13 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
 			name = "sandbox"
 		}
 		if name == "" || name == "POD" { // skip pause containers
-			return ""
+			return "", extensionTag
 		}
-		return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name))
+		extensionTag[Namespace] = namespace
+		extensionTag[Workload] = ""
+		extensionTag[PodName] = pod
+		extensionTag[ProcessName] = name
+		return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name)), extensionTag
 	}
 	if taskNameParts := strings.SplitN(md.labels["com.docker.swarm.task.name"], ".", 3); len(taskNameParts) == 3 {
 		namespace := md.labels["com.docker.stack.namespace"]
@@ -356,13 +375,13 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata) ContainerID {
 		if namespace == "" {
 			namespace = "_"
 		}
-		return ContainerID(fmt.Sprintf("/swarm/%s/%s/%s", namespace, service, taskNameParts[1]))
+		return ContainerID(fmt.Sprintf("/swarm/%s/%s/%s", namespace, service, taskNameParts[1])), extensionTag
 	}
 	if md.name == "" { // should be "pure" dockerd container here
 		klog.Warningln("empty dockerd container name for:", cg.ContainerId)
-		return ""
+		return "", extensionTag
 	}
-	return ContainerID("/docker/" + md.name)
+	return ContainerID("/docker/" + md.name), extensionTag
 }
 
 func getContainerMetadata(cg *cgroup.Cgroup) (*ContainerMetadata, error) {

+ 31 - 0
containers/registry_apm.go

@@ -0,0 +1,31 @@
+package containers
+
+import (
+	"fmt"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	Namespace   = "namespace"
+	PodName     = "pod_name"
+	Workload    = "workload"
+	ProcessName = "process_name"
+)
+
+func setLabels(container_id, ns, pod_name, process_name, pid string) prometheus.Labels {
+	return map[string]string{
+		"container_id": container_id,
+		"ns":           ns,
+		"pod_name":     pod_name,
+		"process_name": process_name,
+		"pid":          pid,
+	}
+}
+
+func setK8sTag(c *Container, tag map[string]string, pid uint32) {
+	sPid := fmt.Sprintf("%d", pid)
+	c.K8sContainer.ns = tag[Namespace]
+	c.K8sContainer.podName = tag[PodName]
+	c.K8sContainer.containerName = tag[ProcessName]
+	c.K8sContainer.pid = sPid
+}

+ 82 - 0
main.go

@@ -9,6 +9,9 @@ import (
 	"runtime"
 	"strings"
 
+	"encoding/json"
+	dto "github.com/prometheus/client_model/go"
+
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/containers"
 	"github.com/coroot/coroot-node-agent/flags"
@@ -95,6 +98,16 @@ func whitelistNodeExternalNetworks() {
 	}
 }
 
+type MetricItemData struct {
+	Label map[string]string `json:"metric_tags"`
+	Value any               `json:"value"`
+}
+
+type MetricData struct {
+	MetricKey string           `json:"metric_key"`
+	Metric    []MetricItemData `json:"metric"`
+}
+
 func main() {
 	klog.LogToStderr(false)
 	klog.SetOutput(&RateLimitedLogOutput{limiter: rate.NewLimiter(rate.Limit(*flags.LogPerSecond), *flags.LogBurst)})
@@ -141,12 +154,81 @@ func main() {
 
 	profiling.Start()
 	defer profiling.Stop()
+	// 创建一个/metrics路由处理函数
+	metricsHandler := func(w http.ResponseWriter, r *http.Request) {
+		// 从注册表中获取指标数据
+		metrics, err := registry.Gather()
+		if err != nil {
+			// 错误处理
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		var Data []MetricData
+		for _, metric := range metrics {
+			if metric.GetName() != "container_net_tcp_successful_connects_total" &&
+				metric.GetName() != "container_net_tcp_failed_connects_total" &&
+				metric.GetName() != "container_net_tcp_retransmits_total" &&
+				metric.GetName() != "container_net_tcp_listen_info" &&
+				metric.GetName() != "container_http_requests_total" &&
+				metric.GetName() != "container_http_requests_duration_seconds_total" &&
+				metric.GetName() != "container_application_type" {
+				continue
+			}
+
+			var item MetricData
+			var itemOther MetricData
+			item.MetricKey = metric.GetName()
+
+			for _, m := range metric.GetMetric() {
+				metricItem := MetricItemData{}
+				label := make(map[string]string)
+				for _, l := range m.GetLabel() {
+					label[l.GetName()] = l.GetValue()
+				}
+				metricItem.Label = label
+				switch metric.GetType() {
+				case dto.MetricType_COUNTER:
+					metricItem.Value = m.GetCounter().GetValue()
+					item.Metric = append(item.Metric, metricItem)
+				case dto.MetricType_GAUGE:
+					metricItem.Value = m.GetGauge().GetValue()
+					item.Metric = append(item.Metric, metricItem)
+				case dto.MetricType_HISTOGRAM:
+					item.MetricKey = metric.GetName() + "_sum"
+					metricItem.Value = m.GetHistogram().GetSampleSum()
+					item.Metric = append(item.Metric, metricItem)
+					metricItemOther := MetricItemData{}
+					metricItemOther.Label = label
+
+					itemOther.MetricKey = metric.GetName() + "_count"
+					metricItemOther.Value = m.GetHistogram().GetSampleCount()
+					itemOther.Metric = append(itemOther.Metric, metricItemOther)
+				default:
+					continue
+				}
+			}
+			Data = append(Data, item)
+			if metric.GetType() == dto.MetricType_HISTOGRAM {
+				Data = append(Data, itemOther)
+			}
+		}
+		// 将指标数据转换为JSON格式
+		jsonData, err := json.Marshal(Data)
+		//jsonData, err := json.Marshal(metrics)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.Write(jsonData)
+	}
 
 	if err := prom.StartAgent(machineId); err != nil {
 		klog.Exitln(err)
 	}
 
 	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer}))
+	http.HandleFunc("/metrics2", metricsHandler)
 	klog.Infoln("listening on:", *flags.ListenAddress)
 	klog.Errorln(http.ListenAndServe(*flags.ListenAddress, nil))
 }