瀏覽代碼

Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复机制(1)

Tom 1 年之前
父節點
當前提交
38379d33fa
共有 8 個文件被更改,包括 284 次插入27 次删除
  1. 29 0
      containers/apm_fusing.go
  2. 2 1
      containers/registry.go
  3. 34 11
      ebpftracer/tracer.go
  4. 12 3
      main.go
  5. 5 0
      utils/enums/enums.go
  6. 3 12
      utils/namedpipe/name_pipe.go
  7. 180 0
      utils/namedpipe/namedpipe_ctl.go
  8. 19 0
      utils/util.go

+ 29 - 0
containers/apm_fusing.go

@@ -0,0 +1,29 @@
+package containers
+
+import log "github.com/sirupsen/logrus"
+
+func (r *Registry) DoFusing() {
+
+	/*//先处于熔断状态 (应用层的uprobes将停止attach)
+	r.isFusing = true
+	//再关闭应用层的uprobes
+	for pid, c := range r.containersByPid {
+		if c != nil {
+			c.detachUprobes(pid)
+		}
+	}
+	//最后关闭内核层的tracepoint、kprobe
+	r.tracer.UnlinkEbpfProg()*/
+	log.Infof("-----DoFusing will to execute fuse operates -----\n")
+}
+
+func (r *Registry) DoResume() error {
+	/*//先开启内核层的tracepoint、kprobe
+	if err := r.tracer.LinkEbpfProg(); err != nil {
+		return err
+	}
+	//再处于非熔断状态 (应用层的uprobes将开启attach)
+	r.isFusing = false*/
+	log.Infof("-----DoResume will to execute resume operates -----\n")
+	return nil
+}

+ 2 - 1
containers/registry.go

