浏览代码

Feature #TASK_QT-19234 merge dev-omni-fuse2 into dev
Feature #TASK_QT-19234 EBPF-euspace增加心跳
Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复机制(3)
Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复机制(2)
Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复机制(1)
Feature #TASK_QT-18250 EBPF-euspace接入熔断/恢复机制-namedpipe

Tom 1 年之前
父节点
当前提交
640488fa35

+ 29 - 0
containers/apm_fusing.go

@@ -0,0 +1,29 @@
+package containers
+
+import log "github.com/sirupsen/logrus"
+
+// DoFusing handles a "fusing" (circuit-break) request for the registry.
+// The real fuse steps are currently commented out, so the call only logs.
+func (r *Registry) DoFusing() {
+
+	/*//Enter the fusing state first (application-level uprobes stop being attached)
+	r.isFusing = true
+	//Then detach the application-level uprobes
+	for pid, c := range r.containersByPid {
+		if c != nil {
+			c.detachUprobes(pid)
+		}
+	}
+	//Finally close the kernel-level tracepoints and kprobes
+	r.tracer.UnlinkEbpfProg()*/
+	log.Infof("-----DoFusing will to execute fuse operates -----\n")
+}
+
+// DoResume handles a "resume" request for the registry.
+// The real resume steps are currently commented out, so the call only logs
+// and always returns nil.
+func (r *Registry) DoResume() error {
+	/*//Re-enable the kernel-level tracepoints and kprobes first
+	if err := r.tracer.LinkEbpfProg(); err != nil {
+		return err
+	}
+	//Then leave the fusing state (application-level uprobes are attached again)
+	r.isFusing = false*/
+	log.Infof("-----DoResume will to execute resume operates -----\n")
+	return nil
+}

+ 41 - 0
containers/apm_heartbeat.go

@@ -0,0 +1,41 @@
+package containers
+
+import (
+	log "github.com/sirupsen/logrus"
+	"os"
+	"strconv"
+	"time"
+)
+
+var hbStopChan = make(chan struct{})
+
+// DoHeartbeat writes a heartbeat to hbFilePath immediately and then once
+// every 30 seconds until hbStopChan is closed (see StopHeartbeat). It blocks
+// the calling goroutine; a panic raised while it runs is recovered and logged.
+func DoHeartbeat(hbFilePath string) {
+	defer func() {
+		if err := recover(); err != nil {
+			log.Errorf("DoHeartbeat panic: %v", err)
+		}
+	}()
+
+	doHeartbeat(hbFilePath)
+	ticker := time.NewTicker(time.Second * 30)
+	// Release the ticker as soon as the loop exits (stop signal or panic).
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			doHeartbeat(hbFilePath)
+		case <-hbStopChan:
+			return
+		}
+	}
+}
+
+// doHeartbeat overwrites hbFilePath with the current Unix time (seconds,
+// base 10). A write failure is logged and otherwise ignored.
+func doHeartbeat(hbFilePath string) {
+	payload := []byte(strconv.FormatInt(time.Now().Unix(), 10))
+	err := os.WriteFile(hbFilePath, payload, 0644)
+	if err == nil {
+		return
+	}
+	log.Errorf("write heartbeat to file [%s] occurs error: %v", hbFilePath, err.Error())
+}
+
+// StopHeartbeat signals DoHeartbeat to return by closing hbStopChan.
+// Calling it again after the channel is closed is a no-op instead of a
+// "close of closed channel" panic. It is not meant to be called from
+// several goroutines at the same time.
+func StopHeartbeat() {
+	select {
+	case <-hbStopChan:
+		// Already closed: nothing to do.
+	default:
+		close(hbStopChan)
+	}
+}

+ 2 - 1
containers/registry.go

