Преглед на файлове

Merge branch 'dev-k8s' into dev

Carl преди 1 година
родител
ревизия
3787a91067

+ 21 - 14
Dockerfile

@@ -1,14 +1,21 @@
-FROM golang:1.21-bullseye AS builder
-RUN apt update && apt install -y libsystemd-dev
-WORKDIR /tmp/src
-COPY go.mod .
-COPY go.sum .
-RUN go mod download
-COPY . .
-ARG VERSION=unknown
-RUN CGO_ENABLED=1 go build -mod=readonly -ldflags "-X main.version=$VERSION" -o coroot-node-agent .
-
-FROM debian:bullseye
-RUN apt update && apt install -y ca-certificates && apt clean
-COPY --from=builder /tmp/src/coroot-node-agent /usr/bin/coroot-node-agent
-ENTRYPOINT ["coroot-node-agent"]
+FROM ubuntu:22.04
+
+ARG EUSPACE_BASE_PATH=/opt/cloudwise/apm/euspace
+RUN mkdir -p $EUSPACE_BASE_PATH
+
+#拷贝安装目录结构
+ADD ./dist/package_dir/  $EUSPACE_BASE_PATH/
+
+ARG EUSPACE_BIN_PATH=$EUSPACE_BASE_PATH/bin
+
+# 拷贝euspace可执行文件,make直接生成到dist/package_dir/bin/euspace
+#COPY ./euspace $EUSPACE_BIN_PATH/
+
+# 设置工作目录
+WORKDIR $EUSPACE_BIN_PATH
+
+# 设置PATH变量
+ENV PATH=$PATH:$EUSPACE_BIN_PATH
+
+ENTRYPOINT ["euspace"]
+

+ 135 - 0
cloudwise-apm-euspace.yaml

@@ -0,0 +1,135 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: cloudwise
+---
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: cloudwise-apm-euspace
+  namespace: cloudwise
+spec:
+  selector:
+    matchLabels:
+      app: cloudwise-apm-euspace
+  template:
+    metadata:
+      annotations:
+        container.apparmor.security.beta.kubernetes.io/cloudwise-apm-euspace: unconfined
+      name: cloudwise-apm-euspace
+      namespace: cloudwise
+      labels:
+        app: cloudwise-apm-euspace
+    spec:
+      hostPID: true
+      nodeSelector:
+        kubernetes.io/os: linux
+        kubernetes.io/arch: amd64
+      containers:
+        - name: cloudwise-apm-euspace
+          image: harbor.cloudwise.com/apm/euspace_dev:1.0
+          imagePullPolicy: Always
+        # imagePullPolicy: IfNotPresent  
+          args: ["--listen", "0.0.0.0:8123", "--cgroupfs-root", "/host/sys/fs/cgroup","--run-in-container"]
+          ports:
+            - containerPort: 8123
+              name: http
+          securityContext:
+            privileged: true
+            runAsUser: 0
+          volumeMounts:
+            - name: sys-fs-cgroup
+              mountPath: /host/sys/fs/cgroup
+              readOnly: true
+            - name: sys-kernel-debug
+              mountPath: /sys/kernel/debug
+              readOnly: true
+            - name: host-usr
+              mountPath: /host/usr
+              readOnly: true
+              mountPropagation: HostToContainer
+            - name: host-var
+              mountPath: /host/var
+              readOnly: false
+              mountPropagation: HostToContainer
+            - name: host-run
+              mountPath: /host/run
+              readOnly: false
+              mountPropagation: HostToContainer
+            - name: host-tmp
+              mountPath: /host/tmp
+              readOnly: false
+              mountPropagation: HostToContainer    
+          env:
+            - name: SEND
+              value: '1'
+            - name: TRACES_ENDPOINT
+              value: 'http://10.0.16.250:18080/docp/api/v2/data/receive'
+            - name: DISABLE_E2E_TRACING
+              value: 'false'
+            - name: DISABLE_STACK_TRACING
+              value: 'true'
+      volumes:
+        - name: sys-fs-cgroup
+          hostPath:
+            path: /sys/fs/cgroup
+        - name: sys-kernel-debug
+          hostPath:
+            path: /sys/kernel/debug
+        - name: host-usr
+          hostPath:
+            path: /usr  
+            type: Directory
+        - name: host-var
+          hostPath:
+            path: /var
+            type: Directory
+        - name: host-run
+          hostPath:
+            path: /run
+            type: Directory
+        - name: host-tmp
+          hostPath:
+            path: /tmp
+            type: Directory   
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: euspace-agent-role
+rules:
+  - apiGroups: [""]
+    resources:
+      - nodes
+      - namespaces
+      - configmaps
+      - services
+      - pods
+      - replicationcontrollers
+    verbs: ["get", "list", "watch"]
+  - apiGroups: ["apps"]
+    resources:
+      - daemonsets
+      - deployments
+      - replicasets
+      - statefulsets
+    verbs: ["get", "list", "watch"]
+  - apiGroups: ["extensions", "networking.k8s.io"]
+    resources: ["ingresses"]
+    verbs: ["get", "list", "watch"]
+  - apiGroups: ["route.openshift.io"]
+    resources: ["routes"]
+    verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cw-agent-view-binding
+subjects:
+  - kind: ServiceAccount
+    name: default
+    namespace: cloudwise
+roleRef:
+  kind: ClusterRole
+  name: euspace-agent-role
+  apiGroup: rbac.authorization.k8s.io