@@ -71,6 +71,7 @@ type Registry struct {
 	trafficStatsLock        sync.Mutex
 	trafficStatsUpdateCh    chan *TrafficStatsUpdate
 	nodeInfo                *NodeInfoT
+	isFusing                bool
 }
 
 var (
@@ -214,7 +215,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				if c != nil {
 					if c != nil && !common.IsOpenFilter() {
 						verifyAttachConditions, openStack := c.verifyAttachConditions(r, pid)
-						if verifyAttachConditions {
+						if verifyAttachConditions && !r.isFusing {
 							err = c.RegisterAppInfo(r, pid)
 							if err == nil {
 								klog.WithField("pid", pid).Infoln("[registry] Attach uprobes.")

+ 34 - 11
ebpftracer/tracer.go

@@ -142,13 +142,14 @@ type Tracer struct {
 	disableE2ETracing   bool
 	disableStackTracing bool
 
-	collection *ebpf.Collection
-	readers    map[string]*perf.Reader
-	links      []link.Link
-	uprobes    map[string]*ebpf.Program
-	Symbols    []debugelf.Symbol
-	Uprobes    []tracer.Uprobe
-	UprobesMap map[string]tracer.Uprobe
+	collection     *ebpf.Collection
+	collectionSpec *ebpf.CollectionSpec
+	readers        map[string]*perf.Reader
+	links          []link.Link
+	uprobes        map[string]*ebpf.Program
+	Symbols        []debugelf.Symbol
+	Uprobes        []tracer.Uprobe
+	UprobesMap     map[string]tracer.Uprobe
 }
 
 func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
@@ -308,7 +309,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		return fmt.Errorf("failed to load collection: %w", err)
 	}
 	tracer.Offset()
-
+	t.collectionSpec = collectionSpec
 	t.collection = c
 
 	perfMaps := []perfMap{
@@ -344,7 +345,21 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	klog.Infof("[end] Look eBPF perf_maps")
 
 	klog.Infof("[start] Look eBPF specPrograms")
-	for _, programSpec := range collectionSpec.Programs {
+	if err = t.LinkEbpfProg(); err != nil {
+		return err
+	}
+	klog.Infof("[end] Look eBPF specPrograms")
+
+	return nil
+}
+
+func (t *Tracer) LinkEbpfProg() error {
+	klog.Infof("[start] Look eBPF specPrograms")
+	var (
+		l   link.Link
+		err error
+	)
+	for _, programSpec := range t.collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
 		klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
 		if t.DisableL7Tracing() {
@@ -357,7 +372,6 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 				continue
 			}
 		}
-		var l link.Link
 		switch programSpec.Type {
 		case ebpf.TracePoint:
 			if strings.Contains(programSpec.SectionName, "prog") {
@@ -384,9 +398,18 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		t.links = append(t.links, l)
 	}
 	klog.Infof("[end] Look eBPF specPrograms")
-
 	return nil
 }
+func (t *Tracer) UnlinkEbpfProg() {
+
+	for _, p := range t.uprobes {
+		_ = p.Close()
+	}
+
+	for _, l := range t.links {
+		_ = l.Close()
+	}
+}
 
 func (t EventType) String() string {
 	switch t {

+ 12 - 3
main.go

@@ -2,10 +2,13 @@ package main
 
 import (
 	"bytes"
+	"encoding/json"
 	"github.com/cilium/ebpf/rlimit"
 	"github.com/coroot/coroot-node-agent/kube"
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
+	"github.com/coroot/coroot-node-agent/utils/namedpipe"
+	dto "github.com/prometheus/client_model/go"
 	log "github.com/sirupsen/logrus"
 	"net/http"
 	_ "net/http/pprof"
@@ -19,9 +22,6 @@ import (
 	"time" 
 	"regexp" 
 
-	"encoding/json"
-	dto "github.com/prometheus/client_model/go"
-
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/containers"
 	"github.com/coroot/coroot-node-agent/flags"
@@ -176,6 +176,12 @@ func main() {
 		log.Fatalf("the minimum Linux kernel version required is %s or later", minSupportedKernelVersion)
 	}
 
+	//namedpipe初始化
+	npCtl, err := namedpipe.NewNamedPipeCtl(nil)
+	if err != nil {
+		log.Fatalf("get namedpipeCtl occurs error: %s", err.Error())
+	}
+
 	whitelistNodeExternalNetworks()
 
 	machineId := nodeInfo.GetNodeInfo().SystemUUID
@@ -208,6 +214,9 @@ func main() {
 	defer cr.Close()
 	log.Infoln("START_TRACE")
 
+	//监听&处理-熔断信号
+	npCtl.AcceptAndDisposeMsg(cr)
+
 	//profiling.Start()
 	//defer profiling.Stop()
 	// 创建一个/metrics路由处理函数

+ 5 - 0
utils/enums/enums.go

@@ -10,6 +10,9 @@ const (
 	DaemonServiceFile = "omniagent"
 	CpuLimit          = "omniagent-cpulimit"
 
+	AgentNameEuspace  = DaemonProc
+	AgentNameOneAgent = "cw-oneagent"
+
 	TestApp = "eBPF-APP"
 	// windows
 	DaemonCtlProcEXE   = "omniagent-ctl.exe"
@@ -25,4 +28,6 @@ const (
 	DefaultLocalDataPort   = "18085"
 	DefaultLocalFilePort   = "18086"
 	DefaultVpcKey          = "identification"
+
+	TimeoutNamedPipe3S = 3
 )

+ 3 - 12
utils/namedpipe/name_pipe.go

@@ -46,17 +46,8 @@ type MsgInfo struct {
 const (
 	msgSeparator = '\n'
 
-	MsgTypeSignal          = "signal"
-	MsgTypeHeartbeat       = "heartbeat"
-	MsgTypeData            = "data"
-	MsgTypeFusing          = "fusing"
-	MsgTypeResume          = "resume"
-	MsgTypeChecker         = "checker"
-	MsgTypeManager         = "manager"
-	MsgTypeNetFlow         = "netflow"
-	MsgTypeNetFlowFuseData = "netflowFuseData"
-	MsgTypeSetFusePids     = "setFusePids"
-	MsgTypeSetFuseInterval = "setFuseInterval"
+	MsgTypeFusing = "fusing"
+	MsgTypeResume = "resume"
 )
 
 var (
@@ -176,7 +167,7 @@ func (np *namedPipe) Write(msg string, msgType string) (int, error) {
 		defer np.wg.Done()
 
 		if atomic.LoadUint32(&np.quitFlag) == 0 {
-			if !(msgType == MsgTypeResume || msgType == MsgTypeFusing || msgType == MsgTypeSignal || msgType == MsgTypeManager || msgType == MsgTypeHeartbeat || msgType == MsgTypeData || msgType == MsgTypeNetFlow || msgType == MsgTypeNetFlowFuseData || msgType == MsgTypeSetFusePids) {
+			if !(msgType == MsgTypeResume || msgType == MsgTypeFusing) {
 				return 0, errors.New(fmt.Sprintf("WriteToPipe,invalid msg type: %s", msgType))
 			}
 			sendPipe := np.sendPipe

+ 180 - 0
utils/namedpipe/namedpipe_ctl.go

@@ -0,0 +1,180 @@
+package namedpipe
+
+import (
+	"fmt"
+	"github.com/coroot/coroot-node-agent/containers"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/enums"
+	log "github.com/sirupsen/logrus"
+	"io"
+	"path/filepath"
+	"runtime/debug"
+	"sync"
+	"time"
+)
+
+type NamedPipeCtl struct {
+	mtx       *sync.Mutex
+	stopChan  chan struct{}
+	isClosed  bool
+	namedPipe NamedPipe
+}
+
+func NewNamedPipeCtl(stopChan chan struct{}) (*NamedPipeCtl, error) {
+	ctl := &NamedPipeCtl{
+		mtx:      new(sync.Mutex),
+		stopChan: stopChan,
+	}
+	err := ctl.initNamedPipe()
+	if err != nil {
+		return nil, err
+	}
+	return ctl, nil
+}
+
+func (ctl *NamedPipeCtl) initNamedPipe() error {
+
+	if ctl.isClosed {
+		log.Warnf("namedpipectl has been closed, return before try to listen namedpipe")
+		return fmt.Errorf("namedpipectl has been closed")
+	}
+
+	ctl.namedPipe = nil
+	errChan := make(chan error)
+	t := time.NewTimer(enums.TimeoutNamedPipe3S * time.Second)
+	defer utils.StopTimerWithDrainChan(t)
+	go func() {
+		defer ctl.recoverAndLogError("ListenNpipe")
+
+		log.Infof("namedpipectl to listen namedpipe")
+		//RootPath => /opt/cloudwise/omniagent/agents/apm/current/
+		p, err := ListenNpipe(enums.AgentNameEuspace, enums.AgentNameOneAgent, false, false, filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(utils.GetRootPath()))), "runtime"))
+		if err != nil || p == nil {
+			log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", err.Error())
+			errChan <- fmt.Errorf("namedpipectl to listen namedpipe error: %v", err.Error())
+		} else {
+			log.Infof("namedpipectl listen namedpipe success")
+			ctl.mtx.Lock()
+			if ctl.isClosed {
+				ctl.mtx.Unlock()
+				log.Infof("namedpipectl has been closed, will to free namedpipe")
+				if p != nil {
+					p.FreeUp4EOF()
+				}
+				errChan <- fmt.Errorf("namedpipectl has been closed,freed")
+			} else {
+				ctl.namedPipe = p
+				ctl.mtx.Unlock()
+				errChan <- nil
+			}
+		}
+	}()
+
+	select {
+	case err := <-errChan:
+		return err
+	case <-t.C:
+		ctl.Close()
+		log.Errorf("namedpipectl to listen namedpipe timeout (%v s)", enums.TimeoutNamedPipe3S)
+		return fmt.Errorf("namedpipectl to listen namedpipe timeout")
+	case <-ctl.stopChan:
+		ctl.Close()
+		log.Infof("euspace stopping,close namedpipectl")
+		return fmt.Errorf("euspace was stoped")
+	}
+}
+
+func (ctl *NamedPipeCtl) Close() {
+	ctl.mtx.Lock()
+	defer ctl.mtx.Unlock()
+	if ctl.isClosed {
+		log.Warn("namedPipeCtl has been closed")
+		return
+	}
+	ctl.isClosed = true
+	if ctl.namedPipe != nil {
+		ctl.namedPipe.FreeUp4EOF()
+		ctl.namedPipe = nil
+		log.Info("namedPipeCtl namedPipe is closed")
+	} else {
+		log.Warn("namedPipeCtl namedPipe is nil, do nothing")
+	}
+}
+
+func (ctl *NamedPipeCtl) recoverAndLogError(target string) {
+	if i := recover(); i != nil {
+		//获取错误消息
+		eMsg := ""
+		switch v := i.(type) {
+		case string:
+			eMsg = v
+		case error:
+			eMsg = v.Error()
+		default:
+			eMsg = fmt.Sprintf("%+v", v)
+		}
+		sMsg := string(debug.Stack())
+		log.Errorf("goroutine[ %s ] panic: %s ,stack: %s", target, eMsg, sMsg)
+	}
+}
+
+func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
+	log.Info("namedPipeCtl start listen and dispose msg")
+	go func() {
+		defer func() {
+			ctl.recoverAndLogError("AcceptAndDisposeMsg")
+			log.Info("namedPipeCtl stop listen and dispose msg")
+		}()
+
+		for {
+
+			if ctl.isClosed {
+				log.Errorf("AcceptAndDisposeMsg namedPipeCtl has been closed, stop accept msg.")
+				return
+			}
+
+			if ctl.namedPipe == nil {
+				if err := ctl.initNamedPipe(); err != nil {
+					log.Errorf("AcceptAndDisposeMsg re-initNamedPipe occurs error: %s", err.Error())
+				}
+			}
+
+			if ctl.stopChan != nil {
+				select {
+				case <-ctl.stopChan:
+					log.Warnf("AcceptAndDisposeMsg euspace stopping, stop accept msg")
+					return
+				default:
+				}
+			}
+
+			if msg, err := ctl.namedPipe.Accept(); err != nil {
+				if err == io.EOF {
+					log.Warnf("AcceptAndDisposeMsg get io.EOF,will re-initNamedPipe")
+					ctl.mtx.Lock()
+					ctl.namedPipe.FreeUp4EOF()
+					ctl.namedPipe = nil
+					ctl.mtx.Unlock()
+					if err := ctl.initNamedPipe(); err != nil {
+						log.Errorf("AcceptAndDisposeMsg get io.EOF,re-initNamedPipe occurs error: %s", err.Error())
+					}
+				} else {
+					log.Errorf("AcceptAndDisposeMsg occurs error: %s", err.Error())
+				}
+			} else {
+				if ctl.isClosed {
+					log.Errorf("AcceptAndDisposeMsg namedPipeCtl has been closed, stop dispose msg :%#v", msg)
+					return
+				}
+				switch msg.Type {
+				case MsgTypeFusing:
+					reg.DoFusing()
+				case MsgTypeResume:
+					if err := reg.DoResume(); err != nil {
+						log.Fatalf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
+					}
+				}
+			}
+		}
+	}()
+}

+ 19 - 0
utils/util.go

@@ -945,3 +945,22 @@ func GetSoPath(pid uint32, soname string, rootfs string) (string, error) {
 	}
 	return "", fmt.Errorf("library %s not found in process.", soname)
 }
+
+func StopTimerWithDrainChan(timer *time.Timer) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+}
+
+func ResetTimerWithDrainChan(timer *time.Timer, d time.Duration) {
+	if !timer.Stop() {
+		select {
+		case <-timer.C:
+		default:
+		}
+	}
+	timer.Reset(d)
+}