//go:build linux || darwin // +build linux darwin package namedpipe import ( "bufio" "fmt" log "github.com/sirupsen/logrus" "os" "sync/atomic" "syscall" ) const ONEAGENT = "cw-oneagent" const NETFLOW = "cw-netflow" const SERVERAGENT = "cw-serveragent" const MASTER = true const AGENT = false type NamedPipe4U struct { *namedPipe sendPipeFileHandler *os.File recvPipeFileHandler *os.File sendPipePath string recvPipePath string } /* ListenNpipe @Description: 创建命名管道文件,返回命名管道对象 @param agent_type: from @param to_type: to @param isMaster: 是否为daemon创建的 @param single: 是否为单文件模式 @param rootPath: 命名管道文件夹路径 @return *NamedPipe4U: 命名管道对象 @return error: 错误信息 */ func ListenNpipe(agent_type, to_type string, isMaster, single bool, rootPath string) (*NamedPipe4U, error) { np, err := newNamePipe(agent_type, to_type, isMaster, single, rootPath) if err != nil { return nil, err } np4u := &NamedPipe4U{namedPipe: np} if err := np4u.initPipe(); err != nil { return nil, err } return np4u, nil } func (np *NamedPipe4U) getPipePath() (daemon2subPipe, sub2DaemonPipe string, err error) { basePath := np.rootPath + "/" //确认目录存在 if _, err := os.Stat(basePath); os.IsNotExist(err) { if err := os.MkdirAll(basePath, 0755); err != nil { panic(fmt.Sprintf("mkdir for named-pipe failed . path:%v ,error:%v", basePath, err.Error())) } } if np.single { singlePipe := basePath + np.fromType return singlePipe, singlePipe, nil } daemon2subPipe = basePath + fmt.Sprintf("%s2%s", np.fromType, np.toType) sub2DaemonPipe = basePath + fmt.Sprintf("%s2%s", np.toType, np.fromType) return daemon2subPipe, sub2DaemonPipe, err } func (np *NamedPipe4U) initPipe() error { var pipeFiles []string daemon2subPipe, sub2DaemonPipe, err := np.getPipePath() if err != nil { return err } if np.single { pipeFiles = []string{daemon2subPipe} } else { pipeFiles = []string{daemon2subPipe, sub2DaemonPipe} } for _, pf := range pipeFiles { if _, err := os.Lstat(pf); os.IsNotExist(err) == true { err = syscall.Mkfifo(pf, 0666) if err != nil { return err } } else if err != nil { return err } } //根据角色指定pipe的路径(角色不同路径不同) if np.isDaemon { //daemon进程Open顺序 和 net进程Open顺序 相反 np.sendPipePath = daemon2subPipe np.recvPipePath = sub2DaemonPipe //OpenFile过程可能会阻塞 if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil { np.sendPipeFileHandler = sf np.sendPipe = bufio.NewWriter(sf) } else { log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err) return err } if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil { np.recvPipeFileHandler = rf np.recvPipe = bufio.NewReader(rf) } else { log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err) sf := np.sendPipeFileHandler np.sendPipeFileHandler = nil np.sendPipe = nil if sf != nil { if errClose := sf.Close(); errClose != nil { log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) error:%v", np.sendPipePath, errClose) } } return err } } else { //net进程Open顺序和daemon进程Open顺序相反 np.sendPipePath = daemon2subPipe np.recvPipePath = sub2DaemonPipe //OpenFile过程可能会阻塞 if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil { np.recvPipeFileHandler = rf np.recvPipe = bufio.NewReader(rf) } else { log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err) return err } if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil { np.sendPipeFileHandler = sf np.sendPipe = bufio.NewWriter(sf) } else { log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err) rf := np.recvPipeFileHandler np.recvPipeFileHandler = nil np.recvPipe = nil if rf != nil { if errClose := rf.Close(); errClose != nil { log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) error:%v", np.recvPipePath, errClose) } } return err } } return nil } // FreeUp4EOF only called when io.EOF occurs func (np *NamedPipe4U) FreeUp4EOF() { if !atomic.CompareAndSwapUint32(&np.quitFlag, 0, 1) { log.WithField("module", "namedPipe_Unix").Warnf("named pipe(%s)(%s) already free", np.sendPipePath, np.recvPipePath) return } log.WithField("module", "namedPipe_Unix").Infof("named pipe free(%s)(%s) begin", np.sendPipePath, np.recvPipePath) sendPipeFileHandler := np.sendPipeFileHandler np.sendPipeFileHandler = nil if sendPipeFileHandler != nil { err := sendPipeFileHandler.Close() if err != nil { log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) failed. error:%v", np.sendPipePath, err) } } recvPipeFileHandler := np.recvPipeFileHandler np.recvPipeFileHandler = nil if recvPipeFileHandler != nil { err := recvPipeFileHandler.Close() if err != nil { log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) failed. error:%v", np.recvPipePath, err) } } np.sendPipe = nil np.recvPipe = nil log.WithField("module", "namedPipe_Unix").Infof("named pipe(%s)(%s) free end", np.sendPipePath, np.recvPipePath) }