// name_pipe.go (5.6 KB)

  1. package namedpipe
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. log "github.com/sirupsen/logrus"
  9. "io"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  15. type NamedPipe interface {
  16. AcceptRaw() ([]byte, error)
  17. Accept() (MsgInfo, error)
  18. WriteRaw(b []byte) (n int, err error)
  19. Write(msg string, msgType string) (int, error)
  20. FreeUp4EOF()
  21. }
  22. type namedPipe struct {
  23. // opMtx *sync.Mutex
  24. // quitC chan int
  25. quitFlag uint32
  26. wg *sync.WaitGroup
  27. fromType string
  28. toType string
  29. isDaemon bool
  30. single bool
  31. sendPipe *bufio.Writer
  32. recvPipe *bufio.Reader
  33. rootPath string
  34. }
  35. type MsgInfo struct {
  36. From string `json:"from"`
  37. Time int64 `json:"time"`
  38. Type string `json:"type"`
  39. Content string `json:"content"`
  40. }
  41. const (
  42. msgSeparator = '\n'
  43. MsgTypeFusing = "fusing"
  44. MsgTypeResume = "resume"
  45. )
  46. var (
  47. PipeCloseError = errors.New("RecvFromPipe,named-pipe was closed")
  48. JsonUnmarshalError = errors.New("json unmarshal error")
  49. )
  50. /*
  51. newNamePipe
  52. @Description: 返回命名管道对象
  53. @param agent_type: from
  54. @param to_type: to
  55. @param isDaemon: 是否为daemon创建的
  56. @param single: 是否为单文件模式
  57. @param rootPath: 命名管道文件夹路径
  58. @return *namedPipe: 命名管道对象
  59. @return error: 错误信息
  60. */
  61. func newNamePipe(agent_type, to_type string, isDaemon, single bool, rootPath string) (*namedPipe, error) {
  62. return &namedPipe{
  63. wg: new(sync.WaitGroup),
  64. fromType: agent_type,
  65. toType: to_type,
  66. isDaemon: isDaemon,
  67. single: single,
  68. rootPath: rootPath,
  69. }, nil
  70. }
  71. /*
  72. Accept
  73. @Description: 读取操作
  74. @return MsgInfo: 消息对象
  75. @return error: 错误信息
  76. */
  77. func (np *namedPipe) Accept() (MsgInfo, error) {
  78. mi := MsgInfo{}
  79. // 是否已经关闭pipe资源
  80. if atomic.LoadUint32(&np.quitFlag) == 0 {
  81. np.wg.Add(1)
  82. defer np.wg.Done()
  83. defer func() {
  84. if err := recover(); err != nil {
  85. log.WithField("module", "namedPipe").Errorf("holy name pipe error, %v", err)
  86. }
  87. }()
  88. // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。
  89. if atomic.LoadUint32(&np.quitFlag) == 0 {
  90. recvPipe := np.recvPipe
  91. if recvPipe == nil {
  92. return mi, PipeCloseError
  93. }
  94. if msg, err := recvPipe.ReadString(msgSeparator); err == nil {
  95. msg = strings.TrimRight(msg, string(msgSeparator))
  96. if err := json.Unmarshal([]byte(msg), &mi); err != nil {
  97. return mi, JsonUnmarshalError
  98. }
  99. return mi, nil
  100. } else {
  101. return mi, err
  102. }
  103. }
  104. }
  105. return mi, PipeCloseError
  106. }
  107. /*
  108. AcceptRaw
  109. @Description: 读取原始消息
  110. @return []byte: 原始消息
  111. @return error: 错误信息
  112. */
  113. func (np *namedPipe) AcceptRaw() ([]byte, error) {
  114. var raw []byte
  115. // 是否已经关闭pipe资源
  116. if atomic.LoadUint32(&np.quitFlag) == 0 {
  117. np.wg.Add(1)
  118. defer np.wg.Done()
  119. // 信号量计数后,再判断一次quitFlag。防止信号量未计数时,已经关闭了pipe资源。
  120. if atomic.LoadUint32(&np.quitFlag) == 0 {
  121. recvPipe := np.recvPipe
  122. if recvPipe == nil {
  123. return raw, errors.New("RecvFromPipe,named-pipe was closed")
  124. }
  125. if msg, err := recvPipe.ReadBytes(msgSeparator); err == nil {
  126. msg = bytes.TrimRight(msg, string(msgSeparator))
  127. return msg, nil
  128. } else {
  129. if err != io.EOF {
  130. err = errors.New(fmt.Sprintf("RecvFromPipe,recv msg occours error: %s", err.Error()))
  131. }
  132. return raw, err
  133. }
  134. }
  135. }
  136. return raw, errors.New("RecvFromPipe,named-pipe was closed")
  137. }
  138. /*
  139. Write
  140. @Description: 写入操作
  141. @param msg: 管道消息内容
  142. @param msgType: 管道消息类型
  143. @return int: 写入字节数
  144. @return error: 错误信息
  145. */
  146. func (np *namedPipe) Write(msg string, msgType string) (int, error) {
  147. // 是否已经关闭pipe资源
  148. if atomic.LoadUint32(&np.quitFlag) == 0 {
  149. np.wg.Add(1)
  150. defer np.wg.Done()
  151. if atomic.LoadUint32(&np.quitFlag) == 0 {
  152. if !(msgType == MsgTypeResume || msgType == MsgTypeFusing) {
  153. return 0, errors.New(fmt.Sprintf("WriteToPipe,invalid msg type: %s", msgType))
  154. }
  155. sendPipe := np.sendPipe
  156. if sendPipe == nil {
  157. return 0, errors.New("WriteToPipe,named-pipe was closed")
  158. }
  159. mi := MsgInfo{
  160. From: np.fromType,
  161. Time: time.Now().Unix(),
  162. Type: msgType,
  163. Content: strings.Trim(msg, string(msgSeparator)),
  164. }
  165. byteSli, _ := json.Marshal(mi)
  166. // msg = string(byteSli)+ string(msgSeparator)
  167. byteSli = append(byteSli, msgSeparator)
  168. if n, err := sendPipe.Write(byteSli); err == nil {
  169. if err := sendPipe.Flush(); err != nil {
  170. return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error()))
  171. }
  172. return n, nil
  173. } else {
  174. return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error()))
  175. }
  176. }
  177. }
  178. return 0, errors.New("WriteToPipe,named-pipe was closed")
  179. }
  180. /*
  181. WriteRaw
  182. @Description: 原始消息 写入管道
  183. @param msg: 管道消息内容
  184. @return int: 写入字节数
  185. @return error: 错误信息
  186. */
  187. func (np *namedPipe) WriteRaw(msg []byte) (int, error) {
  188. // 是否已经关闭pipe资源
  189. if atomic.LoadUint32(&np.quitFlag) == 0 {
  190. np.wg.Add(1)
  191. defer np.wg.Done()
  192. if atomic.LoadUint32(&np.quitFlag) == 0 {
  193. sendPipe := np.sendPipe
  194. if sendPipe == nil {
  195. return 0, errors.New("WriteToPipe,named-pipe was closed")
  196. }
  197. msg = append(msg, msgSeparator)
  198. if n, err := sendPipe.Write(msg); err == nil {
  199. if err := sendPipe.Flush(); err != nil {
  200. return n, errors.New(fmt.Sprintf("WriteToPipe,Flush buffer occours error: %s", err.Error()))
  201. }
  202. return n, nil
  203. } else {
  204. return 0, errors.New(fmt.Sprintf("WriteToPipe,Write msg occours error: %s", err.Error()))
  205. }
  206. }
  207. }
  208. return 0, errors.New("WriteToPipe,named-pipe was closed")
  209. }