Просмотр исходного кода

Feature #TASK_QT-18250 cgroup优化

Carl 1 месяц назад
Родитель
Сommit
03f1f606ee
6 измененных файлов с 186 добавлено и 103 удалено
  1. 10 0
      cgroup/cgroup.go
  2. 23 9
      containers/container.go
  3. 17 12
      containers/container_apm.go
  4. 1 0
      containers/process.go
  5. 4 1
      containers/registry.go
  6. 131 81
      ebpftracer/tracer.go

+ 10 - 0
cgroup/cgroup.go

@@ -88,6 +88,16 @@ func (cg *Cgroup) CreatedAt() time.Time {
 	return fi.ModTime()
 }
 
+func (cg *Cgroup) FileSystemPath() string {
+	if cg == nil {
+		return ""
+	}
+	if cg.Version == V1 {
+		return path.Join(cgRoot, "cpu", cg.subsystems["cpu"])
+	}
+	return path.Join(cgRoot, cg.subsystems[""])
+}
+
 func NewFromProcessCgroupFile(filePath string) (*Cgroup, error) {
 	data, err := os.ReadFile(filePath)
 	if err != nil {

+ 23 - 9
containers/container.go

@@ -1469,10 +1469,16 @@ func (c *Container) AttachUprobes(tracer *ebpftracer.Tracer, pid uint32, _type s
 		return err
 	}
 
-	// 对 l4-header 允许的语言类型,将 PID 加入 sk_msg cgroup 以启用 Header 注入
+	// 对 l4-header 允许的语言类型,在业务 cgroup 上启用 sockops/cgroup-skb。
 	if flags.IsL4HeaderEnabled(codeType.String()) {
-		if cgErr := tracer.AddPidToCgroup(pid); cgErr != nil {
-			klog.Errorf("[attach] add pid %d (type=%s) to cgroup failed: %v", pid, codeType.String(), cgErr)
+		p := c.processes[pid]
+		if p != nil && p.l4HeaderCgroup == "" {
+			cgroupPath, cgErr := tracer.EnsureL4HeaderCgroup(c.cgroup)
+			if cgErr != nil {
+				klog.Errorf("[attach] enable l4-header for pid %d (type=%s) failed: %v", pid, codeType.String(), cgErr)
+			} else {
+				p.l4HeaderCgroup = cgroupPath
+			}
 		}
 	}
 
@@ -1591,8 +1597,9 @@ func (c *Container) attachJVMUprobes(tracer *ebpftracer.Tracer, pid uint32) erro
 		} else {
 			klog.Infof("[attach] InitKProcInfo succeed! pid:[%d]", pid)
 		}
-	} else {
-		klog.Infof("[attach] %s-%d already attach status:%v", codeType.String(), pid, c.Isl7AttachSuccess())
+	} else if !c.Isl7AttachSuccess() {
+		klog.Infof("[attach] %s-%d previous attach failed, skip retry", codeType.String(), pid)
+		return fmt.Errorf("[attach] %s-%d previous attach failed, skip retry", codeType.String(), pid)
 	}
 	return nil
 }
@@ -1601,8 +1608,8 @@ func isSupportedJava(major, minor, patch int) bool {
 	switch major {
 	case 1:
 		return minor == 8 // JDK 8
-	case 11: // JDK 11/17/21
-		//case 11, 17, 21:
+	//case 11: // JDK 11/17/21
+	case 11, 17, 21:
 		return true // JDK 11/17/21
 	default:
 		return false
@@ -1659,8 +1666,9 @@ func (c *Container) attachJavaAotUprobes(tracer *ebpftracer.Tracer, pid uint32)
 		//p.jvmUprobesChecked = true
 		c.l7AttachSuccess()
 		tracer.InitKProcInfo(pid, &c.AppInfo)
-	} else {
-		klog.Infof("[attach] %s-%d already attach", codeType.String(), pid)
+	} else if !c.Isl7AttachSuccess() {
+		klog.Infof("[attach] %s-%d previous attach failed, skip retry", codeType.String(), pid)
+		return fmt.Errorf("[attach] %s-%d previous attach failed, skip retry", codeType.String(), pid)
 	}
 	return nil
 }
