package namedpipe

import (
	"fmt"
	"io"
	"path/filepath"
	"runtime/debug"
	"sync"
	"time"

	"github.com/coroot/coroot-node-agent/containers"
	"github.com/coroot/coroot-node-agent/utils"
	"github.com/coroot/coroot-node-agent/utils/enums"
	log "github.com/sirupsen/logrus"
)

// NamedPipeCtl owns a single NamedPipe: it listens for it, re-creates it after
// EOF, dispatches the messages received on it and frees it on Close.
type NamedPipeCtl struct {
	// mtx guards namedPipe. initNamedPipe acquires it with TryLock and the
	// listener goroutine it starts releases it, so the lock is held for the
	// whole duration of a listen attempt.
	mtx *sync.Mutex
	// stopChan is checked (non-blocking) by checkIfToStop and selected on by
	// initNamedPipe; a nil channel disables both.
	stopChan chan struct{}
	// isClosed is set by Close.
	isClosed bool
	// namedPipe is the currently established pipe, or nil when none is.
	namedPipe NamedPipe
}

// NewNamedPipeCtl creates a controller and performs the first listen attempt
// with a timeout of enums.TimeoutNamedPipe3S seconds. It returns an error when
// that attempt fails, times out, or stopChan fires first.
func NewNamedPipeCtl(stopChan chan struct{}) (*NamedPipeCtl, error) {
	ctl := &NamedPipeCtl{
		mtx:      new(sync.Mutex),
		stopChan: stopChan,
	}
	if err := ctl.initNamedPipe(enums.TimeoutNamedPipe3S); err != nil {
		return nil, err
	}
	return ctl, nil
}

// initNamedPipe runs ListenNpipe in a goroutine and waits for its outcome.
//
// withTimeout is the maximum wait in seconds; a value <= 0 disables the
// timeout. The call returns nil once ctl.namedPipe has been set, and an error
// when the controller is stopping/closed, another listen attempt is already
// in progress, ListenNpipe fails, the timeout elapses, or stopChan fires.
//
// If the caller stops waiting (timeout or stop), the listener goroutine is
// told so through the abandoned channel; it then frees any pipe it obtains
// instead of blocking on an undeliverable result while holding ctl.mtx.
func (ctl *NamedPipeCtl) initNamedPipe(withTimeout int) error {
	if stop, reason := ctl.checkIfToStop(); stop {
		log.Warnf("stop try to listen namedpipe. reason: %s", reason)
		return fmt.Errorf("stop to listen namedpipe [%s]", reason)
	}
	if !ctl.mtx.TryLock() {
		return fmt.Errorf("listen namedpipe is already in progress")
	}
	ctl.namedPipe = nil

	errChan := make(chan error)
	// abandoned is closed when this function stops waiting for the listener.
	// abandonReason is written before the close and only read after a receive
	// on abandoned, so the channel close orders the two accesses.
	abandoned := make(chan struct{})
	var abandonReason string
	abandon := func(reason string) {
		abandonReason = reason
		close(abandoned)
	}
	// deliver hands the result to the waiting caller; it reports false when
	// the caller has already given up.
	deliver := func(err error) bool {
		select {
		case errChan <- err:
			return true
		case <-abandoned:
			return false
		}
	}

	// A nil channel never becomes ready, which disables the timeout case.
	var timeoutC <-chan time.Time
	if withTimeout > 0 {
		t := time.NewTimer(time.Duration(withTimeout) * time.Second)
		defer utils.StopTimerWithDrainChan(t)
		timeoutC = t.C
	}

	go func() {
		defer ctl.recoverAndLogError("ListenNpipe")
		defer ctl.mtx.Unlock()

		log.Infof("namedpipectl to listen namedpipe")
		// RootPath => /opt/cloudwise/omniagent/agents/euspace/current/
		// namedpipe : euspace2cw-oneagent & cw-oneagent2euspace => /opt/cloudwise/omniagent/runtime/
		runtimeDir := filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(utils.GetRootPath()))), "runtime")
		p, err := ListenNpipe(enums.AgentNameEuspace, enums.AgentNameOneAgent, false, false, runtimeDir)
		if err != nil || p == nil {
			// ListenNpipe may hand back a nil pipe without an error; make sure
			// there is always an error value to report.
			if err == nil {
				err = fmt.Errorf("listener returned a nil namedpipe")
			}
			log.Errorf("namedpipectl to listen namedpipe,occurs error:%s ", err.Error())
			deliver(fmt.Errorf("namedpipectl to listen namedpipe error: %v", err.Error()))
			return
		}

		select {
		case <-abandoned:
			log.Infof("free up namedpipe. reason: %s", abandonReason)
			p.FreeUp4EOF()
			return
		default:
		}

		if stop, reason := ctl.checkIfToStop(); stop {
			log.Infof("to free up namedpipe. reason: %s", reason)
			p.FreeUp4EOF()
			deliver(fmt.Errorf("%s,freed namedpipe", reason))
			return
		}

		log.Infof("namedpipectl listen namedpipe success")
		ctl.namedPipe = p
		if !deliver(nil) {
			// The caller gave up between the check above and the hand-over:
			// undo the assignment so the pipe is not kept behind a failed init.
			ctl.namedPipe = nil
			log.Infof("free up namedpipe. reason: %s", abandonReason)
			p.FreeUp4EOF()
		}
	}()

	select {
	case err := <-errChan:
		return err
	case <-timeoutC:
		log.Errorf("namedpipectl to listen namedpipe timeout (%v s)", withTimeout)
		abandon("namedpipectl do pipe connect timeouted")
		return fmt.Errorf("namedpipectl to listen namedpipe timeout")
	case <-ctl.stopChan:
		log.Infof("euspace stopping,close namedpipectl")
		// Release the listener first: Close needs ctl.mtx, which the listener
		// goroutine holds until it returns.
		abandon("euspace stopping")
		ctl.Close()
		return fmt.Errorf("euspace was stoped")
	}
}