+ 4 - 3
containers/container.go

@@ -46,6 +46,7 @@ type ContainerMetadata struct {
 	logDecoder  logparser.Decoder
 	hostListens map[string][]netaddr.IPPort
 	networks    map[string]ContainerNetwork
+	rootfs      string
 }
 
 type Delays struct {
@@ -1130,7 +1131,7 @@ func (c *Container) GetCodeTypeFromCache(pid uint32) CodeType {
 		return CodeTypeUnknown
 	}
 	if p.codeType.IsWaitCheck() {
-		p.codeType = GetExeType(pid)
+		p.codeType = GetExeType(pid, c.getRootfs())
 	}
 	return p.codeType
 }
@@ -1172,14 +1173,14 @@ func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
 	if !p.jvmUprobesChecked {
 		tracer.InitKProcInfo(pid, &c.AppInfo)
 
-		libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType)
+		libNioProbes, err := tracer.AttachJavaNioReadUprobes(pid, codeType, c.getRootfs())
 		if err != nil {
 			klog.Error(err)
 			return err
 		}
 		p.uprobes = append(p.uprobes, libNioProbes...)
 
-		libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid)
+		libNetProbes, err := tracer.AttachJavaNetWriteUprobes(pid, c.getRootfs())
 		if err != nil {
 			klog.Error(err)
 			return err

+ 12 - 5
containers/container_apm.go

@@ -5,7 +5,9 @@ import (
 	"bytes"
 	"debug/elf"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/flags"
 	"os"
+	"path"
 	"sort"
 	"strconv"
 	"strings"
@@ -23,7 +25,9 @@ import (
 	"inet.af/netaddr"
 )
 
-const TRACE_STATUS = 1
+const (
+	TRACE_STATUS = 1
+)
 
 func (c *Container) getTrace(traceId uint64) (*tracing.Trace, bool) {
 	trace, ok := c.traceMap[traceId]
@@ -195,8 +199,6 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		}
 
 	case l7.ProtocolDM:
-		fmt.Println("---- onL7RequestApm ProtocolDM  start ---->")
-		fmt.Println("-------dm r.Status :", r.Status)
 		//统计dm的query次数
 		stats.observe(r.Status.String(), "", r.Duration)
 		//是否发送数据
@@ -206,13 +208,11 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			}
 			query := conn.dmParser.Parse(r.Payload, r.StatementId)
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
-			fmt.Println("-------dm r.TraceId:", r.TraceId)
 			if err == nil {
 				apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
 				c.SendEvent(apmTrace, r.TraceId)
 			}
 		}
-		fmt.Println("---- onL7RequestApm ProtocolDM  end <----")
 	case l7.ProtocolMemcached:
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
@@ -547,3 +547,10 @@ func (c *Container) detachUprobes(pid uint32) {
 		}
 	}
 }
+
+func (c *Container) getRootfs() string {
+	if c.metadata != nil && c.metadata.rootfs != "" {
+		return path.Join(*flags.HostDirPathPrefix, c.metadata.rootfs)
+	}
+	return ""
+}

+ 1 - 0
containers/containerd.go

@@ -65,6 +65,7 @@ func ContainerdInspect(containerID string) (*ContainerMetadata, error) {
 		labels:  c.Labels,
 		image:   c.Image,
 		volumes: map[string]string{},
+		rootfs:  fmt.Sprintf("/run/containerd/io.containerd.runtime.v2.task/k8s.io/%s/rootfs", containerID),
 	}
 
 	var spec oci.Spec

+ 1 - 0
containers/dockerd.go

