package namedpipe import ( "bufio" "bytes" "encoding/json" "errors" "fmt" log "github.com/sirupsen/logrus" "io" "strings" "sync" "sync/atomic" "time" ) type NamedPipe interface { AcceptRaw() ([]byte, error) Accept() (MsgInfo, error) WriteRaw(b []byte) (n int, err error) Write(msg string, msgType string) (int, error) FreeUp4EOF() } type namedPipe struct { // opMtx *sync.Mutex // quitC chan int quitFlag uint32 wg *sync.WaitGroup fromType string toType string isDaemon bool single bool sendPipe *bufio.Writer recvPipe *bufio.Reader rootPath string } type MsgInfo struct { From string `json:"from"` Time int64 `json:"time"` Type string `json:"type"` Content string `json:"content"` } const ( msgSeparator = '\n' MsgTypeFusing = "fusing" MsgTypeResume = "resume" ) var ( PipeCloseError = errors.New("RecvFromPipe,named-pipe was closed") JsonUnmarshalError = errors.New("json unmarshal error") ) /* newNamePipe @Description: 返回命名管道对象 @param agent_type: from @param to_type: to @param isDaemon: 是否为daemon创建的 @param single: 是否为单文件模式 @param rootPath: 命名管道文件夹路径 @return *namedPipe: 命名管道对象 @return error: 错误信息 */ func newNamePipe(agent_type, to_type string, isDaemon, single bool, rootPath string) (*namedPipe, error) { return &namedPipe{ wg: new(sync.WaitGroup), fromType: agent_type, toType: to_type, isDaemon: isDaemon, single: single, rootPath: rootPath, }, nil } /* Accept @Description: 读取操作 @return MsgInfo: 消息对象 @return error: 错误信息 */ func (np *namedPipe) Accept() (MsgInfo, error) { mi := MsgInfo{} // 是否已经关闭pipe资源 if atomic.LoadUint32(&np.quitFlag) == 0 { np.wg.Add(1) defer np.wg.Done() defer func() { if err := recover(); err != nil { log.WithField("module", "namedPipe").Errorf("holy name pipe error, %v", err) } }() // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。 if atomic.LoadUint32(&np.quitFlag) == 0 { recvPipe := np.recvPipe if recvPipe == nil { return mi, PipeCloseError } if msg, err := recvPipe.ReadString(msgSeparator); err == nil { msg = strings.TrimRight(msg, string(msgSeparator)) if err := json.Unmarshal([]byte(msg), &mi); err != nil { return mi, JsonUnmarshalError } return mi, nil } else { return mi, err } } } return mi, PipeCloseError } /* AcceptRaw @Description: 读取原始消息 @return []byte: 原始消息 @return error: 错误信息 */ func (np *namedPipe) AcceptRaw() ([]byte, error) { var raw []byte // 是否已经关闭pipe资源 if atomic.LoadUint32(&np.quitFlag) == 0 { np.wg.Add(1) defer np.wg.Done() // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。 if atomic.LoadUint32(&np.quitFlag) == 0 { recvPipe := np.recvPipe if recvPipe == nil { return raw, errors.New("RecvFromPipe,named-pipe was closed") } if msg, err := recvPipe.ReadBytes(msgSeparator); err == nil { msg = bytes.TrimRight(msg, string(msgSeparator)) return msg, nil } else { if err != io.EOF { err = errors.New(fmt.Sprintf("RecvFromPipe,recv msg occours error: %s", err.Error())) } return raw, err } } } return raw, errors.New("RecvFromPipe,named-pipe was closed") } /* Write @Description: 写入操作 @param msg: 管道消息内容 @param msgType: 管道消息类型 @return int: 写入字节数 @return error: 错误信息 */ func (np *namedPipe) Write(msg string, msgType string) (int, error) { // 是否已经关闭pipe资源 if atomic.LoadUint32(&np.quitFlag) == 0 { np.wg.Add(1) defer np.wg.Done() if atomic.LoadUint32(&np.quitFlag) == 0 { if !(msgType == MsgTypeResume || msgType == MsgTypeFusing) { return 0, errors.New(fmt.Sprintf("WriteToPipe,invalid msg type: %s", msgType)) } sendPipe := np.sendPipe if sendPipe == nil { return 0, errors.New("WriteToPipe,named-pipe was closed") } mi := MsgInfo{ From: np.fromType, Time: time.Now().Unix(), Type: msgType, Content: strings.Trim(msg, string(msgSeparator)), } byteSli, _ := json.Marshal(mi) // msg = string(byteSli)+ string(msgSeparator) byteSli = append(byteSli, msgSeparator) if n, err := sendPipe.Write(byteSli); err == nil { if err := sendPipe.Flush(); err != nil { return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error())) } return n, nil } else { return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error())) } } } return 0, errors.New("WriteToPipe,named-pipe was closed") } /* WriteRaw @Description: 原始消息 写入管道 @param msg: 管道消息内容 @return int: 写入字节数 @return error: 错误信息 */ func (np *namedPipe) WriteRaw(msg []byte) (int, error) { // 是否已经关闭pipe资源 if atomic.LoadUint32(&np.quitFlag) == 0 { np.wg.Add(1) defer np.wg.Done() if atomic.LoadUint32(&np.quitFlag) == 0 { sendPipe := np.sendPipe if sendPipe == nil { return 0, errors.New("WriteToPipe,named-pipe was closed") } msg = append(msg, msgSeparator) if n, err := sendPipe.Write(msg); err == nil { if err := sendPipe.Flush(); err != nil { return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error())) } return n, nil } else { return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error())) } } } return 0, errors.New("WriteToPipe,named-pipe was closed") }