@@ -71,6 +71,7 @@ type Registry struct {
 	trafficStatsLock        sync.Mutex
 	trafficStatsUpdateCh    chan *TrafficStatsUpdate
 	nodeInfo                *NodeInfoT
+	isFusing                bool
 }
 
 var (
@@ -214,7 +215,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				if c != nil {
 					if c != nil && !common.IsOpenFilter() {
 						verifyAttachConditions, openStack := c.verifyAttachConditions(r, pid)
-						if verifyAttachConditions {
+						if verifyAttachConditions && !r.isFusing {
 							err = c.RegisterAppInfo(r, pid)
 							if err == nil {
 								klog.WithField("pid", pid).Infoln("[registry] Attach uprobes.")

+ 34 - 11
ebpftracer/tracer.go

@@ -142,13 +142,14 @@ type Tracer struct {
 	disableE2ETracing   bool
 	disableStackTracing bool
 
-	collection *ebpf.Collection
-	readers    map[string]*perf.Reader
-	links      []link.Link
-	uprobes    map[string]*ebpf.Program
-	Symbols    []debugelf.Symbol
-	Uprobes    []tracer.Uprobe
-	UprobesMap map[string]tracer.Uprobe
+	collection     *ebpf.Collection
+	collectionSpec *ebpf.CollectionSpec
+	readers        map[string]*perf.Reader
+	links          []link.Link
+	uprobes        map[string]*ebpf.Program
+	Symbols        []debugelf.Symbol
+	Uprobes        []tracer.Uprobe
+	UprobesMap     map[string]tracer.Uprobe
 }
 
 func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
@@ -308,7 +309,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		return fmt.Errorf("failed to load collection: %w", err)
 	}
 	tracer.Offset()
-
+	t.collectionSpec = collectionSpec
 	t.collection = c
 
 	perfMaps := []perfMap{
@@ -344,7 +345,21 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	klog.Infof("[end] Look eBPF perf_maps")
 
 	klog.Infof("[start] Look eBPF specPrograms")
-	for _, programSpec := range collectionSpec.Programs {
+	if err = t.LinkEbpfProg(); err != nil {
+		return err
+	}
+	klog.Infof("[end] Look eBPF specPrograms")
+
+	return nil
+}
+
+func (t *Tracer) LinkEbpfProg() error {
+	klog.Infof("[start] Look eBPF specPrograms")
+	var (
+		l   link.Link
+		err error
+	)
+	for _, programSpec := range t.collectionSpec.Programs {
 		program := t.collection.Programs[programSpec.Name]
 		klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
 		if t.DisableL7Tracing() {
@@ -357,7 +372,6 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 				continue
 			}
 		}
-		var l link.Link
 		switch programSpec.Type {
 		case ebpf.TracePoint:
 			if strings.Contains(programSpec.SectionName, "prog") {
@@ -384,9 +398,18 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		t.links = append(t.links, l)
 	}
 	klog.Infof("[end] Look eBPF specPrograms")
-
 	return nil
 }
+// UnlinkEbpfProg closes every program stored in t.uprobes and every link in
+// t.links. Close errors are deliberately ignored (best-effort teardown).
+// NOTE(review): the closed programs stay in t.uprobes; confirm that nothing
+// tries to attach them again after a resume.
+func (t *Tracer) UnlinkEbpfProg() {
+
+	for _, p := range t.uprobes {
+		_ = p.Close()
+	}
+
+	for _, l := range t.links {
+		_ = l.Close()
+	}
+	// Forget the closed links: LinkEbpfProg appends to t.links, so keeping
+	// them would accumulate stale entries and close them again next time.
+	t.links = nil
+}
 
 func (t EventType) String() string {
 	switch t {

+ 1 - 0
flags/flags.go

@@ -30,6 +30,7 @@ var (
 	DisableStackTracing = kingpin.Flag("disable-stack-tracing", "Disable stack tracing").Default("true").Envar("DISABLE_STACK_TRACING").Bool()
 	LicenseKey          = kingpin.Flag("license-key", "Apm API key").Default("J45Engw88NeHUZ4Q7qNsK8L47FTH**QvgW113IEnsNaBNMR5zZ**oj/g!!!!").Envar("LICENSE_KEY").String()
 	RunInContainer      = kingpin.Flag("run-in-container", "run in container").Default("false").Envar("RUN_IN_CONTAINER").Bool()
+	RunInOmniagent      = kingpin.Flag("run-in-omniagent", "run in omniagent").Default("false").Envar("RUN_IN_OMNIAGENT").Bool()
 
 	ListenAddress     = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:8123").Envar("LISTEN").String()
 	CgroupRoot        = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").Envar("CGROUPFS_ROOT").String()

+ 65 - 50
main.go

@@ -2,25 +2,27 @@ package main
 
 import (
 	"bytes"
+	"encoding/json"
+	"fmt"
 	"github.com/cilium/ebpf/rlimit"
 	"github.com/coroot/coroot-node-agent/kube"
 	"github.com/coroot/coroot-node-agent/utils"
 	"github.com/coroot/coroot-node-agent/utils/enums"
+	"github.com/coroot/coroot-node-agent/utils/namedpipe"
+	"github.com/coroot/coroot-node-agent/utils/try"
+	dto "github.com/prometheus/client_model/go"
 	log "github.com/sirupsen/logrus"
+	"io"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
 	"path"
+	"path/filepath"
+	"regexp"
 	"runtime"
+	"strconv"
 	"strings"
-	"fmt"
-	"io"
-	"strconv"  
-	"time" 
-	"regexp" 
-
-	"encoding/json"
-	dto "github.com/prometheus/client_model/go"
+	"time"
 
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/containers"
@@ -122,13 +124,13 @@ type MetricData struct {
 }
 
 type PostData struct {
-	AccountID string	`json:"accountId"`
-	IP	string			`json:"ip"`
-	HostID int64		`json:"hostId"`
-	TimeStamp uint64	`json:"time_stamp"`
-	ServiceType uint64	`json:"service_type"`
-	HostName string 	`json:"host_name"`
-	Data []MetricData 	`json:"data"`
+	AccountID   string       `json:"accountId"`
+	IP          string       `json:"ip"`
+	HostID      int64        `json:"hostId"`
+	TimeStamp   uint64       `json:"time_stamp"`
+	ServiceType uint64       `json:"service_type"`
+	HostName    string       `json:"host_name"`
+	Data        []MetricData `json:"data"`
 }
 
 func main() {
@@ -208,6 +210,19 @@ func main() {
 	defer cr.Close()
 	log.Infoln("START_TRACE")
 
+	if *flags.RunInOmniagent {
+		//namedpipe初始化
+		npCtl, err := namedpipe.NewNamedPipeCtl(nil)
+		if err != nil {
+			log.Fatalf("get namedpipeCtl occurs error: %s", err.Error())
+		}
+		//监听&处理-熔断信号
+		npCtl.AcceptAndDisposeMsg(cr)
+	}
+
+	//heartbeat
+	try.GoParams(containers.DoHeartbeat, utils.CatchFn, filepath.Join(utils.GetRootPath(), "heartbeat"))
+
 	//profiling.Start()
 	//defer profiling.Stop()
 	// 创建一个/metrics路由处理函数
@@ -367,10 +382,10 @@ func main() {
 			return
 		}
 
-		// 创建正则表达式对象  
-		regex, err := regexp.Compile(`^process_.+_queries_total$`)  
-		if err != nil {  
-			return  
+		// 创建正则表达式对象
+		regex, err := regexp.Compile(`^process_.+_queries_total$`)
+		if err != nil {
+			return
 		}
 
 		var postData PostData
@@ -405,7 +420,7 @@ func main() {
 				metric.GetName() != "process_net_tcp_bytes_sent_total" &&
 				metric.GetName() != "process_net_tcp_data_latency" &&
 				metric.GetName() != "process_net_tcp_data_duration" &&
-				metric.GetName() != "process_net_tcp_est_time"{
+				metric.GetName() != "process_net_tcp_est_time" {
 				continue
 			}
 
@@ -456,42 +471,42 @@ func main() {
 		w.Header().Set("Content-Type", "application/json")
 		w.Write(jsonData)
 
-		// 创建请求  
-		req, err := http.NewRequest("POST", "http://10.0.7.115:18080/api/v2/ebpf/receive", bytes.NewBuffer(jsonData))  
-		if err != nil {  
-			fmt.Println("Error:", err)  
-			return  
-		}  
+		// 创建请求
+		req, err := http.NewRequest("POST", "http://10.0.7.115:18080/api/v2/ebpf/receive", bytes.NewBuffer(jsonData))
+		if err != nil {
+			fmt.Println("Error:", err)
+			return
+		}
 
-		// 添加 Content-Type header  
-		req.Header.Add("Content-Type", "application/json")  
+		// 添加 Content-Type header
+		req.Header.Add("Content-Type", "application/json")
 
-		// 添加一个自定义 header  
-		req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))  
-		req.Header.Add("Account-Id", strconv.Itoa(nodeInfo.AccountID))  
-		req.Header.Add("ip", nodeInfo.HostIp)  
+		// 添加一个自定义 header
+		req.Header.Add("DataCount", strconv.Itoa(len(postData.Data)))
+		req.Header.Add("Account-Id", strconv.Itoa(nodeInfo.AccountID))
+		req.Header.Add("ip", nodeInfo.HostIp)
 
-		// 创建 HTTP 客户端  
-		client := &http.Client{}  
+		// 创建 HTTP 客户端
+		client := &http.Client{}
 
-		// 发送 HTTP POST 请求  
-		response, err := client.Do(req)  
-		if err != nil {  
-			fmt.Println("Error:", err)  
-			return  
-		}  
-		defer response.Body.Close()  
+		// 发送 HTTP POST 请求
+		response, err := client.Do(req)
+		if err != nil {
+			fmt.Println("Error:", err)
+			return
+		}
+		defer response.Body.Close()
 
-		// 读取响应内容  
-		responseData, err := io.ReadAll(response.Body)  
-		if err != nil {  
-			fmt.Println("Error:", err)  
-			return  
-		}  
+		// 读取响应内容
+		responseData, err := io.ReadAll(response.Body)
+		if err != nil {
+			fmt.Println("Error:", err)
+			return
+		}
 
-		// 输出响应状态码和响应正文  
-		fmt.Println("Status Code:", response.StatusCode)  
-		fmt.Println("Response Body:", string(responseData)) 
+		// 输出响应状态码和响应正文
+		fmt.Println("Status Code:", response.StatusCode)
+		fmt.Println("Response Body:", string(responseData))
 	}
 
 	if err := prom.StartAgent(machineId); err != nil {

+ 1 - 0
node/apm_host_info.go

@@ -18,6 +18,7 @@ func NewNodeInfo(name, kv string) (*NodeInfoT, error) {
 		klog.Errorf("Failed to create node info from common ini: %v", err)
 	} else {
 		klog.Infof("run in omniagent.")
+		*flags.RunInOmniagent = true
 		return ni, err
 	}
 

+ 5 - 0
utils/enums/enums.go

@@ -10,6 +10,9 @@ const (
 	DaemonServiceFile = "omniagent"
 	CpuLimit          = "omniagent-cpulimit"
 
+	AgentNameEuspace  = DaemonProc
+	AgentNameOneAgent = "cw-oneagent"
+
 	TestApp = "eBPF-APP"
 	// windows
 	DaemonCtlProcEXE   = "omniagent-ctl.exe"
@@ -25,4 +28,6 @@ const (
 	DefaultLocalDataPort   = "18085"
 	DefaultLocalFilePort   = "18086"
 	DefaultVpcKey          = "identification"
+
+	TimeoutNamedPipe3S = 3
 )

+ 230 - 0
utils/namedpipe/name_pipe.go

@@ -0,0 +1,230 @@
+package namedpipe
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/json"
+	"errors"
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"io"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// NamedPipe is the message channel used by NamedPipeCtl. Messages are
+// delimited by a newline.
+type NamedPipe interface {
+	// AcceptRaw reads one message and returns its bytes without the delimiter.
+	AcceptRaw() ([]byte, error)
+	// Accept reads one message and decodes it from JSON into a MsgInfo.
+	Accept() (MsgInfo, error)
+	// WriteRaw writes b followed by the delimiter.
+	WriteRaw(b []byte) (n int, err error)
+	// Write wraps msg in a MsgInfo of the given type and writes it as JSON.
+	Write(msg string, msgType string) (int, error)
+	// FreeUp4EOF releases the underlying pipe resources.
+	FreeUp4EOF()
+}
+
+// namedPipe holds the platform-independent state shared by the Unix and
+// Windows named-pipe implementations.
+type namedPipe struct {
+	// opMtx *sync.Mutex
+	// quitC chan int
+	quitFlag uint32          // 0 while usable; set to 1 by FreeUp4EOF
+	wg       *sync.WaitGroup // counts read/write calls currently in progress
+	fromType string          // name of this side; used to build pipe names and MsgInfo.From
+	toType   string          // name of the peer; used to build pipe names
+	isDaemon bool            // selects the daemon-side setup in initPipe
+	single   bool            // single-file mode: one pipe path instead of two
+	sendPipe *bufio.Writer   // buffered writer over the send side; nil when closed
+	recvPipe *bufio.Reader   // buffered reader over the receive side; nil when closed
+	rootPath string          // directory that holds the pipe files
+}
+
+// MsgInfo is the JSON envelope exchanged over the named pipe.
+type MsgInfo struct {
+	From    string `json:"from"`    // sender name (Write fills in namedPipe.fromType)
+	Time    int64  `json:"time"`    // Unix time in seconds when the message was built
+	Type    string `json:"type"`    // MsgTypeFusing or MsgTypeResume
+	Content string `json:"content"` // free-form payload
+}
+
+const (
+	// msgSeparator terminates every message on the pipe.
+	msgSeparator = '\n'
+
+	// Message types accepted by Write.
+	MsgTypeFusing = "fusing"
+	MsgTypeResume = "resume"
+)
+
+var (
+	// PipeCloseError is returned by Accept when the pipe is closed or not open.
+	PipeCloseError     = errors.New("RecvFromPipe,named-pipe was closed")
+	// JsonUnmarshalError is returned by Accept when a message is not valid JSON.
+	JsonUnmarshalError = errors.New("json unmarshal error")
+)
+
+// newNamePipe builds the platform-independent part of a named pipe.
+//
+// agent_type is the name of this side ("from") and to_type the name of the
+// peer ("to"). isDaemon tells whether the pipe is created by the daemon,
+// single selects single-file mode, and rootPath is the directory holding the
+// pipe files. The returned error is always nil.
+func newNamePipe(agent_type, to_type string, isDaemon, single bool, rootPath string) (*namedPipe, error) {
+	np := new(namedPipe)
+	np.wg = new(sync.WaitGroup)
+	np.fromType, np.toType = agent_type, to_type
+	np.isDaemon, np.single = isDaemon, single
+	np.rootPath = rootPath
+	return np, nil
+}
+
+// Accept reads one newline-terminated message from the receive pipe and
+// decodes it into a MsgInfo.
+//
+// It returns PipeCloseError when the pipe has been freed or was never opened,
+// JsonUnmarshalError when the line is not valid JSON, and the reader's own
+// error (for example io.EOF) when reading fails.
+func (np *namedPipe) Accept() (MsgInfo, error) {
+	var mi MsgInfo
+
+	// Bail out early if the pipe resources were already released.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return mi, PipeCloseError
+	}
+
+	np.wg.Add(1)
+	defer np.wg.Done()
+	defer func() {
+		if r := recover(); r != nil {
+			log.WithField("module", "namedPipe").Errorf("holy name pipe error, %v", r)
+		}
+	}()
+
+	// Check again now that this call is counted: the pipe may have been
+	// released between the first check and wg.Add.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return mi, PipeCloseError
+	}
+
+	reader := np.recvPipe
+	if reader == nil {
+		return mi, PipeCloseError
+	}
+
+	line, err := reader.ReadString(msgSeparator)
+	if err != nil {
+		return mi, err
+	}
+	line = strings.TrimRight(line, string(msgSeparator))
+	if json.Unmarshal([]byte(line), &mi) != nil {
+		return mi, JsonUnmarshalError
+	}
+	return mi, nil
+}
+
+// AcceptRaw reads one newline-terminated message from the receive pipe and
+// returns its bytes without the trailing separator.
+//
+// It returns PipeCloseError (the same sentinel Accept uses) when the pipe has
+// been freed or was never opened, io.EOF unchanged when the reader hits end
+// of stream, and any other read error wrapped so that errors.Is/As still see
+// the cause.
+func (np *namedPipe) AcceptRaw() ([]byte, error) {
+	// Bail out early if the pipe resources were already released.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return nil, PipeCloseError
+	}
+	np.wg.Add(1)
+	defer np.wg.Done()
+	// Check again now that this call is counted: the pipe may have been
+	// released between the first check and wg.Add.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return nil, PipeCloseError
+	}
+
+	recvPipe := np.recvPipe
+	if recvPipe == nil {
+		return nil, PipeCloseError
+	}
+	msg, err := recvPipe.ReadBytes(msgSeparator)
+	if err == nil {
+		return bytes.TrimRight(msg, string(msgSeparator)), nil
+	}
+	if err != io.EOF {
+		err = fmt.Errorf("RecvFromPipe,recv msg occours error: %w", err)
+	}
+	return nil, err
+}
+
+// Write wraps msg in a MsgInfo of type msgType, encodes it as JSON, appends
+// the message separator and writes it to the send pipe, flushing afterwards.
+//
+// msgType must be MsgTypeResume or MsgTypeFusing. The returned count is the
+// number of bytes handed to the buffered writer (separator included); it is
+// 0 whenever the write itself fails or the pipe is closed.
+func (np *namedPipe) Write(msg string, msgType string) (int, error) {
+	// Bail out early if the pipe resources were already released.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+
+	np.wg.Add(1)
+	defer np.wg.Done()
+
+	// Check again now that this call is counted.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+
+	switch msgType {
+	case MsgTypeResume, MsgTypeFusing:
+	default:
+		return 0, errors.New(fmt.Sprintf("WriteToPipe,invalid msg type: %s", msgType))
+	}
+
+	writer := np.sendPipe
+	if writer == nil {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+
+	envelope := MsgInfo{
+		From:    np.fromType,
+		Time:    time.Now().Unix(),
+		Type:    msgType,
+		Content: strings.Trim(msg, string(msgSeparator)),
+	}
+	payload, _ := json.Marshal(envelope)
+	payload = append(payload, msgSeparator)
+
+	written, err := writer.Write(payload)
+	if err != nil {
+		return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error()))
+	}
+	if err := writer.Flush(); err != nil {
+		return written, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error()))
+	}
+	return written, nil
+}
+
+// WriteRaw writes msg followed by the message separator to the send pipe and
+// flushes the buffer.
+//
+// The returned count is the number of bytes handed to the buffered writer
+// (separator included); it is 0 whenever the write itself fails or the pipe
+// is closed.
+func (np *namedPipe) WriteRaw(msg []byte) (int, error) {
+	// Bail out early if the pipe resources were already released.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+	np.wg.Add(1)
+	defer np.wg.Done()
+
+	// Check again now that this call is counted.
+	if atomic.LoadUint32(&np.quitFlag) != 0 {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+
+	sendPipe := np.sendPipe
+	if sendPipe == nil {
+		return 0, errors.New("WriteToPipe,named-pipe was closed")
+	}
+
+	// Build the frame in a private buffer: appending to msg directly could
+	// overwrite bytes in the caller's backing array when it has spare capacity.
+	frame := make([]byte, 0, len(msg)+1)
+	frame = append(frame, msg...)
+	frame = append(frame, msgSeparator)
+
+	n, err := sendPipe.Write(frame)
+	if err != nil {
+		return 0, fmt.Errorf("WriteToPipe,Write msg occours error: %s", err.Error())
+	}
+	if err := sendPipe.Flush(); err != nil {
+		return n, fmt.Errorf("WriteToPipe,Flush buffer occours error: %s", err.Error())
+	}
+	return n, nil
+}

+ 187 - 0
utils/namedpipe/name_pipe_unix.go

@@ -0,0 +1,187 @@
+//go:build linux || darwin
+// +build linux darwin
+
+package namedpipe
+
+import (
+	"bufio"
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"os"
+	"sync/atomic"
+	"syscall"
+)
+
+const ONEAGENT = "cw-oneagent"
+const NETFLOW = "cw-netflow"
+const SERVERAGENT = "cw-serveragent"
+
+const MASTER = true
+const AGENT = false
+
+// NamedPipe4U is the Unix (FIFO based) named-pipe implementation.
+type NamedPipe4U struct {
+	*namedPipe
+	sendPipeFileHandler *os.File // open handle of the send FIFO; nil when closed
+	recvPipeFileHandler *os.File // open handle of the receive FIFO; nil when closed
+	sendPipePath        string   // path of the send FIFO
+	recvPipePath        string   // path of the receive FIFO
+}
+
+// ListenNpipe creates the named-pipe files if needed, opens them and returns
+// the resulting pipe object.
+//
+// agent_type is the name of this side ("from") and to_type the name of the
+// peer ("to"). isMaster tells whether the caller is the daemon, single
+// selects single-file mode, and rootPath is the directory holding the pipe
+// files. Opening the pipes may block until the peer opens its side.
+func ListenNpipe(agent_type, to_type string, isMaster, single bool, rootPath string) (*NamedPipe4U, error) {
+	base, err := newNamePipe(agent_type, to_type, isMaster, single, rootPath)
+	if err != nil {
+		return nil, err
+	}
+
+	pipe := &NamedPipe4U{namedPipe: base}
+	if initErr := pipe.initPipe(); initErr != nil {
+		return nil, initErr
+	}
+	return pipe, nil
+}
+
+// getPipePath returns the two FIFO paths below np.rootPath, creating that
+// directory when it does not exist yet.
+//
+// In single-file mode both results are "<root>/<from>". Otherwise the first
+// result is "<root>/<from>2<to>" and the second "<root>/<to>2<from>".
+// A failure to create the directory is returned as an error instead of a
+// panic, so callers that already check err can handle it.
+func (np *NamedPipe4U) getPipePath() (daemon2subPipe, sub2DaemonPipe string, err error) {
+
+	basePath := np.rootPath + "/"
+	// Make sure the directory exists.
+	if _, statErr := os.Stat(basePath); os.IsNotExist(statErr) {
+		if mkErr := os.MkdirAll(basePath, 0755); mkErr != nil {
+			return "", "", fmt.Errorf("mkdir for named-pipe failed . path:%v ,error:%w", basePath, mkErr)
+		}
+	}
+
+	if np.single {
+		singlePipe := basePath + np.fromType
+		return singlePipe, singlePipe, nil
+	}
+	daemon2subPipe = basePath + fmt.Sprintf("%s2%s", np.fromType, np.toType)
+	sub2DaemonPipe = basePath + fmt.Sprintf("%s2%s", np.toType, np.fromType)
+	return daemon2subPipe, sub2DaemonPipe, nil
+}
+
+// initPipe creates the FIFO files if they are missing and opens the send and
+// receive sides, wrapping them in buffered writer/reader. If the second open
+// fails, the handle opened first is closed again before the error is returned.
+func (np *NamedPipe4U) initPipe() error {
+	var pipeFiles []string
+	daemon2subPipe, sub2DaemonPipe, err := np.getPipePath()
+	if err != nil {
+		return err
+	}
+
+	if np.single {
+		pipeFiles = []string{daemon2subPipe}
+	} else {
+		pipeFiles = []string{daemon2subPipe, sub2DaemonPipe}
+	}
+
+	// Create each FIFO that does not exist yet.
+	for _, pf := range pipeFiles {
+		if _, err := os.Lstat(pf); os.IsNotExist(err) == true {
+			err = syscall.Mkfifo(pf, 0666)
+			if err != nil {
+				return err
+			}
+		} else if err != nil {
+			return err
+		}
+	}
+
+	//Assign the pipe paths and open them according to the role; both branches
+	//assign the same paths and differ only in the order of the two opens
+	if np.isDaemon {
+		//The daemon opens the pipes in the opposite order to the sub-process
+		np.sendPipePath = daemon2subPipe
+		np.recvPipePath = sub2DaemonPipe
+		//OpenFile may block
+		if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
+			np.sendPipeFileHandler = sf
+			np.sendPipe = bufio.NewWriter(sf)
+		} else {
+			log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
+			return err
+		}
+
+		if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
+			np.recvPipeFileHandler = rf
+			np.recvPipe = bufio.NewReader(rf)
+		} else {
+			log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
+			// Roll back the send side that was opened above.
+			sf := np.sendPipeFileHandler
+			np.sendPipeFileHandler = nil
+			np.sendPipe = nil
+			if sf != nil {
+				if errClose := sf.Close(); errClose != nil {
+					log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) error:%v", np.sendPipePath, errClose)
+				}
+			}
+			return err
+		}
+	} else {
+		//The sub-process opens the pipes in the opposite order to the daemon
+		np.sendPipePath = daemon2subPipe
+		np.recvPipePath = sub2DaemonPipe
+		//OpenFile may block
+		if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
+			np.recvPipeFileHandler = rf
+			np.recvPipe = bufio.NewReader(rf)
+		} else {
+			log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
+			return err
+		}
+
+		if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
+			np.sendPipeFileHandler = sf
+			np.sendPipe = bufio.NewWriter(sf)
+		} else {
+			log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
+			// Roll back the receive side that was opened above.
+			rf := np.recvPipeFileHandler
+			np.recvPipeFileHandler = nil
+			np.recvPipe = nil
+			if rf != nil {
+				if errClose := rf.Close(); errClose != nil {
+					log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) error:%v", np.recvPipePath, errClose)
+				}
+			}
+			return err
+		}
+	}
+	return nil
+}
+
+// FreeUp4EOF releases both FIFO handles and clears the buffered reader and
+// writer. Only the first call does the work; later calls log a warning and
+// return. It is intended to be called when io.EOF occurs.
+func (np *NamedPipe4U) FreeUp4EOF() {
+	entry := log.WithField("module", "namedPipe_Unix")
+
+	if swapped := atomic.CompareAndSwapUint32(&np.quitFlag, 0, 1); !swapped {
+		entry.Warnf("named pipe(%s)(%s) already free", np.sendPipePath, np.recvPipePath)
+		return
+	}
+	entry.Infof("named pipe free(%s)(%s) begin", np.sendPipePath, np.recvPipePath)
+
+	// Send side first, then receive side.
+	if sender := np.sendPipeFileHandler; sender != nil {
+		np.sendPipeFileHandler = nil
+		if err := sender.Close(); err != nil {
+			entry.Errorf("close send pipe(%s) failed. error:%v", np.sendPipePath, err)
+		}
+	}
+
+	if receiver := np.recvPipeFileHandler; receiver != nil {
+		np.recvPipeFileHandler = nil
+		if err := receiver.Close(); err != nil {
+			entry.Errorf("close recv pipe(%s) failed. error:%v", np.recvPipePath, err)
+		}
+	}
+
+	np.sendPipe, np.recvPipe = nil, nil
+	entry.Infof("named pipe(%s)(%s) free end", np.sendPipePath, np.recvPipePath)
+}

+ 120 - 0
utils/namedpipe/name_pipe_windows.go

@@ -0,0 +1,120 @@
+//go:build windows
+// +build windows
+
+package namedpipe
+
+import (
+	"bufio"
+	"fmt"
+	log "github.com/sirupsen/logrus"
+	"net"
+	"sync/atomic"
+)
+
+const pipeNameDaemonAndSubPattern = `\\.\pipe\`
+const ONEAGENT = "cw-oneagent"
+const NETFLOW = "cw-netflow"
+const SERVERAGENT = "cw-serveragent"
+
+// NamedPipe4W is the Windows named-pipe implementation.
+// NOTE(review): the npipe package used here and in initPipe does not appear
+// in this file's import list — confirm the import is present.
+type NamedPipe4W struct {
+	*namedPipe
+	pipePath string              // pipe name used in log messages
+	listener *npipe.PipeListener // set on the daemon side only
+	conn     net.Conn            // connection shared by the reader and the writer
+}
+
+// ListenNpipe connects the Windows named pipe and returns the pipe object.
+//
+// agent_type is the name of this side ("from") and to_type the name of the
+// peer ("to"). isMaster tells whether the caller is the daemon (which listens
+// and accepts; otherwise the pipe is dialed). single and rootPath are stored
+// on the pipe object. Accepting a connection may block.
+func ListenNpipe(agent_type, to_type string, isMaster, single bool, rootPath string) (*NamedPipe4W, error) {
+	base, err := newNamePipe(agent_type, to_type, isMaster, single, rootPath)
+	if err != nil {
+		return nil, err
+	}
+
+	pipe := &NamedPipe4W{namedPipe: base}
+	if initErr := pipe.initPipe(); initErr != nil {
+		return nil, initErr
+	}
+	return pipe, nil
+}
+
+// getPipePath returns the pipe names `\\.\pipe\<from>2<to>` and
+// `\\.\pipe\<to>2<from>`. The error is always nil.
+func (np *NamedPipe4W) getPipePath() (daemon2subPipe, sub2DaemonPipe string, err error) {
+	pipeName := func(src, dst string) string {
+		return fmt.Sprintf("%s%s2%s", pipeNameDaemonAndSubPattern, src, dst)
+	}
+	return pipeName(np.fromType, np.toType), pipeName(np.toType, np.fromType), nil
+}
+
+// initPipe establishes the pipe connection and wires the buffered reader and
+// writer to it. The daemon side listens on "<from>2<to>" and accepts one
+// connection (this may block); the other side dials "<to>2<from>".
+//
+// np.pipePath is recorded so that later log messages name the pipe, and the
+// listener is closed again when Accept fails so that it does not leak.
+func (np *NamedPipe4W) initPipe() error {
+
+	daemon2subPipe, sub2DaemonPipe, err := np.getPipePath()
+
+	if err != nil {
+		return err
+	}
+
+	if np.isDaemon {
+		np.pipePath = daemon2subPipe
+		// The daemon process sets up the connection.
+		ln, err := npipe.Listen(daemon2subPipe)
+		if err != nil {
+			return err
+		}
+		// Accept may block.
+		conn, err := ln.Accept()
+		if err != nil {
+			if errClose := ln.Close(); errClose != nil {
+				log.WithField("module", "namedPipe_Win").Errorf("close named pipe(%s) listener error: %v", np.pipePath, errClose)
+			}
+			return err
+		}
+		np.listener = ln
+		np.conn = conn
+		np.recvPipe = bufio.NewReader(conn)
+		np.sendPipe = bufio.NewWriter(conn)
+	} else {
+		np.pipePath = sub2DaemonPipe
+		conn, err := npipe.Dial(sub2DaemonPipe)
+		if err != nil {
+			return err
+		}
+		np.conn = conn
+		np.sendPipe = bufio.NewWriter(conn)
+		np.recvPipe = bufio.NewReader(conn)
+	}
+	return nil
+}
+
+// FreeUp4EOF waits for reads/writes in progress, then closes the connection
+// and the listener and clears the buffered reader and writer. Only the first
+// call does the work; later calls log a warning and return. It is intended
+// to be called when io.EOF occurs.
+func (np *NamedPipe4W) FreeUp4EOF() {
+	entry := log.WithField("module", "namedPipe_Win")
+
+	if swapped := atomic.CompareAndSwapUint32(&np.quitFlag, 0, 1); !swapped {
+		entry.Warnf("named pipe(%s) already free", np.pipePath)
+		return
+	}
+	entry.Infof("named pipe(%s) free begin", np.pipePath)
+	np.wg.Wait()
+
+	if c := np.conn; c != nil {
+		np.conn = nil
+		if err := c.Close(); err != nil {
+			entry.Errorf("close named pipe(%s) conn error: %v", np.pipePath, err)
+		}
+	}
+
+	if ln := np.listener; ln != nil {
+		np.listener = nil
+		if err := ln.Close(); err != nil {
+			entry.Errorf("close named pipe(%s) listener error: %v", np.pipePath, err)
+		}
+	}
+
+	np.sendPipe, np.recvPipe = nil, nil
+	entry.Infof("named pipe free(%s) end", np.pipePath)
+}

+ 205 - 0
utils/namedpipe/namedpipe_ctl.go

@@ -0,0 +1,205 @@
+package namedpipe
+
+import (
+	"fmt"
+	"github.com/coroot/coroot-node-agent/containers"
+	"github.com/coroot/coroot-node-agent/utils"
+	"github.com/coroot/coroot-node-agent/utils/enums"
+	log "github.com/sirupsen/logrus"
+	"io"
+	"path/filepath"
+	"runtime/debug"
+	"sync"
+	"time"
+)
+
+// NamedPipeCtl owns the named pipe connection and dispatches the fusing and
+// resume messages received on it.
+type NamedPipeCtl struct {
+	mtx       *sync.Mutex   // held while a connect attempt runs and while closing
+	stopChan  chan struct{} // optional stop signal; may be nil
+	isClosed  bool          // set by Close
+	namedPipe NamedPipe     // current pipe; nil while disconnected
+}
+
+// NewNamedPipeCtl creates a controller and makes a first attempt to connect
+// the named pipe, waiting at most enums.TimeoutNamedPipe3S seconds. stopChan
+// may be nil. A failed or timed-out attempt is returned as an error.
+func NewNamedPipeCtl(stopChan chan struct{}) (*NamedPipeCtl, error) {
+	ctl := new(NamedPipeCtl)
+	ctl.mtx = new(sync.Mutex)
+	ctl.stopChan = stopChan
+
+	if err := ctl.initNamedPipe(enums.TimeoutNamedPipe3S); err != nil {
+		return nil, err
+	}
+	return ctl, nil
+}
+
+// initNamedPipe connects the named pipe in a background goroutine and waits
+// for the outcome, a timeout of withTimeout seconds, or the stop signal,
+// whichever comes first. withTimeout <= 0 disables the timeout.
+//
+// It returns nil once ctl.namedPipe is set, and an error when the controller
+// is stopping or closed, another attempt is already running, the connection
+// fails, or the wait times out.
+func (ctl *NamedPipeCtl) initNamedPipe(withTimeout int) error {
+
+	if stop, reason := ctl.checkIfToStop(); stop {
+		log.Warnf("stop try to listen namedpipe. reason: %s", reason)
+		return fmt.Errorf("stop to listen namedpipe [%s]", reason)
+	}
+
+	// The lock is released by the goroutine below once the attempt is over.
+	if !ctl.mtx.TryLock() {
+		return fmt.Errorf("listen namedpipe is already in progress")
+	}
+
+	ctl.namedPipe = nil
+	// Buffered: the goroutine must be able to deliver its result and exit
+	// (releasing the mutex) even after this function has stopped waiting.
+	errChan := make(chan error, 1)
+	// Closed when the wait below times out; replaces a plain bool that was
+	// written and read from two goroutines without synchronization.
+	timedOut := make(chan struct{})
+
+	t := time.NewTimer(time.Duration(withTimeout) * time.Second)
+	if withTimeout <= 0 {
+		// A nil channel is never ready, so the select waits without timeout.
+		t.C = nil
+	}
+	defer utils.StopTimerWithDrainChan(t)
+
+	go func() {
+		reported := false
+		report := func(err error) {
+			reported = true
+			errChan <- err
+		}
+
+		// Deferred calls run in reverse order: unlock, fallback report, recover.
+		defer ctl.recoverAndLogError("ListenNpipe")
+		defer func() {
+			// No result was sent (a panic, or the attempt was abandoned after
+			// a timeout): make sure a waiter without timeout is not left blocked.
+			if !reported {
+				errChan <- fmt.Errorf("namedpipectl to listen namedpipe aborted")
+			}
+		}()
+		defer ctl.mtx.Unlock()
+
+		log.Infof("namedpipectl to listen namedpipe")
+		//RootPath => /opt/cloudwise/omniagent/agents/euspace/current/
+		//namedpipe : euspace2cw-oneagent & cw-oneagent2euspace => /opt/cloudwise/omniagent/runtime/
+		p, err := ListenNpipe(enums.AgentNameEuspace, enums.AgentNameOneAgent, false, false, filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(utils.GetRootPath()))), "runtime"))
+		if err != nil {
+			log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", err.Error())
+			report(fmt.Errorf("namedpipectl to listen namedpipe error: %v", err.Error()))
+			return
+		}
+		if p == nil {
+			// Never store a nil pointer in the interface field: it would
+			// compare as non-nil and panic on first use.
+			log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", "got nil namedpipe")
+			report(fmt.Errorf("namedpipectl to listen namedpipe error: %v", "got nil namedpipe"))
+			return
+		}
+
+		select {
+		case <-timedOut:
+			log.Infof("free up namedpipe. reason: namedpipectl do pipe connect timeouted")
+			p.FreeUp4EOF()
+			return
+		default:
+		}
+
+		if stop, reason := ctl.checkIfToStop(); stop {
+			log.Infof("to free up namedpipe. reason: %s", reason)
+			p.FreeUp4EOF()
+			report(fmt.Errorf("%s,freed namedpipe", reason))
+			return
+		}
+
+		log.Infof("namedpipectl listen namedpipe success")
+		ctl.namedPipe = p
+		report(nil)
+	}()
+
+	select {
+	case err := <-errChan:
+		return err
+	case <-t.C:
+		log.Errorf("namedpipectl to listen namedpipe timeout (%v s)", withTimeout)
+		//ctl.Close()
+		close(timedOut)
+		return fmt.Errorf("namedpipectl to listen namedpipe timeout")
+	case <-ctl.stopChan:
+		log.Infof("euspace stopping,close namedpipectl")
+		// NOTE(review): Close takes ctl.mtx, which the goroutine above holds
+		// until ListenNpipe returns, so this call can block for that long.
+		ctl.Close()
+		return fmt.Errorf("euspace was stoped")
+	}
+}
+
+// Close marks the controller as closed and frees the current named pipe, if
+// any. A second call only logs a warning.
+func (ctl *NamedPipeCtl) Close() {
+	ctl.mtx.Lock()
+	defer ctl.mtx.Unlock()
+
+	if ctl.isClosed {
+		log.Warn("namedPipeCtl Close, has been closed")
+		return
+	}
+	ctl.isClosed = true
+
+	pipe := ctl.namedPipe
+	if pipe == nil {
+		log.Warn("namedPipeCtl Close, namedPipe is nil, do nothing")
+		return
+	}
+	pipe.FreeUp4EOF()
+	ctl.namedPipe = nil
+	log.Info("namedPipeCtl Close, close namedPipe success")
+}
+
+// recoverAndLogError recovers a panic of the current goroutine and logs its
+// message together with the stack trace; target names the goroutine in the
+// log line. Because it calls recover itself, it only stops a panic when it
+// is the deferred function (defer ctl.recoverAndLogError(...)).
+func (ctl *NamedPipeCtl) recoverAndLogError(target string) {
+	cause := recover()
+	if cause == nil {
+		return
+	}
+
+	// Turn the panic value into a message.
+	var eMsg string
+	if s, isString := cause.(string); isString {
+		eMsg = s
+	} else if e, isError := cause.(error); isError {
+		eMsg = e.Error()
+	} else {
+		eMsg = fmt.Sprintf("%+v", cause)
+	}
+
+	log.Errorf("goroutine[ %s ] panic: %s ,stack: %s", target, eMsg, string(debug.Stack()))
+}
+
+// checkIfToStop reports whether work should stop, together with the reason:
+// "euspace stopping" when stopChan is readable (for example closed), or
+// "namedPipeCtl closed" after Close. It never blocks.
+func (ctl *NamedPipeCtl) checkIfToStop() (bool, string) {
+	if ch := ctl.stopChan; ch != nil {
+		select {
+		case <-ch:
+			return true, "euspace stopping"
+		default:
+		}
+	}
+	if !ctl.isClosed {
+		return false, ""
+	}
+	return true, "namedPipeCtl closed"
+}
+
+// AcceptAndDisposeMsg starts a goroutine that reads messages from the named
+// pipe and dispatches them: MsgTypeFusing calls reg.DoFusing and
+// MsgTypeResume calls reg.DoResume (a DoResume error is fatal). When the
+// pipe reports io.EOF it is freed and reconnected, retrying every second.
+// The goroutine ends when the controller is stopping or closed.
+func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
+	log.Info("AcceptAndDisposeMsg, start")
+	go func() {
+		// Deferred calls run in reverse order: recover first, then this log.
+		defer log.Info("AcceptAndDisposeMsg, stop")
+		// recover only takes effect when called directly by the deferred
+		// function, so the helper is deferred itself instead of being called
+		// from inside a deferred closure.
+		defer ctl.recoverAndLogError("AcceptAndDisposeMsg")
+
+		for {
+
+			if stop, reason := ctl.checkIfToStop(); stop {
+				log.Warnf("AcceptAndDisposeMsg,stop accept msg. reason: %s", reason)
+				return
+			}
+
+			// Work on a snapshot: Close may set the field to nil concurrently.
+			np := ctl.namedPipe
+			if np == nil {
+				if err := ctl.initNamedPipe(0); err != nil {
+					log.Errorf("AcceptAndDisposeMsg,namedPipe is nil re-initNamedPipe failed [retry after 1 S], error: %s", err.Error())
+					time.Sleep(1 * time.Second)
+				}
+				// Re-check the stop conditions and pick up the new pipe.
+				continue
+			}
+
+			msg, err := np.Accept()
+			if err != nil {
+				if err == io.EOF {
+					log.Warnf("AcceptAndDisposeMsg, get io.EOF to free up namedPipe")
+					ctl.mtx.Lock()
+					np.FreeUp4EOF()
+					ctl.namedPipe = nil
+					ctl.mtx.Unlock()
+				} else {
+					// NOTE(review): a persistent non-EOF error makes this loop
+					// spin without delay — consider a backoff.
+					log.Errorf("AcceptAndDisposeMsg occurs error: %s", err.Error())
+				}
+				continue
+			}
+
+			if stop, reason := ctl.checkIfToStop(); stop {
+				log.Warnf("AcceptAndDisposeMsg,stop dispose msg.reason: %s, msg:%#v,", reason, msg)
+				return
+			}
+
+			switch msg.Type {
+			case MsgTypeFusing:
+				reg.DoFusing()
+			case MsgTypeResume:
+				if err := reg.DoResume(); err != nil {
+					log.Fatalf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
+				}
+			}
+		}
+	}()
+}

+ 19 - 0
utils/util.go

@@ -945,3 +945,22 @@ func GetSoPath(pid uint32, soname string, rootfs string) (string, error) {
 	}
 	return "", fmt.Errorf("library %s not found in process.", soname)
 }
+
+// StopTimerWithDrainChan stops timer and, if it had already fired or been
+// stopped, removes a pending value from timer.C without blocking.
+func StopTimerWithDrainChan(timer *time.Timer) {
+	if timer.Stop() {
+		return
+	}
+	// Non-blocking drain: take the value if one is waiting.
+	select {
+	case <-timer.C:
+	default:
+	}
+}
+
+// ResetTimerWithDrainChan stops timer, removes a pending value from timer.C
+// without blocking if the timer had already fired or been stopped, and then
+// re-arms it to fire after d.
+func ResetTimerWithDrainChan(timer *time.Timer, d time.Duration) {
+	// Re-arm on every return path, after the stop/drain below.
+	defer timer.Reset(d)
+
+	if timer.Stop() {
+		return
+	}
+	select {
+	case <-timer.C:
+	default:
+	}
+}