name_pipe_unix.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. //go:build linux || darwin
  2. // +build linux darwin
  3. package namedpipe
  4. import (
  5. "bufio"
  6. "fmt"
  7. log "github.com/sirupsen/logrus"
  8. "os"
  9. "sync/atomic"
  10. "syscall"
  11. )
  12. const ONEAGENT = "cw-oneagent"
  13. const NETFLOW = "cw-netflow"
  14. const SERVERAGENT = "cw-serveragent"
  15. const MASTER = true
  16. const AGENT = false
  17. type NamedPipe4U struct {
  18. *namedPipe
  19. sendPipeFileHandler *os.File
  20. recvPipeFileHandler *os.File
  21. sendPipePath string
  22. recvPipePath string
  23. }
  24. /*
  25. ListenNpipe
  26. @Description: 创建命名管道文件,返回命名管道对象
  27. @param agent_type: from
  28. @param to_type: to
  29. @param isMaster: 是否为daemon创建的
  30. @param single: 是否为单文件模式
  31. @param rootPath: 命名管道文件夹路径
  32. @return *NamedPipe4U: 命名管道对象
  33. @return error: 错误信息
  34. */
  35. func ListenNpipe(agent_type, to_type string, isMaster, single bool, rootPath string) (*NamedPipe4U, error) {
  36. np, err := newNamePipe(agent_type, to_type, isMaster, single, rootPath)
  37. if err != nil {
  38. return nil, err
  39. }
  40. np4u := &NamedPipe4U{namedPipe: np}
  41. if err := np4u.initPipe(); err != nil {
  42. return nil, err
  43. }
  44. return np4u, nil
  45. }
  46. func (np *NamedPipe4U) getPipePath() (daemon2subPipe, sub2DaemonPipe string, err error) {
  47. basePath := np.rootPath + "/"
  48. //确认目录存在
  49. if _, err := os.Stat(basePath); os.IsNotExist(err) {
  50. if err := os.MkdirAll(basePath, 0755); err != nil {
  51. panic(fmt.Sprintf("mkdir for named-pipe failed . path:%v ,error:%v", basePath, err.Error()))
  52. }
  53. }
  54. if np.single {
  55. singlePipe := basePath + np.fromType
  56. return singlePipe, singlePipe, nil
  57. }
  58. daemon2subPipe = basePath + fmt.Sprintf("%s2%s", np.fromType, np.toType)
  59. sub2DaemonPipe = basePath + fmt.Sprintf("%s2%s", np.toType, np.fromType)
  60. return daemon2subPipe, sub2DaemonPipe, err
  61. }
  62. func (np *NamedPipe4U) initPipe() error {
  63. var pipeFiles []string
  64. daemon2subPipe, sub2DaemonPipe, err := np.getPipePath()
  65. if err != nil {
  66. return err
  67. }
  68. if np.single {
  69. pipeFiles = []string{daemon2subPipe}
  70. } else {
  71. pipeFiles = []string{daemon2subPipe, sub2DaemonPipe}
  72. }
  73. for _, pf := range pipeFiles {
  74. if _, err := os.Lstat(pf); os.IsNotExist(err) == true {
  75. err = syscall.Mkfifo(pf, 0666)
  76. if err != nil {
  77. return err
  78. }
  79. } else if err != nil {
  80. return err
  81. }
  82. }
  83. //根据角色指定pipe的路径(角色不同路径不同)
  84. if np.isDaemon {
  85. //daemon进程Open顺序 和 net进程Open顺序 相反
  86. np.sendPipePath = daemon2subPipe
  87. np.recvPipePath = sub2DaemonPipe
  88. //OpenFile过程可能会阻塞
  89. if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
  90. np.sendPipeFileHandler = sf
  91. np.sendPipe = bufio.NewWriter(sf)
  92. } else {
  93. log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
  94. return err
  95. }
  96. if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
  97. np.recvPipeFileHandler = rf
  98. np.recvPipe = bufio.NewReader(rf)
  99. } else {
  100. log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
  101. sf := np.sendPipeFileHandler
  102. np.sendPipeFileHandler = nil
  103. np.sendPipe = nil
  104. if sf != nil {
  105. if errClose := sf.Close(); errClose != nil {
  106. log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) error:%v", np.sendPipePath, errClose)
  107. }
  108. }
  109. return err
  110. }
  111. } else {
  112. //net进程Open顺序和daemon进程Open顺序相反
  113. np.sendPipePath = daemon2subPipe
  114. np.recvPipePath = sub2DaemonPipe
  115. //OpenFile过程可能会阻塞
  116. if rf, err := os.OpenFile(np.recvPipePath, os.O_RDONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
  117. np.recvPipeFileHandler = rf
  118. np.recvPipe = bufio.NewReader(rf)
  119. } else {
  120. log.WithField("module", "namedPipe_Unix").Errorf("open recv pipe(%s) error: %v", np.recvPipePath, err)
  121. return err
  122. }
  123. if sf, err := os.OpenFile(np.sendPipePath, os.O_WRONLY|os.O_CREATE, os.ModeNamedPipe|0666); err == nil {
  124. np.sendPipeFileHandler = sf
  125. np.sendPipe = bufio.NewWriter(sf)
  126. } else {
  127. log.WithField("module", "namedPipe_Unix").Errorf("open send pipe(%s) error: %v", np.sendPipePath, err)
  128. rf := np.recvPipeFileHandler
  129. np.recvPipeFileHandler = nil
  130. np.recvPipe = nil
  131. if rf != nil {
  132. if errClose := rf.Close(); errClose != nil {
  133. log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) error:%v", np.recvPipePath, errClose)
  134. }
  135. }
  136. return err
  137. }
  138. }
  139. return nil
  140. }
  141. // FreeUp4EOF only called when io.EOF occurs
  142. func (np *NamedPipe4U) FreeUp4EOF() {
  143. if !atomic.CompareAndSwapUint32(&np.quitFlag, 0, 1) {
  144. log.WithField("module", "namedPipe_Unix").Warnf("named pipe(%s)(%s) already free", np.sendPipePath, np.recvPipePath)
  145. return
  146. }
  147. log.WithField("module", "namedPipe_Unix").Infof("named pipe free(%s)(%s) begin", np.sendPipePath, np.recvPipePath)
  148. sendPipeFileHandler := np.sendPipeFileHandler
  149. np.sendPipeFileHandler = nil
  150. if sendPipeFileHandler != nil {
  151. err := sendPipeFileHandler.Close()
  152. if err != nil {
  153. log.WithField("module", "namedPipe_Unix").Errorf("close send pipe(%s) failed. error:%v", np.sendPipePath, err)
  154. }
  155. }
  156. recvPipeFileHandler := np.recvPipeFileHandler
  157. np.recvPipeFileHandler = nil
  158. if recvPipeFileHandler != nil {
  159. err := recvPipeFileHandler.Close()
  160. if err != nil {
  161. log.WithField("module", "namedPipe_Unix").Errorf("close recv pipe(%s) failed. error:%v", np.recvPipePath, err)
  162. }
  163. }
  164. np.sendPipe = nil
  165. np.recvPipe = nil
  166. log.WithField("module", "namedPipe_Unix").Infof("named pipe(%s)(%s) free end", np.sendPipePath, np.recvPipePath)
  167. }