Selaa lähdekoodia

Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复-实现控制范围

Tom 1 vuosi sitten
vanhempi
säilyke
c89f7388db
4 muutettua tiedostoa jossa 90 lisäystä ja 52 poistoa
  1. 33 18
      containers/apm_fusing.go
  2. 1 1
      dist/package_dir/bin/agentctl
  3. 50 31
      ebpftracer/tracer.go
  4. 6 2
      utils/namedpipe/namedpipe_ctl.go

+ 33 - 18
containers/apm_fusing.go

@@ -1,29 +1,44 @@
 package containers
 
-import log "github.com/sirupsen/logrus"
+import (
+	log "github.com/sirupsen/logrus"
+	"os"
+)
 
-func (r *Registry) DoFusing() {
-
-	/*//先处于熔断状态 (应用层的uprobes将停止attach)
+func (r *Registry) DoFusing() error {
+	log.Infof("-----DoFusing will to execute fuse operates -----\n")
+	var (
+		lastErr error
+		err     error
+	)
+	//先处于熔断状态 (应用层的uprobes\stackUprobes将进行Detach操作)
 	r.isFusing = true
-	//再关闭应用层的uprobes
-	for pid, c := range r.containersByPid {
-		if c != nil {
-			c.detachUprobes(pid)
-		}
+	//再停止数据发送
+	if err = os.Setenv("SEND", ""); err != nil {
+		lastErr = err
 	}
 	//最后关闭内核层的tracepoint、kprobe
-	r.tracer.UnlinkEbpfProg()*/
-	log.Infof("-----DoFusing will to execute fuse operates -----\n")
+	if err = r.tracer.UnlinkEbpfProg(); err != nil {
+		lastErr = err
+	}
+	return lastErr
 }
 
 func (r *Registry) DoResume() error {
-	/*//先开启内核层的tracepoint、kprobe
-	if err := r.tracer.LinkEbpfProg(); err != nil {
-		return err
-	}
-	//再处于非熔断状态 (应用层的uprobes将开启attach)
-	r.isFusing = false*/
 	log.Infof("-----DoResume will to execute resume operates -----\n")
-	return nil
+	var (
+		lastErr error
+		err     error
+	)
+	//先开启内核层的tracepoint、kprobe
+	if err = r.tracer.LinkEbpfProg(); err != nil {
+		lastErr = err
+	}
+	//再开启数据发送
+	if err = os.Setenv("SEND", "1"); err != nil {
+		lastErr = err
+	}
+	//最后处于非熔断状态 (应用层的uprobes\stackUprobes将进行Attach操作)
+	r.isFusing = false
+	return lastErr
 }

+ 1 - 1
dist/package_dir/bin/agentctl

@@ -203,7 +203,7 @@ agentStatus() {
 info() {
   local agentPid="$(getAgentPid)"
   local version=$(cat "${INSTALL_DIR}/installer.version" | tr -d '\n')
-  printf '{"pid":%d,"version":"%s","agent_id":"%s","config":"%s"}' "${agentPid}" "${version}" "${BRAND_PRODUCT_NAME_LOWER}" "${AGENT_CONFIG_PATH}"
+  printf '{"pid":%d,"version":"%s","agent_id":"%s","config":"%s","pipe": true}' "${agentPid}" "${version}" "${BRAND_PRODUCT_NAME_LOWER}" "${AGENT_CONFIG_PATH}"
 }
 
 ################################################################################

+ 50 - 31
ebpftracer/tracer.go

@@ -344,8 +344,9 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 func (t *Tracer) LinkEbpfProg() error {
 	klog.Infof("[start] Look eBPF specPrograms")
 	var (
-		l   link.Link
-		err error
+		l       link.Link
+		err     error
+		lastErr error
 	)
 	for _, programSpec := range t.collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
@@ -380,23 +381,41 @@ func (t *Tracer) LinkEbpfProg() error {
 			l, err = link.Kprobe(programSpec.AttachTo, program, nil)
 		}
 		if err != nil {
+			lastErr = err
 			t.Close()
-			return fmt.Errorf("failed to link program: %w", err)
+			klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
+			//return fmt.Errorf("failed to link program: %w", err)
+		} else {
+			t.links = append(t.links, l)
 		}
-		t.links = append(t.links, l)
 	}
 	klog.Infof("[end] Look eBPF specPrograms")
+
+	if lastErr != nil {
+		return fmt.Errorf("failed to link program: %w", lastErr)
+	}
 	return nil
 }
