| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- package namedpipe
- import (
- "fmt"
- "github.com/coroot/coroot-node-agent/containers"
- "github.com/coroot/coroot-node-agent/utils"
- "github.com/coroot/coroot-node-agent/utils/enums"
- log "github.com/sirupsen/logrus"
- "io"
- "path/filepath"
- "runtime/debug"
- "sync"
- "time"
- )
- type NamedPipeCtl struct {
- mtx *sync.Mutex
- stopChan chan struct{}
- isClosed bool
- namedPipe NamedPipe
- }
- func NewNamedPipeCtl(stopChan chan struct{}) (*NamedPipeCtl, error) {
- ctl := &NamedPipeCtl{
- mtx: new(sync.Mutex),
- stopChan: stopChan,
- }
- err := ctl.initNamedPipe(enums.TimeoutNamedPipe3S)
- if err != nil {
- return nil, err
- }
- return ctl, nil
- }
- func (ctl *NamedPipeCtl) initNamedPipe(withTimeout int) error {
- if stop, reason := ctl.checkIfToStop(); stop {
- log.Warnf("stop try to listen namedpipe. reason: %s", reason)
- return fmt.Errorf("stop to listen namedpipe [%s]", reason)
- }
- if !ctl.mtx.TryLock() {
- return fmt.Errorf("listen namedpipe is already in progress")
- }
- ctl.namedPipe = nil
- errChan := make(chan error)
- var pipeConnectTimeouted = false
- var t *time.Timer
- t = time.NewTimer(time.Duration(withTimeout) * time.Second)
- if withTimeout <= 0 {
- t.C = nil
- }
- defer utils.StopTimerWithDrainChan(t)
- go func() {
- defer ctl.recoverAndLogError("ListenNpipe")
- defer ctl.mtx.Unlock()
- log.Infof("namedpipectl to listen namedpipe")
- //RootPath => /opt/cloudwise/omniagent/agents/euspace/current/
- //namedpipe : euspace2cw-oneagent & cw-oneagent2euspace => /opt/cloudwise/omniagent/runtime/
- p, err := ListenNpipe(enums.AgentNameEuspace, enums.AgentNameOneAgent, false, false, filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(utils.GetRootPath()))), "runtime"))
- if err != nil || p == nil {
- log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", err.Error())
- errChan <- fmt.Errorf("namedpipectl to listen namedpipe error: %v", err.Error())
- } else {
- if pipeConnectTimeouted {
- log.Infof("free up namedpipe. reason: namedpipectl do pipe connect timeouted")
- if p != nil {
- p.FreeUp4EOF()
- }
- return
- }
- if stop, reason := ctl.checkIfToStop(); stop {
- log.Infof("to free up namedpipe. reason: %s", reason)
- if p != nil {
- p.FreeUp4EOF()
- }
- errChan <- fmt.Errorf("%s,freed namedpipe", reason)
- return
- }
- log.Infof("namedpipectl listen namedpipe success")
- ctl.namedPipe = p
- errChan <- nil
- }
- }()
- select {
- case err := <-errChan:
- return err
- case <-t.C:
- log.Errorf("namedpipectl to listen namedpipe timeout (%v s)", withTimeout)
- //ctl.Close()
- pipeConnectTimeouted = true
- return fmt.Errorf("namedpipectl to listen namedpipe timeout")
- case <-ctl.stopChan:
- log.Infof("euspace stopping,close namedpipectl")
- ctl.Close()
- return fmt.Errorf("euspace was stoped")
- }
- }
- func (ctl *NamedPipeCtl) Close() {
- ctl.mtx.Lock()
- defer ctl.mtx.Unlock()
- if ctl.isClosed {
- log.Warn("namedPipeCtl Close, has been closed")
- return
- }
- ctl.isClosed = true
- if ctl.namedPipe != nil {
- ctl.namedPipe.FreeUp4EOF()
- ctl.namedPipe = nil
- log.Info("namedPipeCtl Close, close namedPipe success")
- } else {
- log.Warn("namedPipeCtl Close, namedPipe is nil, do nothing")
- }
- }
- func (ctl *NamedPipeCtl) recoverAndLogError(target string) {
- if i := recover(); i != nil {
- //获取错误消息
- eMsg := ""
- switch v := i.(type) {
- case string:
- eMsg = v
- case error:
- eMsg = v.Error()
- default:
- eMsg = fmt.Sprintf("%+v", v)
- }
- sMsg := string(debug.Stack())
- log.Errorf("goroutine[ %s ] panic: %s ,stack: %s", target, eMsg, sMsg)
- }
- }
- func (ctl *NamedPipeCtl) checkIfToStop() (bool, string) {
- if ctl.stopChan != nil {
- select {
- case <-ctl.stopChan:
- return true, "euspace stopping"
- default:
- }
- }
- if ctl.isClosed {
- return true, "namedPipeCtl closed"
- }
- return false, ""
- }
- func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
- log.Info("AcceptAndDisposeMsg, start")
- go func() {
- defer func() {
- ctl.recoverAndLogError("AcceptAndDisposeMsg")
- log.Info("AcceptAndDisposeMsg, stop")
- }()
- for {
- if stop, reason := ctl.checkIfToStop(); stop {
- log.Warnf("AcceptAndDisposeMsg,stop accept msg. reason: %s", reason)
- return
- }
- if ctl.namedPipe == nil {
- if err := ctl.initNamedPipe(0); err != nil {
- log.Errorf("AcceptAndDisposeMsg,namedPipe is nil re-initNamedPipe failed [retry after 1 S], error: %s", err.Error())
- time.Sleep(1 * time.Second)
- continue
- }
- }
- if msg, err := ctl.namedPipe.Accept(); err != nil {
- if err == io.EOF {
- log.Warnf("AcceptAndDisposeMsg, get io.EOF to free up namedPipe")
- ctl.mtx.Lock()
- ctl.namedPipe.FreeUp4EOF()
- ctl.namedPipe = nil
- ctl.mtx.Unlock()
- } else {
- log.Errorf("AcceptAndDisposeMsg occurs error: %s", err.Error())
- }
- } else {
- if stop, reason := ctl.checkIfToStop(); stop {
- log.Warnf("AcceptAndDisposeMsg,stop dispose msg.reason: %s, msg:%#v,", reason, msg)
- return
- }
- switch msg.Type {
- case MsgTypeFusing:
- reg.DoFusing()
- case MsgTypeResume:
- if err := reg.DoResume(); err != nil {
- log.Fatalf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
- }
- }
- }
- }
- }()
- }
|