@@ -56,6 +56,7 @@ func DockerdInspect(containerID string) (*ContainerMetadata, error) {
 		volumes:     map[string]string{},
 		hostListens: map[string][]netaddr.IPPort{},
 		networks:    map[string]ContainerNetwork{},
+		rootfs:      c.GraphDriver.Data["MergedDir"],
 	}
 	for _, m := range c.Mounts {
 		res.volumes[m.Destination] = common.ParseKubernetesVolumeSource(m.Source)

+ 13 - 6
containers/registry.go

@@ -3,6 +3,7 @@ package containers
 import (
 	"bytes"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/kube"
 	. "github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
@@ -199,7 +200,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 			runtimeApps := make(map[uint32]AppStatusInfo)
 			for pid, c := range r.containersByPid {
 				if c != nil {
-					if !common.IsOpenFilter() {
+					if c != nil && !common.IsOpenFilter() {
 						verifyAttachConditions := c.verifyAttachConditions(r, pid)
 						if verifyAttachConditions {
 							err = c.RegisterAppInfo(r, pid)
@@ -284,6 +285,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 				if ok := prometheus.WrapRegistererWith(setLabels(string(id),
 					c.K8sContainer.ns,
+					c.K8sContainer.workload,
 					c.K8sContainer.podName,
 					c.K8sContainer.containerName,
 					c.K8sContainer.pid), r.reg).Unregister(c); !ok {
@@ -408,10 +410,12 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					}
 				}
 			case ebpftracer.EventTypeL7Request:
+
 				//fmt.Println("e.L7Request Payload:", string(e.L7Request.Payload))
 				if e.L7Request == nil {
 					continue
 				}
+
 				if c := r.containersByPid[e.Pid]; c != nil {
 					//fmt.Println("EventTypeL7Request", e.Pid, c.checkL7AttachReady())
 					//a, _ := json.Marshal(e.L7Request)
@@ -522,6 +526,7 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 	setK8sTag(c, extensionTag, pid)
 	if err := prometheus.WrapRegistererWith(setLabels(string(id),
 		extensionTag[Namespace],
+		extensionTag[Workload],
 		extensionTag[PodName],
 		extensionTag[ProcessName],
 		fmt.Sprintf("%d", pid)), r.reg).Register(c); err != nil {
@@ -537,7 +542,8 @@ func (r *Registry) getOrCreateContainer(pid uint32) *Container {
 func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID, map[string]string) {
 	// 卡一下防止概率性获取为bash
 	time.Sleep(1 * time.Millisecond)
-	extensionTag := map[string]string{Namespace: "", Workload: "", PodName: "", ProcessName: ""}
+	procName := proc.GetProcName(pid)
+	extensionTag := map[string]string{Namespace: "", Workload: "", PodName: "", ProcessName: procName}
 	if cg.ContainerType == cgroup.ContainerTypeSystemdService {
 		if strings.HasPrefix(cg.ContainerId, "/system.slice/crio-conmon-") {
 			return "", extensionTag
@@ -545,8 +551,7 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 		return ContainerID(cg.ContainerId), extensionTag
 	}
 	if cg.ContainerType == cgroup.ContainerTypeStandaloneProcess {
-		procName := proc.GetProcName(pid)
-		extensionTag[ProcessName] = procName
+		//extensionTag[ProcessName] = procName
 		return ContainerID(fmt.Sprintf("/%s/%s/%d", "standalone", procName, pid)), extensionTag
 	}
 	switch cg.ContainerType {
@@ -568,9 +573,11 @@ func calcId(cg *cgroup.Cgroup, md *ContainerMetadata, pid uint32) (ContainerID,
 			return "", extensionTag
 		}
 		extensionTag[Namespace] = namespace
-		extensionTag[Workload] = ""
+		if *flags.RunInContainer {
+			extensionTag[Workload], _ = kube.GetWorkload(namespace, pod)
+		}
 		extensionTag[PodName] = pod
-		extensionTag[ProcessName] = name
+		//extensionTag[ProcessName] = name
 		return ContainerID(fmt.Sprintf("/k8s/%s/%s/%s", namespace, pod, name)), extensionTag
 	}
 	if taskNameParts := strings.SplitN(md.labels["com.docker.swarm.task.name"], ".", 3); len(taskNameParts) == 3 {

+ 10 - 7
containers/registry_apm.go

@@ -12,19 +12,21 @@ import (
 )
 
 const (
-	Namespace   = "namespace"
+	Namespace   = "ns"
 	PodName     = "pod_name"
 	Workload    = "workload"
 	ProcessName = "process_name"
+	ContainerId = "container_id"
 )
 
-func setLabels(container_id, ns, pod_name, process_name, pid string) prometheus.Labels {
+func setLabels(container_id, ns, workload, pod_name, process_name, pid string) prometheus.Labels {
 	return map[string]string{
-		"container_id": container_id,
-		"ns":           ns,
-		"pod_name":     pod_name,
-		"process_name": process_name,
-		"pid":          pid,
+		ContainerId: container_id,
+		Namespace:   ns,
+		Workload:    workload,
+		PodName:     pod_name,
+		ProcessName: process_name,
+		"pid":       pid,
 	}
 }
 
@@ -33,6 +35,7 @@ func setK8sTag(c *Container, tag map[string]string, pid uint32) {
 	c.K8sContainer.ns = tag[Namespace]
 	c.K8sContainer.podName = tag[PodName]
 	c.K8sContainer.containerName = tag[ProcessName]
+	c.K8sContainer.workload = tag[Workload]
 	c.K8sContainer.pid = sPid
 }
 

+ 64 - 24
containers/util.go

@@ -3,21 +3,20 @@ package containers
 import (
 	"debug/elf"
 	"fmt"
+	"github.com/coroot/coroot-node-agent/flags"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	klog "github.com/sirupsen/logrus"
-	"io/ioutil"
 	"os"
 	"os/exec"
 	"regexp"
 	"runtime"
 	"strings"
-
 )
 
 var libjvmRegex = regexp.MustCompile(`.*/libjvm\.so`)
 
-func GetExeType(pid uint32) CodeType {
-	mapsFilePath := fmt.Sprintf("/proc/%d/maps", pid)
+func GetExeType(pid uint32, rootfs string) CodeType {
+	mapsFilePath := fmt.Sprintf("%sproc/%d/maps", "/", pid)
 
 	data, err := os.ReadFile(mapsFilePath)
 	if err != nil {
@@ -29,18 +28,18 @@ func GetExeType(pid uint32) CodeType {
 
 	if libjvmRegex.MatchString(content) {
 		//fmt.Println("is java process")
-		if isJavaAotProcess(pid) {
+		if isJavaAotProcess(pid, rootfs) {
 			//fmt.Println("is javaAot process")
 			return CodeTypeJavaAot
 		}
 		return CodeTypeJava
-	} else if isJavaAotProcess(pid) {
+	} else if isJavaAotProcess(pid, rootfs) {
 		//fmt.Println("is javaAot process")
 		return CodeTypeJavaAot
-	} else if isGoProcess(pid) {
+	} else if isGoProcess(pid, rootfs) {
 		//fmt.Println("is go process")
 		return CodeTypeGo
-	} else if isNetCoreProcess(pid) {
+	} else if isNetCoreProcess(pid, rootfs) {
 		//	fmt.Println("is netcore process")
 		return CodeTypeNetCoreAot
 	}
@@ -48,19 +47,32 @@ func GetExeType(pid uint32) CodeType {
 }
 
 // isJavaAotProcess checks if the process with the given PID is a GraalVM native image
-func isJavaAotProcess(pid uint32) bool {
+func isJavaAotProcess(pid uint32, rootfs string) bool {
 	// Get the executable path for the given PID
-	exePath, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
+	exePath, err := os.Readlink(fmt.Sprintf("%sproc/%d/exe", "/", pid))
 	if err != nil {
-		fmt.Printf("Error reading executable path for PID %d: %v\n", pid, err)
+		//fmt.Printf("Error reading executable path for PID %d: %v\n", pid, err)
+		klog.WithError(err).Errorf("isJavaAotProcess,failed to reading executable path for PID [%d]", pid)
 		return false
 	}
 
 	// Read the content of the executable file
-	content, err := ioutil.ReadFile(exePath)
+	pathPrefix := rootfs
+	if pathPrefix == "" && *flags.RunInContainer {
+		pathPrefix = *flags.HostDirPathPrefix
+	}
+	content, err := os.ReadFile(fmt.Sprintf("%s%s", pathPrefix, exePath))
 	if err != nil {
-		fmt.Printf("Error reading executable file for PID %d: %v\n", pid, err)
-		return false
+		if _, err = os.Stat(exePath); err == nil {
+			content, err = os.ReadFile(exePath)
+			if err != nil {
+				klog.WithError(err).Errorf("isJavaAotProcess,failed to reading executable file local for PID [%d]", pid)
+				return false
+			}
+		} else {
+			klog.WithError(err).Errorf("isJavaAotProcess,failed to reading executable file for PID [%d]", pid)
+			return false
+		}
 	}
 
 	// Check if the file contains the "graal_attach_thread" string
@@ -71,16 +83,30 @@ func isJavaAotProcess(pid uint32) bool {
 	return false
 }
 
-func isNetCoreProcess(pid uint32) bool {
+func isNetCoreProcess(pid uint32, rootfs string) bool {
 	path, err := getProcessPath(pid)
 	if err != nil {
-		fmt.Printf("无法获取进程路径:%s\n", err)
+		//fmt.Printf("无法获取进程路径:%s\n", err)
+		klog.WithError(err).Errorf("isNetCoreProcess,failed to open as elf binary path for PID [%d]", pid)
 		return false
 	}
-	ef, err := elf.Open(path)
+
+	pathPrefix := rootfs
+	if pathPrefix == "" && *flags.RunInContainer {
+		pathPrefix = *flags.HostDirPathPrefix
+	}
+	ef, err := elf.Open(pathPrefix + path)
 	if err != nil {
-		fmt.Println("failed to open as elf binary", err)
-		return false
+		if _, err = os.Stat(path); err == nil {
+			ef, err = elf.Open(path)
+			if err != nil {
+				klog.WithError(err).Errorf("isNetCoreProcess,failed to open as elf binary file local for PID [%d]", pid)
+				return false
+			}
+		} else {
+			klog.WithError(err).Errorf("isNetCoreProcess,failed to open as elf binary file for PID [%d]", pid)
+			return false
+		}
 	}
 	defer ef.Close()
 
@@ -92,16 +118,30 @@ func isNetCoreProcess(pid uint32) bool {
 	return false
 }
 
-func isGoProcess(pid uint32) bool {
+func isGoProcess(pid uint32, rootfs string) bool {
 	path, err := getProcessPath(pid)
 	if err != nil {
-		fmt.Printf("无法获取进程路径:%s\n", err)
+		klog.WithError(err).Errorf("isGoProcess,failed to open as elf binary path for PID [%d]", pid)
 		return false
 	}
-	ef, err := elf.Open(path)
+
+	pathPrefix := rootfs
+	if pathPrefix == "" && *flags.RunInContainer {
+		pathPrefix = *flags.HostDirPathPrefix
+	}
+
+	ef, err := elf.Open(pathPrefix + path)
 	if err != nil {
-		fmt.Println("failed to open as elf binary", err)
-		return false
+		if _, err = os.Stat(path); err == nil {
+			ef, err = elf.Open(path)
+			if err != nil {
+				klog.WithError(err).Errorf("isGoProcess,failed to open as elf binary file local for PID [%d]", pid)
+				return false
+			}
+		} else {
+			klog.WithError(err).Errorf("isGoProcess,failed to open as elf binary file for PID [%d]", pid)
+			return false
+		}
 	}
 	defer ef.Close()
 

+ 1 - 0
ebpftracer/ebpf/include/bpf_base.h

@@ -106,6 +106,7 @@ static long (*bpf_probe_read_user_str)(void *dst, __u32 size, const void *unsafe
 #define PT_REGS_PARM4(x) ((x)->rcx)
 #define PT_REGS_PARM5(x) ((x)->r8)
 #define PT_REGS_PARM6(x) ((x)->r9)
+#define PT_REGS_12(x) ((x)->r12)
 #define PT_REGS_RET(x) ((x)->rsp)
 #define PT_REGS_FP(x) ((x)->rbp)
 #define PT_REGS_RC(x) ((x)->rax)

+ 4 - 1
ebpftracer/ebpf/l7/l7.c

@@ -248,7 +248,7 @@ void send_event(void *ctx, struct l7_event *e, __u32 pid, __u64 fd) {
     e->pid = pid;
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 	if (error ==0){
-		cw_add_event_count(e->trace_id);
+	        cw_add_event_count(e->trace_id);
 	}
 }
 
@@ -593,6 +593,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         e->method = METHOD_HTTP2_CLIENT_FRAMES;
         e->duration = bpf_ktime_get_ns();
         e->payload_size = size;
+        e->trace_id = get_apm_trace_id(pid, tid);
         COPY_PAYLOAD(e->payload, size, payload);
         send_event(ctx, e, k.pid, k.fd);
         return 0;
@@ -681,6 +682,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     e->method = METHOD_UNKNOWN;
     e->statement_id = 0;
     e->payload_size = 0;
+    e->trace_id = 0;
 	__u8 b[8];
 	bpf_read(payload, b);
 //    __u32 k0 = 0;
@@ -796,6 +798,7 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
             e->method = METHOD_HTTP2_SERVER_FRAMES;
             e->duration = bpf_ktime_get_ns();
             e->payload_size = ret;
+            e->trace_id = get_apm_trace_id(pid, tid);
             COPY_PAYLOAD(e->payload, ret, payload);
             send_event(ctx, e, k.pid, k.fd);
             return 0;

+ 1 - 1
ebpftracer/ebpf/utrace/java/include/java_common.h

@@ -8,7 +8,7 @@
 
 #if defined(__x86_64__)
 #define PT_LEN_REGS(x) (PT_REGS_RBP(x) - 0x10058);
-#define PT_HTTP_RESP_REGS(x) PT_REGS_PARM2(x)
+#define PT_HTTP_RESP_REGS(x) PT_REGS_12(x)
 #elif defined(__aarch64__)
 #define PT_LEN_REGS(x) PT_REGS_SP(x);
 #define PT_HTTP_RESP_REGS(ctx) ({                \

+ 28 - 5
ebpftracer/jvm.go

@@ -3,6 +3,7 @@ package ebpftracer
 import (
 	"errors"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer/inject"
+	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/utils"
 	. "github.com/coroot/coroot-node-agent/utils/modelse"
 	klog "github.com/sirupsen/logrus"
@@ -26,16 +27,16 @@ const (
 	symbolsocketRead0 = "Java_sun_nio_ch_FileDispatcherImpl_read0"
 )
 
-func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, codeType CodeType) ([]link.Link, error) {
+func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, codeType CodeType, rootfs string) ([]link.Link, error) {
 	if t.DisableL7Tracing() {
 		return nil, nil
 	}
 	var links []link.Link
-	var bpath = ""
+	var bpath string
 
 	// JavaAOT 逻辑
 	if codeType.IsJavaAotCode() {
-		bpath = proc.Path(uint32(pid), "exe")
+		bpath = proc.Path(pid, "exe")
 	} else {
 		version := UsePIDToGetJDKVersion(pid)
 		klog.Infof("[attach] java version is %s", version)
@@ -44,15 +45,17 @@ func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, codeType CodeType) ([]link
 			return nil, errors.New("can not find nio.so")
 		}
 	}
+	bpath = rootfs + bpath
 	klog.Infof("[attach] find the nio.so path is  %s", bpath)
 	ex, err := link.OpenExecutable(bpath)
 	if err != nil {
+		klog.Errorf("[attach] open executable: %v", err)
 		return nil, err
 	}
 	ef, err := elf.Open(bpath)
 	if err != nil {
+		klog.Errorf("[attach] open elf: %v", err)
 		return nil, err
-		//PID:    int(pid),
 	}
 	defer ef.Close()
 
@@ -120,6 +123,7 @@ func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, codeType CodeType) ([]link
 				if err != nil {
 					return nil, fmt.Errorf("failed to attach uprobe_ret_Java_sun_nio_ch_FileDispatcherImpl_read0 uprobe")
 				}
+				klog.Infof("[attach] java symbol offset is  %s", offset)
 				links = append(links, l)
 			}
 		}
@@ -132,7 +136,7 @@ func (t *Tracer) AttachJavaNioReadUprobes(pid uint32, codeType CodeType) ([]link
 	return links, nil
 }
 
-func (t *Tracer) AttachJavaNetWriteUprobes(pid uint32) ([]link.Link, error) {
+func (t *Tracer) AttachJavaNetWriteUprobes(pid uint32, rootfs string) ([]link.Link, error) {
 	if t.DisableL7Tracing() {
 		return nil, nil
 	}
@@ -141,6 +145,22 @@ func (t *Tracer) AttachJavaNetWriteUprobes(pid uint32) ([]link.Link, error) {
 		return nil, nil
 	}
 	cwJvmLibPath := utils.GetDefaultLibsPath("jvm", "cwlibnet.so")
+	tmpSo := "/tmp/cwlibnet.so"
+	procLoadPath := cwJvmLibPath
+
+	pathPrefix := rootfs
+	if pathPrefix == "" && *flags.RunInContainer {
+		pathPrefix = *flags.HostDirPathPrefix
+	}
+	if pathPrefix != "" {
+		// copy
+		size, err := utils.CopyFile(cwJvmLibPath, rootfs+tmpSo)
+		if err != nil || size == 0 {
+			return nil, fmt.Errorf("[jvm] Failed to copy cwlibnet.so %v", err)
+		}
+		cwJvmLibPath = rootfs + tmpSo
+		procLoadPath = tmpSo
+	}
 	//inject
 	originFunc := "Java_java_net_SocketOutputStream_socketWrite0"
 
@@ -174,13 +194,16 @@ func (t *Tracer) AttachJavaNetWriteUprobes(pid uint32) ([]link.Link, error) {
 			FuncSymbol: inject.InstInfo{
 				SymName: uProbeData.Func,
 			},
+			ProcLoadPath: procLoadPath,
 		},
 		RecodeInfo: inject.LibNetInfo{FuncSymbol: inject.InstInfo{SymName: "CW_RECODE_" + originFunc}},
 		Uprobe:     uProbeData,
+		Rootfs:     rootfs,
 	}
 
 	err := inject.JvmInject(jvmInjector)
 	if err != nil {
+		klog.WithError(err).Errorf("[jvm] inject.JvmInject error.")
 		return nil, err
 	}
 

+ 1 - 1
ebpftracer/tracer/inject/include/hotpatch.h

@@ -108,7 +108,7 @@ int hotpatch_detach(hotpatch_t *);
 int hotpatch_set_execution_pointer(hotpatch_t *, uintptr_t location);
 
 
-int cw_inject_library(int pid, int verbose, char *dll);
+int cw_inject_library(int pid, int verbose, char *dll, char *rootfs);
 
 #ifdef __cplusplus
 } /* end of extern C */

+ 23 - 17
ebpftracer/tracer/inject/inject_linux_amd64.go

@@ -45,10 +45,11 @@ type InnerSymbolInfo struct {
 }
 
 type LibNetInfo struct {
-	LibName     string
-	LibPath     string
-	FuncSymbol  InstInfo
-	InnerSymbol InnerSymbolInfo
+	LibName      string
+	LibPath      string
+	FuncSymbol   InstInfo
+	InnerSymbol  InnerSymbolInfo
+	ProcLoadPath string
 }
 
 type UprobeData struct {
@@ -75,6 +76,7 @@ type JvmInjector struct {
 		NetSendFuncCheck bool
 	}
 	Uprobe UprobeData
+	Rootfs string
 }
 
 func (j *JvmInjector) findReleaseAddressInfoFromMem() error {
@@ -468,8 +470,8 @@ func (j *JvmInjector) findLibBaseFromProcMaps(libName string) (uint64, string, e
 			if len(fields) > 5 {
 				path := fields[5]
 				if strings.HasSuffix(path, ".so") {
-					klog.Infof("[inject] found library %s", path)
-					return start, path, nil
+					klog.Infof("[inject] found library in map %s", path)
+					return start, j.Rootfs + path, nil
 				}
 			}
 		}
@@ -502,7 +504,7 @@ func (j *JvmInjector) findLibBaseByPathFromProcMaps(libPath string) (uint64, str
 		}
 	}
 
-	return 1, "", fmt.Errorf("library %s not found", libPath)
+	return 1, "", fmt.Errorf("library %s not found in process.", libPath)
 }
 
 func (j *JvmInjector) getFunctionOffset(libPath, functionName string) (elf.Symbol, error) {
@@ -578,16 +580,16 @@ func (j *JvmInjector) findDebugFuncContextFromLibPath() error {
 	//libName := j.DebugLibNetInfo.LibPath
 
 	// 获取release库的基地址
-	baseAddress, libPath, err := j.findLibBaseByPathFromProcMaps(j.DebugLibNetInfo.LibPath)
-	fmt.Println("debug libPath", libPath)
+	baseAddress, libPath, err := j.findLibBaseByPathFromProcMaps(j.DebugLibNetInfo.ProcLoadPath)
+	klog.Infof("[inject] debug base address of %s : %x", libPath, baseAddress)
 	functionName := j.DebugLibNetInfo.FuncSymbol.SymName
-	j.DebugLibNetInfo.LibPath = libPath
+	//j.DebugLibNetInfo.LibPath = libPath
 	if err != nil {
 		return err
 	}
 
 	// 获取函数的偏移量
-	functionSym, err := j.getFunctionOffset(libPath, functionName)
+	functionSym, err := j.getFunctionOffset(j.DebugLibNetInfo.LibPath, functionName)
 	// 计算函数的实际内存地址
 	j.DebugLibNetInfo.FuncSymbol.SymAddr = baseAddress + functionSym.Value
 	j.DebugLibNetInfo.FuncSymbol.SymSize = functionSym.Size
@@ -624,9 +626,10 @@ func printCodeData(data LibNetInfo) {
 }
 
 func (j *JvmInjector) jvmInjectLib() int {
-	dll := C.CString(j.DebugLibNetInfo.LibPath) // 替换为实际的DLL路径
-	defer C.free(unsafe.Pointer(dll))           // 确保在使用完字符串后释放内存
-	result := C.cw_inject_library(C.int(j.Pid), C.int(1), dll)
+	dll := C.CString(j.DebugLibNetInfo.ProcLoadPath)
+	rootfs := C.CString(j.Rootfs)
+	defer C.free(unsafe.Pointer(dll))
+	result := C.cw_inject_library(C.int(j.Pid), C.int(1), dll, rootfs)
 	fmt.Printf("Result: %d\n", result)
 	return int(result)
 }
@@ -796,11 +799,14 @@ func JvmInject(jvmInjector *JvmInjector) error {
 	if err != nil {
 		// load so
 		if _type == 1 {
-			fmt.Println(err, "Load it.")
-			if jvmInjector.jvmInjectLib() == 0 {
+			klog.Infoln("[inject] start load so.")
+			resCode := jvmInjector.jvmInjectLib()
+			if resCode == 0 {
+				klog.Infof("[inject] load so successful. proc load path is [%s], file path in node is [%s]", jvmInjector.DebugLibNetInfo.ProcLoadPath, jvmInjector.DebugLibNetInfo.LibPath)
 				jvmInjector.PreCheck.LoadingCheck = true
 			} else {
-				return err
+				klog.Errorf("[inject] Failed load so. so path is [%s]", jvmInjector.DebugLibNetInfo.LibPath)
+				return fmt.Errorf("[inject] Failed load so. code is %d so path is [%s]", resCode, jvmInjector.DebugLibNetInfo.LibPath)
 			}
 		}
 	} else {

BIN
ebpftracer/tracer/inject/lib/libhotpatch_amd64.a


+ 12 - 0
flags/flags.go

@@ -29,12 +29,14 @@ var (
 	DisableE2ETracing   = kingpin.Flag("disable-e2e-tracing", "Disable e2e tracing").Default("true").Envar("DISABLE_E2E_TRACING").Bool()
 	DisableStackTracing = kingpin.Flag("disable-stack-tracing", "Disable stack tracing").Default("true").Envar("DISABLE_STACK_TRACING").Bool()
 	LicenseKey          = kingpin.Flag("license-key", "Apm API key").Default("J45Engw88NeHUZ4Q7qNsK8L47FTH**QvgW113IEnsNaBNMR5zZ**oj/g!!!!").Envar("LICENSE_KEY").String()
+	RunInContainer      = kingpin.Flag("run-in-container", "run in container").Default("false").Envar("RUN_IN_CONTAINER").Bool()
 
 	ListenAddress             = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:8123").Envar("LISTEN").String()
 	CgroupRoot                = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").Envar("CGROUPFS_ROOT").String()
 	DisableLogParsing         = kingpin.Flag("disable-log-parsing", "Disable container log parsing").Default("false").Envar("DISABLE_LOG_PARSING").Bool()
 	DisablePinger             = kingpin.Flag("disable-pinger", "Don't ping upstreams").Default("false").Envar("DISABLE_PINGER").Bool()
 	DisableL7Tracing          = kingpin.Flag("disable-l7-tracing", "Disable L7 tracing").Default("false").Envar("DISABLE_L7_TRACING").Bool()
+
 	ExternalNetworksWhitelist = kingpin.Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").Envar("TRACK_PUBLIC_NETWORK").Strings()
 	EphemeralPortRange        = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("").Envar("EPHEMERAL_PORT_RANGE").String()
 
@@ -55,6 +57,8 @@ var (
 
 	ScrapeInterval = kingpin.Flag("scrape-interval", "How often to gather metrics from the agent").Default("15s").Envar("SCRAPE_INTERVAL").Duration()
 	WalDir         = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()
+
+	HostDirPathPrefix = kingpin.Flag("host-dir-path-prefix", "Set the prefix of path about the mount point of the host directory").Envar("HOST_DIR_PATH_PREFIX").Default("").String()
 )
 
 func GetString(fl *string) string {
@@ -128,6 +132,14 @@ func init() {
 			klog.Error(err)
 		}
 	}
+
+	if *RunInContainer {
+		if *HostDirPathPrefix == "" {
+			*HostDirPathPrefix = "/host"
+		}
+	} else {
+		*HostDirPathPrefix = ""
+	}
 }
 
 func DumpTableFeatures() {

+ 13 - 2
go.mod

@@ -49,6 +49,8 @@ require (
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 	gopkg.in/yaml.v2 v2.4.0
 	inet.af/netaddr v0.0.0-20230525184311-b8eac61e914a
+	k8s.io/apimachinery v0.28.6
+	k8s.io/client-go v0.28.6
 )
 
 require (
@@ -82,6 +84,7 @@ require (
 	github.com/docker/go-connections v0.4.0 // indirect
 	github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
 	github.com/docker/go-units v0.5.0 // indirect
+	github.com/emicklei/go-restful/v3 v3.11.0 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.7.0 // indirect
 	github.com/go-logfmt/logfmt v0.6.0 // indirect
@@ -104,7 +107,9 @@ require (
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
+	github.com/google/gnostic-models v0.6.8 // indirect
 	github.com/google/go-cmp v0.6.0 // indirect
+	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/pprof v0.0.0-20240117000934-35fc243c5815 // indirect
 	github.com/google/uuid v1.5.0 // indirect
 	github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect
@@ -134,6 +139,7 @@ require (
 	github.com/moby/term v0.5.0 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
 	github.com/oklog/ulid v1.3.1 // indirect
 	github.com/opencontainers/go-digest v1.0.0 // indirect
@@ -177,6 +183,7 @@ require (
 	golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
 	golang.org/x/oauth2 v0.16.0 // indirect
 	golang.org/x/sync v0.6.0 // indirect
+	golang.org/x/term v0.18.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
 	golang.org/x/tools v0.19.0 // indirect
 	google.golang.org/appengine v1.6.8 // indirect
@@ -184,12 +191,16 @@ require (
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect
 	google.golang.org/grpc v1.61.0 // indirect
 	google.golang.org/protobuf v1.32.0 // indirect
+	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/apimachinery v0.28.6 // indirect
-	k8s.io/client-go v0.28.6 // indirect
+	k8s.io/api v0.28.6 // indirect
 	k8s.io/klog/v2 v2.120.1 // indirect
+	k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
 	k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
+	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
+	sigs.k8s.io/yaml v1.3.0 // indirect
 )
 
 replace (

+ 7 - 1
go.sum

@@ -356,7 +356,6 @@ github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8E
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
-github.com/emicklei/go-restful v2.16.0+incompatible h1:rgqiKNjTnFQA6kkhFe16D8epTksy9HQ1MyrbDXSdYhM=
 github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
 github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -468,6 +467,8 @@ github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5
 github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
 github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
+github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
 github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
 github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
 github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
@@ -863,6 +864,9 @@ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+
 github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
+github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE=
+github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM=
 github.com/onsi/gomega v0.0.0-20151007035656-2152b45fa28a/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@@ -870,6 +874,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
 github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
 github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
+github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
+github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
 github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
 github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
 github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=

+ 68 - 0
kube/client.go

@@ -0,0 +1,68 @@
+package kube
+
+import (
+	"context"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/kube/transfer"
+	"github.com/sirupsen/logrus"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"strings"
+)
+
+var CwK8sClient *transfer.CwK8sClient
+
+func GetKubeClient() (*transfer.CwK8sClient, error) {
+	if CwK8sClient == nil {
+		return CwK8sClient, fmt.Errorf("K8sClient is nil")
+	}
+	return CwK8sClient, nil
+}
+
+func NewKubeClient() (*transfer.CwK8sClient, error) {
+	if CwK8sClient != nil {
+		return CwK8sClient, nil
+	}
+	ctx := context.Background()
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	config.TLSClientConfig.Insecure = false
+	// creates the clientset
+	clientset, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		logrus.WithError(err).Error("[kube] connect init error")
+		return nil, err
+	}
+	cwk8sclient := transfer.NewCwK8sClient(ctx, clientset)
+	go cwk8sclient.InformerSharedfactory.Start(make(chan struct{}, 0))
+	CwK8sClient = cwk8sclient
+	logrus.Info("[kube] connect init success")
+	return cwk8sclient, nil
+}
+
+func GetWorkload(ns, pod string) (string, error) {
+	client, err := GetKubeClient()
+	if err != nil {
+		return "", err
+	}
+	podsItems, err := client.InformerSharedfactory.Core().V1().Pods().Lister().Pods(ns).Get(pod)
+	if err != nil {
+		return "", err
+	}
+	if len(podsItems.ObjectMeta.OwnerReferences) > 0 {
+		ownerReferences := podsItems.ObjectMeta.OwnerReferences[0]
+		kind := ownerReferences.Kind
+		workload := ownerReferences.Name
+		switch kind {
+		case "ReplicaSet":
+			lastDash := strings.LastIndex(workload, "-")
+			if lastDash != -1 {
+				workload = workload[:lastDash]
+			}
+		}
+		return workload, nil
+	}
+	return "", fmt.Errorf("workload not found")
+}

+ 30 - 0
kube/transfer/transfer.go

@@ -0,0 +1,30 @@
+package transfer
+
+import (
+	"context"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"time"
+)
+
+type CwK8sClient struct {
+	K8sClient             *kubernetes.Clientset
+	InformerSharedfactory informers.SharedInformerFactory
+	ctx                   context.Context
+}
+
+func NewCwK8sClient(ctx context.Context, client *kubernetes.Clientset) *CwK8sClient {
+	sharedInformer := informers.NewSharedInformerFactory(client, 30*time.Second)
+	sharedInformer.Core().V1().Pods().Informer()
+	sharedInformer.Core().V1().Nodes().Informer()
+	sharedInformer.Core().V1().Namespaces().Informer()
+	sharedInformer.Apps().V1().StatefulSets().Informer()
+	sharedInformer.Apps().V1().DaemonSets().Informer()
+	sharedInformer.Apps().V1().Deployments().Informer()
+	sharedInformer.Apps().V1().ReplicaSets().Informer()
+	return &CwK8sClient{
+		K8sClient:             client,
+		ctx:                   ctx,
+		InformerSharedfactory: sharedInformer,
+	}
+}

+ 8 - 0
main.go

@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"github.com/cilium/ebpf/rlimit"
+	"github.com/coroot/coroot-node-agent/kube"
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
 	log "github.com/sirupsen/logrus"
@@ -165,6 +166,13 @@ func main() {
 	tracing.Init(machineId, hostname, version)
 	logs.Init(machineId, hostname, version)
 
+	if *flags.RunInContainer {
+		_, err = kube.NewKubeClient()
+		if err != nil {
+			log.WithError(err).Errorf("Failed to init kube client.")
+		}
+	}
+
 	registry := prometheus.NewRegistry()
 	registerer := prometheus.WrapRegistererWith(prometheus.Labels{"machine_id": machineId}, registry)
 

+ 1 - 1
utils/util.go

@@ -410,7 +410,7 @@ func UpdateConfig(oldPath, newPath string) error {
 	return nil
 }
 
-func CopyFile(dstName, srcName string) (written int64, err error) {
+func CopyFile(srcName, dstName string) (written int64, err error) {
 	src, err := os.Open(srcName)
 	if err != nil {
 		return