-func (t *Tracer) UnlinkEbpfProg() {
 
-	for _, p := range t.uprobes {
-		_ = p.Close()
+func (t *Tracer) UnlinkEbpfProg() error {
+	var (
+		lastErr error
+		err     error
+	)
+	for pName, p := range t.uprobes {
+		if err = p.Close(); err != nil {
+			lastErr = err
+			klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
+		}
 	}
 
 	for _, l := range t.links {
-		_ = l.Close()
+		if err = l.Close(); err != nil {
+			lastErr = err
+			klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
+		}
 	}
+	return lastErr
 }
 
 func (t EventType) String() string {
@@ -482,13 +501,13 @@ type l7Event struct {
 	TraceStart          uint32
 	TraceEnd            uint32
 	EventCount          uint32
-	Sport				uint16
-    Dport				uint16
-    SAddr          		[16]byte
-	DAddr          		[16]byte
-	ComponentSport		uint16
-    ComponentDport		uint16
-    ComponentSAddr      [16]byte
+	Sport               uint16
+	Dport               uint16
+	SAddr               [16]byte
+	DAddr               [16]byte
+	ComponentSport      uint16
+	ComponentDport      uint16
+	ComponentSAddr      [16]byte
 	ComponentDAddr      [16]byte
 	AssumedAppId        HashByte
 	SpanId              HashByte
@@ -710,23 +729,23 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 
 			payload := reader.Bytes()
 			req := &l7.RequestData{
-				Protocol:     l7.Protocol(v.Protocol),
-				Status:       l7.Status(v.Status),
-				Duration:     time.Duration(v.Duration),
-				Method:       l7.Method(v.Method),
-				StatementId:  v.StatementId,
-				TraceId:      v.TraceId,
-				TraceStart:   v.TraceStart,
-				TraceEnd:     v.TraceEnd,
-				EventCount:   v.EventCount,
-				AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
-				SpanId:       hex.EncodeToString(v.SpanId[:]),
-				StartAt:      v.StartAt,
-				EndAt:        v.EndtAt,
-				ComponentSAddr: ipPort(v.ComponentSAddr,v.ComponentSport),
-				ComponentDAddr: ipPort(v.ComponentDAddr,v.ComponentDport),
+				Protocol:       l7.Protocol(v.Protocol),
+				Status:         l7.Status(v.Status),
+				Duration:       time.Duration(v.Duration),
+				Method:         l7.Method(v.Method),
+				StatementId:    v.StatementId,
+				TraceId:        v.TraceId,
+				TraceStart:     v.TraceStart,
+				TraceEnd:       v.TraceEnd,
+				EventCount:     v.EventCount,
+				AssumedAppId:   hex.EncodeToString(v.AssumedAppId[:]),
+				SpanId:         hex.EncodeToString(v.SpanId[:]),
+				StartAt:        v.StartAt,
+				EndAt:          v.EndtAt,
+				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
+				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
 			}
-			if req.Protocol == l7.ProtocolHTTP{
+			if req.Protocol == l7.ProtocolHTTP {
 				klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
 				klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
 			}

+ 6 - 2
utils/namedpipe/namedpipe_ctl.go

@@ -193,10 +193,14 @@ func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
 
 				switch msg.Type {
 				case MsgTypeFusing:
-					reg.DoFusing()
+					if err := reg.DoFusing(); err != nil {
+						//todo save the err to the status-page instruct
+						log.Errorf("AcceptAndDisposeMsg DoFusing occurs error: %s", err.Error())
+					}
 				case MsgTypeResume:
 					if err := reg.DoResume(); err != nil {
-						log.Fatalf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
+						//todo save the err to the status-page instruct
+						log.Errorf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
 					}
 				}
 			}