| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package namedpipe
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- log "github.com/sirupsen/logrus"
- "io"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- type NamedPipe interface {
- AcceptRaw() ([]byte, error)
- Accept() (MsgInfo, error)
- WriteRaw(b []byte) (n int, err error)
- Write(msg string, msgType string) (int, error)
- FreeUp4EOF()
- }
- type namedPipe struct {
- // opMtx *sync.Mutex
- // quitC chan int
- quitFlag uint32
- wg *sync.WaitGroup
- fromType string
- toType string
- isDaemon bool
- single bool
- sendPipe *bufio.Writer
- recvPipe *bufio.Reader
- rootPath string
- }
- type MsgInfo struct {
- From string `json:"from"`
- Time int64 `json:"time"`
- Type string `json:"type"`
- Content string `json:"content"`
- }
- const (
- msgSeparator = '\n'
- MsgTypeFusing = "fusing"
- MsgTypeResume = "resume"
- )
- var (
- PipeCloseError = errors.New("RecvFromPipe,named-pipe was closed")
- JsonUnmarshalError = errors.New("json unmarshal error")
- )
- /*
- newNamePipe
- @Description: 返回命名管道对象
- @param agent_type: from
- @param to_type: to
- @param isDaemon: 是否为daemon创建的
- @param single: 是否为单文件模式
- @param rootPath: 命名管道文件夹路径
- @return *namedPipe: 命名管道对象
- @return error: 错误信息
- */
- func newNamePipe(agent_type, to_type string, isDaemon, single bool, rootPath string) (*namedPipe, error) {
- return &namedPipe{
- wg: new(sync.WaitGroup),
- fromType: agent_type,
- toType: to_type,
- isDaemon: isDaemon,
- single: single,
- rootPath: rootPath,
- }, nil
- }
- /*
- Accept
- @Description: 读取操作
- @return MsgInfo: 消息对象
- @return error: 错误信息
- */
- func (np *namedPipe) Accept() (MsgInfo, error) {
- mi := MsgInfo{}
- // 是否已经关闭pipe资源
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- np.wg.Add(1)
- defer np.wg.Done()
- defer func() {
- if err := recover(); err != nil {
- log.WithField("module", "namedPipe").Errorf("holy name pipe error, %v", err)
- }
- }()
- // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- recvPipe := np.recvPipe
- if recvPipe == nil {
- return mi, PipeCloseError
- }
- if msg, err := recvPipe.ReadString(msgSeparator); err == nil {
- msg = strings.TrimRight(msg, string(msgSeparator))
- if err := json.Unmarshal([]byte(msg), &mi); err != nil {
- return mi, JsonUnmarshalError
- }
- return mi, nil
- } else {
- return mi, err
- }
- }
- }
- return mi, PipeCloseError
- }
- /*
- AcceptRaw
- @Description: 读取原始消息
- @return []byte: 原始消息
- @return error: 错误信息
- */
- func (np *namedPipe) AcceptRaw() ([]byte, error) {
- var raw []byte
- // 是否已经关闭pipe资源
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- np.wg.Add(1)
- defer np.wg.Done()
- // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- recvPipe := np.recvPipe
- if recvPipe == nil {
- return raw, errors.New("RecvFromPipe,named-pipe was closed")
- }
- if msg, err := recvPipe.ReadBytes(msgSeparator); err == nil {
- msg = bytes.TrimRight(msg, string(msgSeparator))
- return msg, nil
- } else {
- if err != io.EOF {
- err = errors.New(fmt.Sprintf("RecvFromPipe,recv msg occours error: %s", err.Error()))
- }
- return raw, err
- }
- }
- }
- return raw, errors.New("RecvFromPipe,named-pipe was closed")
- }
- /*
- Write
- @Description: 写入操作
- @param msg: 管道消息内容
- @param msgType: 管道消息类型
- @return int: 写入字节数
- @return error: 错误信息
- */
- func (np *namedPipe) Write(msg string, msgType string) (int, error) {
- // 是否已经关闭pipe资源
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- np.wg.Add(1)
- defer np.wg.Done()
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- if !(msgType == MsgTypeResume || msgType == MsgTypeFusing) {
- return 0, errors.New(fmt.Sprintf("WriteToPipe,invalid msg type: %s", msgType))
- }
- sendPipe := np.sendPipe
- if sendPipe == nil {
- return 0, errors.New("WriteToPipe,named-pipe was closed")
- }
- mi := MsgInfo{
- From: np.fromType,
- Time: time.Now().Unix(),
- Type: msgType,
- Content: strings.Trim(msg, string(msgSeparator)),
- }
- byteSli, _ := json.Marshal(mi)
- // msg = string(byteSli)+ string(msgSeparator)
- byteSli = append(byteSli, msgSeparator)
- if n, err := sendPipe.Write(byteSli); err == nil {
- if err := sendPipe.Flush(); err != nil {
- return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error()))
- }
- return n, nil
- } else {
- return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error()))
- }
- }
- }
- return 0, errors.New("WriteToPipe,named-pipe was closed")
- }
- /*
- WriteRaw
- @Description: 原始消息 写入管道
- @param msg: 管道消息内容
- @return int: 写入字节数
- @return error: 错误信息
- */
- func (np *namedPipe) WriteRaw(msg []byte) (int, error) {
- // 是否已经关闭pipe资源
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- np.wg.Add(1)
- defer np.wg.Done()
- if atomic.LoadUint32(&np.quitFlag) == 0 {
- sendPipe := np.sendPipe
- if sendPipe == nil {
- return 0, errors.New("WriteToPipe,named-pipe was closed")
- }
- msg = append(msg, msgSeparator)
- if n, err := sendPipe.Write(msg); err == nil {
- if err := sendPipe.Flush(); err != nil {
- return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error()))
- }
- return n, nil
- } else {
- return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error()))
- }
- }
- }
- return 0, errors.New("WriteToPipe,named-pipe was closed")
- }
|