// Close marks the controller as closed and frees the current pipe, if any.
// Calling it again only logs a warning. It blocks while a listen attempt
// holds ctl.mtx.
func (ctl *NamedPipeCtl) Close() {
	ctl.mtx.Lock()
	defer ctl.mtx.Unlock()

	if ctl.isClosed {
		log.Warn("namedPipeCtl Close, has been closed")
		return
	}
	ctl.isClosed = true

	if ctl.namedPipe == nil {
		log.Warn("namedPipeCtl Close, namedPipe is nil, do nothing")
		return
	}
	ctl.namedPipe.FreeUp4EOF()
	ctl.namedPipe = nil
	log.Info("namedPipeCtl Close, close namedPipe success")
}

// recoverAndLogError recovers a panic in the calling goroutine and logs its
// message together with the stack. It must be invoked directly by defer.
// target names the goroutine in the log line.
func (ctl *NamedPipeCtl) recoverAndLogError(target string) {
	i := recover()
	if i == nil {
		return
	}
	// Extract the panic message.
	var eMsg string
	switch v := i.(type) {
	case string:
		eMsg = v
	case error:
		eMsg = v.Error()
	default:
		eMsg = fmt.Sprintf("%+v", v)
	}
	sMsg := string(debug.Stack())
	log.Errorf("goroutine[ %s ] panic: %s ,stack: %s", target, eMsg, sMsg)
}

// checkIfToStop reports whether work should stop and why: either stopChan is
// ready for receive or the controller has been closed.
//
// NOTE(review): isClosed is read here without ctl.mtx. The mutex cannot be
// taken in this method because it is called while initNamedPipe holds it.
func (ctl *NamedPipeCtl) checkIfToStop() (bool, string) {
	if ctl.stopChan != nil {
		select {
		case <-ctl.stopChan:
			return true, "euspace stopping"
		default:
		}
	}
	if ctl.isClosed {
		return true, "namedPipeCtl closed"
	}
	return false, ""
}

// currentPipe returns the pipe currently held by the controller (nil when
// there is none), read under ctl.mtx.
func (ctl *NamedPipeCtl) currentPipe() NamedPipe {
	ctl.mtx.Lock()
	defer ctl.mtx.Unlock()
	return ctl.namedPipe
}

// AcceptAndDisposeMsg starts a goroutine that accepts messages from the named
// pipe and dispatches them to reg: MsgTypeFusing calls reg.DoFusing and
// MsgTypeResume calls reg.DoResume, both skipped while reg.IsFuseException is
// set. When no pipe is established it re-initialises one without a timeout,
// retrying after one second on failure. On io.EOF the pipe is freed so the
// next iteration re-creates it. The goroutine ends once checkIfToStop reports
// a stop.
func (ctl *NamedPipeCtl) AcceptAndDisposeMsg(reg *containers.Registry) {
	log.Info("AcceptAndDisposeMsg, start")
	go func() {
		defer func() {
			ctl.recoverAndLogError("AcceptAndDisposeMsg")
			log.Info("AcceptAndDisposeMsg, stop")
		}()
		for {
			if stop, reason := ctl.checkIfToStop(); stop {
				log.Warnf("AcceptAndDisposeMsg,stop accept msg. reason: %s", reason)
				return
			}

			// Work on a snapshot so a concurrent Close cannot turn the pipe
			// into nil between the check and the call.
			pipe := ctl.currentPipe()
			if pipe == nil {
				if err := ctl.initNamedPipe(0); err != nil {
					log.Errorf("AcceptAndDisposeMsg,namedPipe is nil re-initNamedPipe failed [retry after 1 S], error: %s", err.Error())
					time.Sleep(1 * time.Second)
					continue
				}
				if pipe = ctl.currentPipe(); pipe == nil {
					// Freed again right after init; the stop check at the top
					// of the loop decides what happens next.
					continue
				}
			}

			msg, err := pipe.Accept()
			if err != nil {
				if err == io.EOF {
					log.Warnf("AcceptAndDisposeMsg, get io.EOF to free up namedPipe")
					ctl.mtx.Lock()
					// Close may already have freed and cleared the pipe.
					if ctl.namedPipe != nil {
						ctl.namedPipe.FreeUp4EOF()
						ctl.namedPipe = nil
					}
					ctl.mtx.Unlock()
				} else {
					log.Errorf("AcceptAndDisposeMsg occurs error: %s", err.Error())
				}
				continue
			}

			if stop, reason := ctl.checkIfToStop(); stop {
				log.Warnf("AcceptAndDisposeMsg,stop dispose msg.reason: %s, msg:%#v,", reason, msg)
				return
			}

			switch msg.Type {
			case MsgTypeFusing:
				if reg.IsFuseException {
					log.Errorf("AcceptAndDisposeMsg fuse exception, will not to call DoFusing")
				} else if err := reg.DoFusing(); err != nil {
					// TODO: save the err to the status-page instruct
					log.Errorf("AcceptAndDisposeMsg DoFusing occurs error: %s", err.Error())
				} else {
					log.Infof("AcceptAndDisposeMsg DoFusing finish")
				}
			case MsgTypeResume:
				if reg.IsFuseException {
					log.Errorf("AcceptAndDisposeMsg fuse exception, will not to call DoResume")
				} else if err := reg.DoResume(); err != nil {
					// TODO: save the err to the status-page instruct
					log.Errorf("AcceptAndDisposeMsg DoResume occurs error: %s", err.Error())
				} else {
					log.Infof("AcceptAndDisposeMsg DoResume finish")
				}
			}
		}
	}()
}