config.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package config
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/flags"
  5. log "github.com/sirupsen/logrus"
  6. "github.com/spf13/viper"
  7. "strings"
  8. "sync"
  9. )
  10. type Config struct {
  11. m *sync.RWMutex
  12. subscribes map[scbType]chan struct{}
  13. //subscribes map[scbType]chan<- struct{}
  14. euspaceCfg *euspaceCfg
  15. omniCommCfg *omniCommonCfg
  16. watchingConfig bool
  17. }
  18. type scbType string
  19. const (
  20. scbTypeUndefined scbType = ""
  21. ScbTypeLogLevelChg scbType = "logLevelChange"
  22. ScbTypeNetDataIntervalChg scbType = "netDataIntervalChange"
  23. ScbTypeConfigServerChg scbType = "configServerChg"
  24. ScbTypeDataServerChg scbType = "dataServerChg"
  25. ScbTypeTracesEndpointChg scbType = "tracesEndpointChg"
  26. )
  27. var Cfg *Config
  28. func init() {
  29. //fmt.Println("\n")
  30. //fmt.Printf("------- *flags.LogLevel :%s \n", *flags.LogLevel)
  31. //fmt.Printf("------- *flags.DataServer :%s \n", *flags.DataServer)
  32. //fmt.Printf("------- *flags.ConfigServer :%s \n", *flags.ConfigServer)
  33. //fmt.Println("\n")
  34. var err error
  35. Cfg, err = New()
  36. if err != nil {
  37. log.Fatalf(err.Error())
  38. }
  39. //fmt.Println("\n")
  40. //fmt.Printf("cfg --- logLevel :%s \n", Cfg.GetString("logLevel"))
  41. //fmt.Printf("cfg --- netDataInterval :%d \n", Cfg.GetInt("netDataInterval"))
  42. //fmt.Printf("cfg --- configServer :%s \n", Cfg.GetString("configServer"))
  43. //fmt.Printf("cfg --- dataServer :%s \n", Cfg.GetString("dataServer"))
  44. }
  45. func New() (*Config, error) {
  46. /*
  47. 1. Viper uses the following precedence order: explicit call to `Set` > flag > env > Config > key/value store > default
  48. 2. 按照现有业务实现,设计为:命令行参数(*flags.xxx)优先级高于配置文件中的项 。
  49. 3. 会将euspace的配置 和 omniagent-common.ini的配置进行合并,然后同步到配置文件。(命令行参数会覆盖上下文的配置,但不会被同步到配置文件)
  50. */
  51. ec, err := initEuspaceCfg()
  52. if err != nil {
  53. return nil, fmt.Errorf("init config,load `euspace` config failed : %s", err.Error())
  54. }
  55. omc, err := initOmniCommonCfg()
  56. if err != nil {
  57. return nil, fmt.Errorf("init config,load `omniagent` config failed : %s", err.Error())
  58. }
  59. cfg := &Config{
  60. m: new(sync.RWMutex),
  61. subscribes: make(map[scbType]chan struct{}),
  62. //subscribes: make(map[scbType]chan<- struct{}),
  63. euspaceCfg: ec,
  64. omniCommCfg: omc,
  65. }
  66. //merge and sync omniagent common.ini Config
  67. if err = cfg.mergeAndSyncOmniCommConf(); err != nil {
  68. return nil, fmt.Errorf("init config,%s", err.Error())
  69. }
  70. //merge the flags
  71. cfg.mergeFlags()
  72. //simple check
  73. theCfgSvr := cfg.euspaceCfg.runtimeV.GetString("configServer")
  74. theDataSvr := cfg.euspaceCfg.runtimeV.GetString("dataServer")
  75. if theCfgSvr == "" || theDataSvr == "" {
  76. return nil, fmt.Errorf("init config,the config-server[%s] or the data-server[%s] is not set", theCfgSvr, theDataSvr)
  77. }
  78. if !strings.HasPrefix(theCfgSvr, "http") {
  79. cfg.euspaceCfg.runtimeV.Set("configServer", fmt.Sprintf("http://%s", theCfgSvr))
  80. }
  81. if !strings.HasPrefix(theDataSvr, "http") {
  82. cfg.euspaceCfg.runtimeV.Set("dataServer", fmt.Sprintf("http://%s", theDataSvr))
  83. }
  84. // set flags.TracesEndpoint
  85. /*if *flags.TracesEndpoint == nil {
  86. dataServer, err := url.Parse(theDataSvr)
  87. if err == nil && dataServer != nil {
  88. *flags.TracesEndpoint = dataServer.JoinPath(*flags.ServerPrefix + "/api/v2/data/receive")
  89. }
  90. }*/
  91. //start watch
  92. cfg.watchConfig()
  93. return cfg, nil
  94. }
  95. func (c *Config) mergeAndSyncOmniCommConf() error {
  96. hasChange := false
  97. if c.omniCommCfg != nil {
  98. cfgSrv := c.euspaceCfg.runtimeV.GetString("configServer")
  99. omniCfgSvr := c.omniCommCfg.getConfigServer()
  100. if omniCfgSvr != "" && omniCfgSvr != cfgSrv {
  101. hasChange = true
  102. c.euspaceCfg.runtimeV.Set("configServer", omniCfgSvr)
  103. }
  104. dataSvr := c.euspaceCfg.runtimeV.GetString("dataServer")
  105. omniDataSvr := c.omniCommCfg.getDataServer()
  106. if omniDataSvr != "" && omniDataSvr != dataSvr {
  107. hasChange = true
  108. c.euspaceCfg.runtimeV.Set("dataServer", omniDataSvr)
  109. }
  110. // set cfgRecordLast
  111. if err := c.omniCommCfg.commonSecV.Unmarshal(c.omniCommCfg.cfgRecordLast); err != nil {
  112. return fmt.Errorf("`omniCommCfg.commonSecV` unmarshal `cfgRecordLast` failed : %s", err.Error())
  113. }
  114. //fmt.Printf("------ c.omniCommCfg.cfgRecordLast:%#v \n", c.omniCommCfg.cfgRecordLast)
  115. }
  116. // set cfgRecordLast
  117. if err := c.euspaceCfg.runtimeV.Unmarshal(c.euspaceCfg.cfgRecordLast); err != nil {
  118. return fmt.Errorf("`euspaceCfg.runtimeV` unmarshal `cfgRecordLast` failed : %s", err.Error())
  119. }
  120. //fmt.Printf("------ c.euspaceCfg.cfgRecordLast:%#v \n", c.euspaceCfg.cfgRecordLast)
  121. //sync Config
  122. if hasChange {
  123. if err := c.euspaceCfg.syncConfToFile(c.euspaceCfg.cfgRecordLast); err != nil {
  124. return err
  125. }
  126. }
  127. return nil
  128. }
  129. func (c *Config) mergeFlags() {
  130. if *flags.LogLevel != "" {
  131. c.euspaceCfg.runtimeV.Set("logLevel", *flags.LogLevel)
  132. }
  133. if *flags.ConfigServer != "" {
  134. c.euspaceCfg.runtimeV.Set("configServer", *flags.ConfigServer)
  135. }
  136. if *flags.DataServer != "" {
  137. c.euspaceCfg.runtimeV.Set("dataServer", *flags.DataServer)
  138. }
  139. }
  140. func (c *Config) watchConfig() {
  141. c.m.Lock()
  142. defer c.m.Unlock()
  143. if c.watchingConfig {
  144. return
  145. }
  146. c.watchingConfig = true
  147. //watch euspace Config
  148. c.euspaceCfg.watchConfig(c)
  149. //watch omniagent-common.ini
  150. if c.omniCommCfg != nil && c.omniCommCfg.omniCommonV != nil {
  151. c.omniCommCfg.watchConfig(c)
  152. }
  153. }
  154. func (c *Config) Get(key string) any {
  155. c.m.RLock()
  156. defer c.m.RUnlock()
  157. return c.euspaceCfg.runtimeV.Get(key)
  158. }
  159. func (c *Config) Sub(key string) *viper.Viper {
  160. c.m.RLock()
  161. defer c.m.RUnlock()
  162. return c.euspaceCfg.runtimeV.Sub(key)
  163. }
  164. func (c *Config) GetString(key string) string {
  165. c.m.RLock()
  166. defer c.m.RUnlock()
  167. return c.euspaceCfg.runtimeV.GetString(key)
  168. }
  169. func (c *Config) GetBool(key string) bool {
  170. c.m.RLock()
  171. defer c.m.RUnlock()
  172. return c.euspaceCfg.runtimeV.GetBool(key)
  173. }
  174. func (c *Config) GetInt(key string) int {
  175. c.m.RLock()
  176. defer c.m.RUnlock()
  177. return c.euspaceCfg.runtimeV.GetInt(key)
  178. }
  179. func (c *Config) GetInt32(key string) int32 {
  180. c.m.RLock()
  181. defer c.m.RUnlock()
  182. return c.euspaceCfg.runtimeV.GetInt32(key)
  183. }
  184. func (c *Config) GetInt64(key string) int64 {
  185. c.m.RLock()
  186. defer c.m.RUnlock()
  187. return c.euspaceCfg.runtimeV.GetInt64(key)
  188. }
  189. func (c *Config) GetFloat64(key string) float64 {
  190. c.m.RLock()
  191. defer c.m.RUnlock()
  192. return c.euspaceCfg.runtimeV.GetFloat64(key)
  193. }
  194. func (c *Config) GetConfigFileUsed() string {
  195. c.m.RLock()
  196. defer c.m.RUnlock()
  197. return c.euspaceCfg.runtimeV.ConfigFileUsed()
  198. }
  199. func (c *Config) SubscribeConfigChange(typ scbType) <-chan struct{} {
  200. c.m.Lock()
  201. defer c.m.Unlock()
  202. if typ == scbTypeUndefined {
  203. return nil
  204. }
  205. ch, ok := c.subscribes[typ]
  206. if !ok {
  207. ch = make(chan struct{})
  208. c.subscribes[typ] = ch
  209. }
  210. return ch
  211. }
  212. func (c *Config) noticeSubscribers(typ scbType) {
  213. c.m.RLock()
  214. defer c.m.RUnlock()
  215. if ch, ok := c.subscribes[typ]; ok {
  216. ch <- struct{}{}
  217. }
  218. }
  219. func (c *Config) Close() {
  220. c.m.Lock()
  221. defer c.m.Unlock()
  222. for typ, ch := range c.subscribes {
  223. close(ch)
  224. delete(c.subscribes, typ)
  225. }
  226. }