@@ -1699,6 +1707,9 @@ func (c *Container) attachNetCoreUprobes(tracer *ebpftracer.Tracer, pid uint32)
 		}
 		p.uprobes = append(p.uprobes, WriteProbes...)
 		c.l7AttachSuccess()
+	} else if !c.Isl7AttachSuccess() {
+		klog.Infof("[attach] netcore-%d previous attach failed, skip retry", pid)
+		return fmt.Errorf("[attach] netcore-%d previous attach failed, skip retry", pid)
 	}
 
 	return nil
@@ -1721,6 +1732,9 @@ func (c *Container) attachPythonUprobes(tracer *ebpftracer.Tracer, pid uint32) e
 		}
 		klog.Infof("[attach] python proc succeed! pid:[%d]", pid)
 		c.l7AttachSuccess()
+	} else if !c.Isl7AttachSuccess() {
+		klog.Infof("[attach] python-%d previous attach failed, skip retry", pid)
+		return fmt.Errorf("[attach] python-%d previous attach failed, skip retry", pid)
 	}
 	return nil
 }

+ 17 - 12
containers/container_apm.go

@@ -974,11 +974,7 @@ func (c *Container) ctrlStack(r *Registry, pid uint32) {
 func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int) {
 	p := c.processes[pid]
 	if p != nil {
-		codeType := c.GetCodeTypeFromCache(pid)
-		if codeType.IsUnknownCode() {
-			klog.WithField("pid", pid).Debug("[verify] unknown language.")
-			return false, 0
-		}
+		// 先校验 cmdline 是否匹配白名单
 		cmdline := p.GetCmdline()
 
 		if len(cmdline) == 0 {
@@ -995,18 +991,22 @@ func (c *Container) verifyAttachConditions(r *Registry, pid uint32) (bool, int)
 		whiteListByCode := r.getWhiteListAll()
 		//klog.WithField("pid", pid).WithField("codeType", codeType.String()).
 		//	Infof("[verify] white list %v", utils.ToString(whiteListByCode))
-		// 当前语言的白名单规则
+		// 白名单规则匹配
 		for _, setting := range whiteListByCode {
 			ruleVal := setting.Filters
 			if ruleVal == "" {
 				continue
 			}
-			// 判断规则
+			// cmdline 匹配白名单后,再判断 codeType 是否可识别
 			if strings.Contains(cmdline, ruleVal) {
-				//if !codeType.IsJvmCode() {
-				//	klog.WithField("pid", pid).Warning("[verify] This agent version only supports JVM applications.")
-				//	return false, 0
-				//}
+				codeType := c.GetCodeTypeFromCache(pid)
+				if codeType.IsUnknownCode() {
+					klog.WithField("pid", pid).
+						WithField("ruleVal", ruleVal).
+						WithField("cmdline", cmdline).
+						Debug("[verify] cmdline matched but language unknown, skip.")
+					return false, 0
+				}
 				c.WhiteSettingInfo.AppName = setting.AppName
 				c.WhiteSettingInfo.Filters = setting.Filters
 				klog.WithField("pid", pid).
@@ -1080,7 +1080,12 @@ func (c *Container) DetachUprobes(tracer *ebpftracer.Tracer, pid uint32, detachT
 			klog.Infof("[DetachUprobes] delete KProcInfo success for pid %d,detachType:%s", pid, detachType.String())
 			c.AppInfo.EBPFProcInfo = nil
 		}
-		tracer.RemovePidFromCgroup(pid)
+		if p.l4HeaderCgroup != "" {
+			if err := tracer.ReleaseL4HeaderCgroup(p.l4HeaderCgroup); err != nil {
+				return fmt.Errorf("[DetachUprobes] failed to release l4-header cgroup for pid %d: %w", pid, err)
+			}
+			p.l4HeaderCgroup = ""
+		}
 	} else {
 		return fmt.Errorf("[DetachUprobes] cannot find uprobe for pid %d", pid)
 	}

+ 1 - 0
containers/process.go

@@ -46,6 +46,7 @@ type Process struct {
 	codeType         CodeType
 	cmdline          string
 	pythonGilChecked bool
+	l4HeaderCgroup   string
 }
 
 func NewProcess(pid uint32, stats *taskstats.Stats, tracer *ebpftracer.Tracer) *Process {

+ 4 - 1
containers/registry.go

@@ -264,6 +264,9 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 			}
 
 			for pid, c := range r.containersByPid {
+				if pid < 1000 {
+					continue
+				}
 				if c == nil {
 					klog.WithField("pid", pid).Warningln("[handle] container is nil.")
 					app, ok := r.RegistryApps[pid]
@@ -293,7 +296,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 					delete(r.containersByPid, pid)
 					_, ok := r.RegistryApps[pid]
 					if ok {
-						klog.Infof("[handle] cgroup id changed %d", pid)
+						klog.Infof("[handle] cgroup id changed %d [%s] [%s]", pid, cg.Id, c.cgroup.Id)
 					}
 					c.onProcessExit(pid, false)
 					// 重新验证cgroup,可能需要重新创建容器

+ 131 - 81
ebpftracer/tracer.go

@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/coroot/coroot-node-agent/utils"
@@ -19,6 +20,7 @@ import (
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/perf"
+	"github.com/coroot/coroot-node-agent/cgroup"
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/ebpftracer/tracer"
@@ -154,6 +156,14 @@ type Tracer struct {
 	Symbols        []debugelf.Symbol
 	Uprobes        []tracer.Uprobe
 	UprobesMap     map[string]tracer.Uprobe
+
+	cgroupLinksMu sync.Mutex
+	cgroupLinks   map[string]*cgroupLinkState
+}
+
+type cgroupLinkState struct {
+	refCount int
+	links    []link.Link
 }
 
 func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
@@ -172,6 +182,8 @@ func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disabl
 		readers: map[string]*perf.Reader{},
 		uprobes: map[string]*ebpf.Program{},
 		links:   []link.Link{},
+
+		cgroupLinks: map[string]*cgroupLinkState{},
 	}
 }
 
@@ -198,6 +210,17 @@ func (t *Tracer) Close() {
 			klog.WithError(err).Infof("[close] links")
 		}
 	}
+	t.cgroupLinksMu.Lock()
+	for cgPath, state := range t.cgroupLinks {
+		for _, l := range state.links {
+			if l != nil {
+				err := l.Close()
+				klog.WithError(err).Infof("[close] cgroup links %s", cgPath)
+			}
+		}
+	}
+	t.cgroupLinks = map[string]*cgroupLinkState{}
+	t.cgroupLinksMu.Unlock()
 	for k, r := range t.readers {
 		if r != nil {
 			err := r.Close()
@@ -479,74 +502,13 @@ func (t *Tracer) LinkEbpfProg() error {
 
 		case ebpf.SockOps:
 			klog.Infof("Processing SockOps program: %s", programSpec.SectionName)
-			// 获取sockops程序
-			sockopsProg, exists := t.collection.Programs["sockops_cb"]
-			if !exists {
-				klog.Errorf("sockops_cb program not found")
-				continue
-			}
-
-			// 清理旧的cgroup(可能残留上次agent运行的PID)
-			cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
-			cleanupCgroup(cgroupPath)
-
-			// 重新创建cgroup路径
-			if err := os.MkdirAll(cgroupPath, 0755); err != nil {
-				klog.Errorf("Failed to create cgroup path: %v", err)
-				continue
-			}
-
-			// 从环境变量获取要监控的PID
-			filterPidStr := os.Getenv("FILTER_PID")
-			if filterPidStr == "" {
-				klog.Warnf("FILTER_PID environment variable not set, using current process")
-				filterPidStr = fmt.Sprint(os.Getpid())
-			}
-
-			// 将指定PID添加到cgroup
-			if err := os.WriteFile(filepath.Join(cgroupPath, "cgroup.procs"), []byte(filterPidStr), 0644); err != nil {
-				klog.Errorf("Failed to add process %s to cgroup: %v", filterPidStr, err)
-				continue
-			}
-			klog.Infof("Added process %s to cgroup for monitoring", filterPidStr)
-
-			// 附加sockops程序到cgroup
-			l, err = link.AttachCgroup(link.CgroupOptions{
-				Path:    cgroupPath,
-				Program: sockopsProg,
-				Attach:  ebpf.AttachCGroupSockOps,
-			})
-			if err != nil {
-				klog.Errorf("Failed to attach sockops program: %v", err)
-				continue
-			}
-			klog.Infof("Successfully attached sockops program to cgroup: %s", cgroupPath)
+			klog.Infof("Deferring sockops cgroup program linking until a business cgroup is requested")
+			continue
 
 		case ebpf.CGroupSKB:
 			klog.Infof("Processing CGroupSKB program: %s", programSpec.SectionName)
-			// 处理cgroup/skb程序
-			if programSpec.SectionName == "cgroup/skb" {
-				cgroupSkbProg, exists := t.collection.Programs["http_request_handler"]
-				if !exists {
-					klog.Errorf("http_request_handler program not found")
-					continue
-				}
-
-				// 复用sockops阶段已创建的cgroup路径
-				cgroupPath := "/sys/fs/cgroup/ebpf-sockops"
-
-				// 附加cgroup/skb程序到cgroup
-				l, err = link.AttachCgroup(link.CgroupOptions{
-					Path:    cgroupPath,
-					Program: cgroupSkbProg,
-					Attach:  ebpf.AttachCGroupInetEgress,
-				})
-				if err != nil {
-					klog.Errorf("Failed to attach cgroup/skb program: %v", err)
-					continue
-				}
-				klog.Infof("Successfully attached cgroup/skb program to cgroup: %s", cgroupPath)
-			}
+			klog.Infof("Deferring cgroup/skb program linking until a business cgroup is requested")
+			continue
 
 		case ebpf.SkMsg:
 			klog.Infof("Processing SkMsg program: %s", programSpec.SectionName)
@@ -1177,27 +1139,115 @@ func (t *Tracer) DelKProcInfo(pid uint32) error {
 	return err
 }
 
-const skMsgCgroupPath = "/sys/fs/cgroup/ebpf-sockops"
+func (t *Tracer) EnsureL4HeaderCgroup(cg *cgroup.Cgroup) (string, error) {
+	if cg == nil {
+		return "", fmt.Errorf("business cgroup is nil")
+	}
+	if cg.Version != cgroup.V2 {
+		return "", fmt.Errorf("l4 header cgroup programs require cgroup v2, got %s", cg.Id)
+	}
+	cgroupPath := cg.FileSystemPath()
+	if cgroupPath == "" {
+		return "", fmt.Errorf("business cgroup path is empty for %s", cg.Id)
+	}
+
+	t.cgroupLinksMu.Lock()
+	if state, ok := t.cgroupLinks[cgroupPath]; ok {
+		state.refCount++
+		t.cgroupLinksMu.Unlock()
+		klog.Infof("[cgroup] reusing business cgroup %s for l4-header, ref=%d", cgroupPath, state.refCount)
+		return cgroupPath, nil
+	}
+	t.cgroupLinksMu.Unlock()
 
-// AddPidToCgroup 将指定 PID 追加到 sk_msg 所用的 cgroup 中,使其 socket 能被 sockops/sk_msg 程序拦截。
-func (t *Tracer) AddPidToCgroup(pid uint32) error {
-	pidStr := fmt.Sprint(pid)
-	if err := os.WriteFile(filepath.Join(skMsgCgroupPath, "cgroup.procs"), []byte(pidStr), 0644); err != nil {
-		return fmt.Errorf("failed to add pid %d to cgroup %s: %w", pid, skMsgCgroupPath, err)
+	sockopsProg, exists := t.collection.Programs["sockops_cb"]
+	if !exists {
+		return "", fmt.Errorf("sockops_cb program not found")
 	}
-	klog.Infof("[cgroup] added pid %d to %s", pid, skMsgCgroupPath)
-	return nil
+	cgroupSkbProg, exists := t.collection.Programs["http_request_handler"]
+	if !exists {
+		return "", fmt.Errorf("http_request_handler program not found")
+	}
+
+	var links []link.Link
+	sockopsLink, err := link.AttachCgroup(link.CgroupOptions{
+		Path:    cgroupPath,
+		Program: sockopsProg,
+		Attach:  ebpf.AttachCGroupSockOps,
+	})
+	if err != nil {
+		return "", fmt.Errorf("attach sockops program to %s: %w", cgroupPath, err)
+	}
+	links = append(links, sockopsLink)
+
+	cgroupSkbLink, err := link.AttachCgroup(link.CgroupOptions{
+		Path:    cgroupPath,
+		Program: cgroupSkbProg,
+		Attach:  ebpf.AttachCGroupInetEgress,
+	})
+	if err != nil {
+		_ = sockopsLink.Close()
+		return "", fmt.Errorf("attach cgroup/skb program to %s: %w", cgroupPath, err)
+	}
+	links = append(links, cgroupSkbLink)
+
+	t.cgroupLinksMu.Lock()
+	if state, ok := t.cgroupLinks[cgroupPath]; ok {
+		state.refCount++
+		t.cgroupLinksMu.Unlock()
+		for _, l := range links {
+			_ = l.Close()
+		}
+		klog.Infof("[cgroup] concurrent reuse of business cgroup %s for l4-header, ref=%d", cgroupPath, state.refCount)
+		return cgroupPath, nil
+	}
+	t.cgroupLinks[cgroupPath] = &cgroupLinkState{
+		refCount: 1,
+		links:    links,
+	}
+	t.cgroupLinksMu.Unlock()
+
+	klog.Infof("[cgroup] attached l4-header programs to business cgroup %s", cgroupPath)
+	return cgroupPath, nil
 }
 
-// RemovePidFromCgroup 将指定 PID 移回根 cgroup,使其脱离 sk_msg 拦截。
-func (t *Tracer) RemovePidFromCgroup(pid uint32) error {
-	rootCgroup := "/sys/fs/cgroup/cgroup.procs"
-	pidStr := fmt.Sprint(pid)
-	if err := os.WriteFile(rootCgroup, []byte(pidStr), 0644); err != nil {
-		return fmt.Errorf("failed to move pid %d to root cgroup: %w", pid, err)
+func (t *Tracer) ReleaseL4HeaderCgroup(cgroupPath string) error {
+	if cgroupPath == "" {
+		return nil
 	}
-	klog.Infof("[cgroup] removed pid %d from %s", pid, skMsgCgroupPath)
-	return nil
+
+	var links []link.Link
+	t.cgroupLinksMu.Lock()
+	state, ok := t.cgroupLinks[cgroupPath]
+	if !ok {
+		t.cgroupLinksMu.Unlock()
+		return nil
+	}
+	state.refCount--
+	if state.refCount > 0 {
+		ref := state.refCount
+		t.cgroupLinksMu.Unlock()
+		klog.Infof("[cgroup] keep business cgroup %s for l4-header, ref=%d", cgroupPath, ref)
+		return nil
+	}
+	links = state.links
+	delete(t.cgroupLinks, cgroupPath)
+	t.cgroupLinksMu.Unlock()
+
+	var lastErr error
+	for _, l := range links {
+		if l == nil {
+			continue
+		}
+		if err := l.Close(); err != nil {
+			lastErr = err
+			klog.WithError(err).Errorf("[cgroup] close business cgroup link %s", cgroupPath)
+		}
+	}
+	if lastErr == nil {
+		klog.Infof("[cgroup] detached l4-header programs from business cgroup %s", cgroupPath)
+	}
+	return lastErr
 }
 
 // TODO check language