Pārlūkot izejas kodu

Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复-实现控制范围(2)

Tom 1 gadu atpakaļ
vecāks
revīzija
b0ea6c55c9
4 mainīti faili ar 82 papildinājumiem un 20 dzēšanām
  1. 62 14
      containers/apm_fusing.go
  2. 1 0
      containers/registry.go
  3. 1 0
      flags/flags.go
  4. 18 6
      utils/namedpipe/namedpipe_ctl.go

+ 62 - 14
containers/apm_fusing.go

@@ -1,8 +1,11 @@
 package containers
 
 import (
+	"fmt"
+	"github.com/coroot/coroot-node-agent/flags"
 	log "github.com/sirupsen/logrus"
 	"os"
+	"time"
 )
 
 func (r *Registry) DoFusing() error {
@@ -11,15 +14,37 @@ func (r *Registry) DoFusing() error {
 		lastErr error
 		err     error
 	)
+
 	//先处于熔断状态 (应用层的uprobes\stackUprobes将进行Detach操作)
 	r.isFusing = true
-	//再停止数据发送
-	if err = os.Setenv("SEND", ""); err != nil {
-		lastErr = err
+	for tryLeft := *flags.FuseTryMax; tryLeft > 0; tryLeft-- {
+		lastErr = nil
+		//再停止数据发送
+		if err = os.Setenv("SEND", ""); err != nil {
+			lastErr = fmt.Errorf("failed to set SEND env: %w", err)
+			log.Warnf("DoFusing,FuseTryMax[%d],tryLeft[%d] set SEND env occurs error:%s", *flags.FuseTryMax, tryLeft-1, err.Error())
+		} else {
+			log.Infof("DoFusing,FuseTryMax[%d],tryLeft[%d] set SEND env OK", *flags.FuseTryMax, tryLeft-1)
+		}
+		//最后关闭内核层的tracepoint、kprobe
+		if err = r.tracer.UnlinkEbpfProg(); err != nil {
+			lastErr = fmt.Errorf("failed to unlink eBPF program: %w", err)
+			log.Warnf("DoFusing,FuseTryMax[%d],tryLeft[%d] UnlinkEbpfProg occurs error:%s", *flags.FuseTryMax, tryLeft-1, err.Error())
+		} else {
+			log.Infof("DoFusing,FuseTryMax[%d],tryLeft[%d] UnlinkEbpfProg OK", *flags.FuseTryMax, tryLeft-1)
+		}
+
+		if lastErr == nil {
+			break
+		}
+
+		time.Sleep(time.Duration(100) * time.Millisecond)
 	}
-	//最后关闭内核层的tracepoint、kprobe
-	if err = r.tracer.UnlinkEbpfProg(); err != nil {
-		lastErr = err
+
+	if lastErr != nil {
+		//重试N次后,仍然失败 设置熔断机制异常标识 (后续就不进行恢复操作了)
+		r.IsFuseException = true
+		log.Warnf("DoFusing,lastErr is not nil, set IsFuseException = true. error: %s", lastErr.Error())
 	}
 	return lastErr
 }
@@ -30,15 +55,38 @@ func (r *Registry) DoResume() error {
 		lastErr error
 		err     error
 	)
-	//先开启内核层的tracepoint、kprobe
-	if err = r.tracer.LinkEbpfProg(); err != nil {
-		lastErr = err
+
+	for tryLeft := *flags.FuseTryMax; tryLeft > 0; tryLeft-- {
+		//先开启内核层的tracepoint、kprobe
+		if err = r.tracer.LinkEbpfProg(); err != nil {
+			lastErr = fmt.Errorf("failed to link eBPF program: %w", err)
+			log.Warnf("DoResume,FuseTryMax[%d],tryLeft[%d] LinkEbpfProg occurs error:%s", *flags.FuseTryMax, tryLeft-1, err.Error())
+		} else {
+			log.Infof("DoResume,FuseTryMax[%d],tryLeft[%d] LinkEbpfProg OK", *flags.FuseTryMax, tryLeft-1)
+		}
+		//再开启数据发送
+		if err = os.Setenv("SEND", "1"); err != nil {
+			lastErr = fmt.Errorf("failed to set SEND env: %w", err)
+			log.Warnf("DoResume,FuseTryMax[%d],tryLeft[%d] set SEND env occurs error:%s", *flags.FuseTryMax, tryLeft-1, err.Error())
+		} else {
+			log.Infof("DoResume,FuseTryMax[%d],tryLeft[%d] set SEND env OK", *flags.FuseTryMax, tryLeft-1)
+		}
+
+		if lastErr == nil {
+			break
+		}
+
+		time.Sleep(time.Duration(100) * time.Millisecond)
 	}
-	//再开启数据发送
-	if err = os.Setenv("SEND", "1"); err != nil {
-		lastErr = err
+
+	if lastErr != nil {
+		//重试N次后,仍然失败 将状态设置为熔断,设置熔断机制异常标识 (后续就不进行熔断操作了)
+		r.IsFuseException = true
+		r.isFusing = true
+		log.Warnf("DoResume,lastErr is not nil, set IsFuseException = true. error: %s", lastErr.Error())
+	} else {
+		//最后处于非熔断状态 (应用层的uprobes\stackUprobes将进行Attach操作)
+		r.isFusing = false
 	}
-	//最后处于非熔断状态 (应用层的uprobes\stackUprobes将进行Attach操作)
-	r.isFusing = false
 	return lastErr
 }

+ 1 - 0
containers/registry.go

@@ -72,6 +72,7 @@ type Registry struct {
 	trafficStatsUpdateCh    chan *TrafficStatsUpdate
 	nodeInfo                *NodeInfoT
 	isFusing                bool
+	IsFuseException         bool
 }
 
 var (

+ 1 - 0
flags/flags.go

@@ -65,6 +65,7 @@ var (
 	WalDir         = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()
 
 	HostDirPathPrefix = kingpin.Flag("host-dir-path-prefix", "Set the prefix of path about the mount point of the host directory").Envar("HOST_DIR_PATH_PREFIX").Default("").String()
+	FuseTryMax        = kingpin.Flag("fuse_try_max", "The maximum number of the fuse operation try").Default("3").Envar("FUSE_TRY_MAX").Int()
 )
 
 func GetString(fl *string) string {

+ 18 - 6
utils/namedpipe/namedpipe_ctl.go

@@ -193,14 +193,26 @@ func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
 
 				switch msg.Type {
 				case MsgTypeFusing:
-					if err := reg.DoFusing(); err != nil {
-						//todo save the err to the status-page instruct
-						log.Errorf("AcceptAndDisposeMsg DoFusing occurs error: %s", err.Error())
+					if reg.IsFuseException {
+						log.Errorf("AcceptAndDisposeMsg fuse exception, will not to call DoFusing")
+					} else {
+						if err := reg.DoFusing(); err != nil {
+							//todo save the err to the status-page instruct
+							log.Errorf("AcceptAndDisposeMsg DoFusing occurs error: %s", err.Error())
+						} else {
+							log.Infof("AcceptAndDisposeMsg DoFusing finish")
+						}
 					}
 				case MsgTypeResume:
-					if err := reg.DoResume(); err != nil {
-						//todo save the err to the status-page instruct
-						log.Errorf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
+					if reg.IsFuseException {
+						log.Errorf("AcceptAndDisposeMsg fuse exception, will not to call DoResume")
+					} else {
+						if err := reg.DoResume(); err != nil {
+							//todo save the err to the status-page instruct
+							log.Errorf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
+						} else {
+							log.Infof("AcceptAndDisposeMsg DoResume finish")
+						}
 					}
 				}
 			}