namedpipe_ctl.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package namedpipe
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/containers"
  5. "github.com/coroot/coroot-node-agent/utils"
  6. "github.com/coroot/coroot-node-agent/utils/enums"
  7. log "github.com/sirupsen/logrus"
  8. "io"
  9. "path/filepath"
  10. "runtime/debug"
  11. "sync"
  12. "time"
  13. )
// NamedPipeCtl owns a single NamedPipe and coordinates listening on it,
// accepting messages from it, and freeing it.
type NamedPipeCtl struct {
	// mtx is held for the duration of a listen attempt (see initNamedPipe)
	// and while the pipe is freed in Close or after an io.EOF.
	mtx *sync.Mutex
	// stopChan signals shutdown to this controller; a nil channel is
	// tolerated by checkIfToStop.
	stopChan chan struct{}
	// isClosed is set to true by Close and consulted by checkIfToStop.
	isClosed bool
	// namedPipe is the currently established pipe, or nil when no pipe is
	// available (not yet listened, freed after io.EOF, or closed).
	namedPipe NamedPipe
}
  20. func NewNamedPipeCtl(stopChan chan struct{}) (*NamedPipeCtl, error) {
  21. ctl := &NamedPipeCtl{
  22. mtx: new(sync.Mutex),
  23. stopChan: stopChan,
  24. }
  25. err := ctl.initNamedPipe(enums.TimeoutNamedPipe3S)
  26. if err != nil {
  27. return nil, err
  28. }
  29. return ctl, nil
  30. }
  31. func (ctl *NamedPipeCtl) initNamedPipe(withTimeout int) error {
  32. if stop, reason := ctl.checkIfToStop(); stop {
  33. log.Warnf("stop try to listen namedpipe. reason: %s", reason)
  34. return fmt.Errorf("stop to listen namedpipe [%s]", reason)
  35. }
  36. if !ctl.mtx.TryLock() {
  37. return fmt.Errorf("listen namedpipe is already in progress")
  38. }
  39. ctl.namedPipe = nil
  40. errChan := make(chan error)
  41. var pipeConnectTimeouted = false
  42. var t *time.Timer
  43. t = time.NewTimer(time.Duration(withTimeout) * time.Second)
  44. if withTimeout <= 0 {
  45. t.C = nil
  46. }
  47. defer utils.StopTimerWithDrainChan(t)
  48. go func() {
  49. defer ctl.recoverAndLogError("ListenNpipe")
  50. defer ctl.mtx.Unlock()
  51. log.Infof("namedpipectl to listen namedpipe")
  52. //RootPath => /opt/cloudwise/omniagent/agents/euspace/current/
  53. //namedpipe : euspace2cw-oneagent & cw-oneagent2euspace => /opt/cloudwise/omniagent/runtime/
  54. p, err := ListenNpipe(enums.AgentNameEuspace, enums.AgentNameOneAgent, false, false, filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(utils.GetRootPath()))), "runtime"))
  55. if err != nil || p == nil {
  56. log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", err.Error())
  57. errChan <- fmt.Errorf("namedpipectl to listen namedpipe error: %v", err.Error())
  58. } else {
  59. if pipeConnectTimeouted {
  60. log.Infof("free up namedpipe. reason: namedpipectl do pipe connect timeouted")
  61. if p != nil {
  62. p.FreeUp4EOF()
  63. }
  64. return
  65. }
  66. if stop, reason := ctl.checkIfToStop(); stop {
  67. log.Infof("to free up namedpipe. reason: %s", reason)
  68. if p != nil {
  69. p.FreeUp4EOF()
  70. }
  71. errChan <- fmt.Errorf("%s,freed namedpipe", reason)
  72. return
  73. }
  74. log.Infof("namedpipectl listen namedpipe success")
  75. ctl.namedPipe = p
  76. errChan <- nil
  77. }
  78. }()
  79. select {
  80. case err := <-errChan:
  81. return err
  82. case <-t.C:
  83. log.Errorf("namedpipectl to listen namedpipe timeout (%v s)", withTimeout)
  84. //ctl.Close()
  85. pipeConnectTimeouted = true
  86. return fmt.Errorf("namedpipectl to listen namedpipe timeout")
  87. case <-ctl.stopChan:
  88. log.Infof("euspace stopping,close namedpipectl")
  89. ctl.Close()
  90. return fmt.Errorf("euspace was stoped")
  91. }
  92. }
  93. func (ctl *NamedPipeCtl) Close() {
  94. ctl.mtx.Lock()
  95. defer ctl.mtx.Unlock()
  96. if ctl.isClosed {
  97. log.Warn("namedPipeCtl Close, has been closed")
  98. return
  99. }
  100. ctl.isClosed = true
  101. if ctl.namedPipe != nil {
  102. ctl.namedPipe.FreeUp4EOF()
  103. ctl.namedPipe = nil
  104. log.Info("namedPipeCtl Close, close namedPipe success")
  105. } else {
  106. log.Warn("namedPipeCtl Close, namedPipe is nil, do nothing")
  107. }
  108. }
  109. func (ctl *NamedPipeCtl) recoverAndLogError(target string) {
  110. if i := recover(); i != nil {
  111. //获取错误消息
  112. eMsg := ""
  113. switch v := i.(type) {
  114. case string:
  115. eMsg = v
  116. case error:
  117. eMsg = v.Error()
  118. default:
  119. eMsg = fmt.Sprintf("%+v", v)
  120. }
  121. sMsg := string(debug.Stack())
  122. log.Errorf("goroutine[ %s ] panic: %s ,stack: %s", target, eMsg, sMsg)
  123. }
  124. }
  125. func (ctl *NamedPipeCtl) checkIfToStop() (bool, string) {
  126. if ctl.stopChan != nil {
  127. select {
  128. case <-ctl.stopChan:
  129. return true, "euspace stopping"
  130. default:
  131. }
  132. }
  133. if ctl.isClosed {
  134. return true, "namedPipeCtl closed"
  135. }
  136. return false, ""
  137. }
  138. func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
  139. log.Info("AcceptAndDisposeMsg, start")
  140. go func() {
  141. defer func() {
  142. ctl.recoverAndLogError("AcceptAndDisposeMsg")
  143. log.Info("AcceptAndDisposeMsg, stop")
  144. }()
  145. for {
  146. if stop, reason := ctl.checkIfToStop(); stop {
  147. log.Warnf("AcceptAndDisposeMsg,stop accept msg. reason: %s", reason)
  148. return
  149. }
  150. if ctl.namedPipe == nil {
  151. if err := ctl.initNamedPipe(0); err != nil {
  152. log.Errorf("AcceptAndDisposeMsg,namedPipe is nil re-initNamedPipe failed [retry after 1 S], error: %s", err.Error())
  153. time.Sleep(1 * time.Second)
  154. continue
  155. }
  156. }
  157. if msg, err := ctl.namedPipe.Accept(); err != nil {
  158. if err == io.EOF {
  159. log.Warnf("AcceptAndDisposeMsg, get io.EOF to free up namedPipe")
  160. ctl.mtx.Lock()
  161. ctl.namedPipe.FreeUp4EOF()
  162. ctl.namedPipe = nil
  163. ctl.mtx.Unlock()
  164. } else {
  165. log.Errorf("AcceptAndDisposeMsg occurs error: %s", err.Error())
  166. }
  167. } else {
  168. if stop, reason := ctl.checkIfToStop(); stop {
  169. log.Warnf("AcceptAndDisposeMsg,stop dispose msg.reason: %s, msg:%#v,", reason, msg)
  170. return
  171. }
  172. switch msg.Type {
  173. case MsgTypeFusing:
  174. reg.DoFusing()
  175. case MsgTypeResume:
  176. if err := reg.DoResume(); err != nil {
  177. log.Fatalf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
  178. }
  179. }
  180. }
  181. }
  182. }()
  183. }