| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- //go:build linux || darwin
- // +build linux darwin
- package namedpipe
- import (
- "bufio"
- "fmt"
- log "github.com/sirupsen/logrus"
- "os"
- "sync/atomic"
- "syscall"
- )
// Endpoint type names. NOTE(review): presumably these are the values passed
// as agent_type / to_type to ListenNpipe and therefore end up in the FIFO
// file names — confirm against callers.
const (
	ONEAGENT    = "cw-oneagent"
	NETFLOW     = "cw-netflow"
	SERVERAGENT = "cw-serveragent"
)

// Role flags. NOTE(review): presumably intended for the isMaster argument of
// ListenNpipe (MASTER = created by the daemon) — confirm against callers.
const (
	MASTER = true
	AGENT  = false
)
- type NamedPipe4U struct {
- *namedPipe
- sendPipeFileHandler *os.File
- recvPipeFileHandler *os.File
- sendPipePath string
- recvPipePath string
- }
- /*
- ListenNpipe
- @Description: 创建命名管道文件,返回命名管道对象
- @param agent_type: from
- @param to_type: to
- @param isMaster: 是否为daemon创建的
- @param single: 是否为单文件模式
- @param rootPath: 命名管道文件夹路径
- @return *NamedPipe4U: 命名管道对象
- @return error: 错误信息
- */
- func ListenNpipe(agent_type, to_type string, isMaster, single bool, rootPath string) (*NamedPipe4U, error) {
- np, err := newNamePipe(agent_type, to_type, isMaster, single, rootPath)
- if err != nil {
- return nil, err
- }
- np4u := &NamedPipe4U{namedPipe: np}
- if err := np4u.initPipe(); err != nil {
- return nil, err
- }
- return np4u, nil
- }
- func (np *NamedPipe4U) getPipePath() (daemon2subPipe, sub2DaemonPipe string, err error) {
- basePath := np.rootPath + "/"
- //确认目录存在
- if _, err := os.Stat(basePath); os.IsNotExist(err) {
- if err := os.MkdirAll(basePath, 0755); err != nil {
- panic(fmt.Sprintf("mkdir for named-pipe failed . path:%v ,error:%v", basePath, err.Error()))
- }
- }
- if np.single {
- singlePipe := basePath + np.fromType
- return singlePipe, singlePipe, nil
- }
- daemon2subPipe = basePath + fmt.Sprintf("%s2%s", np.fromType, np.toType)
- sub2DaemonPipe = basePath + fmt.Sprintf("%s2%s", np.toType, np.fromType)
- return daemon2subPipe, sub2DaemonPipe, err
- }
- func (np *NamedPipe4U) initPipe() error {
- var pipeFiles []string
- daemon2subPipe, sub2DaemonPipe, err := np.getPipePath()
- if err != nil {
- return err
- }
- if np.single {
- pipeFiles = []string{daemon2subPipe}
- } else {
- pipeFiles = []string{daemon2subPipe, sub2DaemonPipe}
- }
- for _, pf := range pipeFiles {
- if _, err := os.Lstat(pf); os.IsNotExist(err) == true {
- err = syscall.Mkfifo(pf, 0666)
- if err != nil {
- return err
- }
- } else if err != nil {
- return err
- }
- }
- //根据角色指定pipe的路径(角色不同路径不同)
- if np.isDaemon {
- //daemon进程Open顺序 和 net进程Open顺序 相反
- np.sendPipePath = daemon2subPipe
- np.recvPipePath = sub2DaemonPipe
- //OpenFile过程可能会阻塞
- if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
- np.sendPipeFileHandler = sf
- np.sendPipe = bufio.NewWriter(sf)
- } else {
- log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
- return err
- }
- if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
- np.recvPipeFileHandler = rf
- np.recvPipe = bufio.NewReader(rf)
- } else {
- log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
- sf := np.sendPipeFileHandler
- np.sendPipeFileHandler = nil
- np.sendPipe = nil
- if sf != nil {
- if errClose := sf.Close(); errClose != nil {
- log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) error:%v", np.sendPipePath, errClose)
- }
- }
- return err
- }
- } else {
- //net进程Open顺序和daemon进程Open顺序相反
- np.sendPipePath = daemon2subPipe
- np.recvPipePath = sub2DaemonPipe
- //OpenFile过程可能会阻塞
- if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
- np.recvPipeFileHandler = rf
- np.recvPipe = bufio.NewReader(rf)
- } else {
- log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
- return err
- }
- if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
- np.sendPipeFileHandler = sf
- np.sendPipe = bufio.NewWriter(sf)
- } else {
- log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
- rf := np.recvPipeFileHandler
- np.recvPipeFileHandler = nil
- np.recvPipe = nil
- if rf != nil {
- if errClose := rf.Close(); errClose != nil {
- log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) error:%v", np.recvPipePath, errClose)
- }
- }
- return err
- }
- }
- return nil
- }
- // FreeUp4EOF only called when io.EOF occurs
- func (np *NamedPipe4U) FreeUp4EOF() {
- if !atomic.CompareAndSwapUint32(&np.quitFlag, 0, 1) {
- log.WithField("module", "namedPipe_Unix").Warnf("named pipe(%s)(%s) already free", np.sendPipePath, np.recvPipePath)
- return
- }
- log.WithField("module", "namedPipe_Unix").Infof("named pipe free(%s)(%s) begin", np.sendPipePath, np.recvPipePath)
- sendPipeFileHandler := np.sendPipeFileHandler
- np.sendPipeFileHandler = nil
- if sendPipeFileHandler != nil {
- err := sendPipeFileHandler.Close()
- if err != nil {
- log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) failed. error:%v", np.sendPipePath, err)
- }
- }
- recvPipeFileHandler := np.recvPipeFileHandler
- np.recvPipeFileHandler = nil
- if recvPipeFileHandler != nil {
- err := recvPipeFileHandler.Close()
- if err != nil {
- log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) failed. error:%v", np.recvPipePath, err)
- }
- }
- np.sendPipe = nil
- np.recvPipe = nil
- log.WithField("module", "namedPipe_Unix").Infof("named pipe(%s)(%s) free end", np.sendPipePath, np.recvPipePath)
- }
|