agent.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package prom
  2. import (
  3. "time"
  4. "github.com/coroot/coroot-node-agent/common"
  5. "github.com/coroot/coroot-node-agent/flags"
  6. "github.com/go-kit/log/level"
  7. "github.com/prometheus/client_golang/prometheus"
  8. promConfig "github.com/prometheus/common/config"
  9. "github.com/prometheus/common/model"
  10. "github.com/prometheus/prometheus/config"
  11. "github.com/prometheus/prometheus/discovery/targetgroup"
  12. "github.com/prometheus/prometheus/scrape"
  13. "github.com/prometheus/prometheus/storage"
  14. "github.com/prometheus/prometheus/storage/remote"
  15. "github.com/prometheus/prometheus/tsdb"
  16. "github.com/prometheus/prometheus/tsdb/agent"
  17. "k8s.io/klog/v2"
  18. )
  19. const (
  20. RemoteFlushDeadline = time.Minute
  21. jobName = "coroot-node-agent"
  22. RemoteWriteTimeout = 30 * time.Second
  23. )
  24. func StartAgent(machineId string) error {
  25. logger := level.NewFilter(Logger{}, level.AllowInfo())
  26. if *flags.MetricsEndpoint == nil {
  27. return nil
  28. }
  29. klog.Infoln("metrics remote write endpoint:", (*flags.MetricsEndpoint).String())
  30. cfg := config.DefaultConfig
  31. cfg.GlobalConfig.ScrapeInterval = model.Duration(*flags.ScrapeInterval)
  32. cfg.GlobalConfig.ScrapeTimeout = model.Duration(*flags.ScrapeInterval)
  33. cfg.RemoteWriteConfigs = append(cfg.RemoteWriteConfigs,
  34. &config.RemoteWriteConfig{
  35. URL: &promConfig.URL{URL: *flags.MetricsEndpoint},
  36. Headers: common.AuthHeaders(),
  37. RemoteTimeout: model.Duration(RemoteWriteTimeout),
  38. QueueConfig: config.DefaultQueueConfig,
  39. HTTPClientConfig: promConfig.HTTPClientConfig{
  40. TLSConfig: promConfig.TLSConfig{InsecureSkipVerify: *flags.InsecureSkipVerify},
  41. },
  42. },
  43. )
  44. cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, &config.ScrapeConfig{
  45. JobName: jobName,
  46. HonorLabels: true,
  47. ScrapeClassicHistograms: true,
  48. MetricsPath: "/metrics",
  49. Scheme: "http",
  50. EnableCompression: false,
  51. })
  52. opts := agent.DefaultOptions()
  53. localStorage := &readyStorage{stats: tsdb.NewDBStats()}
  54. scraper := &readyScrapeManager{}
  55. remoteStorage := remote.NewStorage(logger, prometheus.DefaultRegisterer, localStorage.StartTime, *flags.WalDir, RemoteFlushDeadline, scraper)
  56. fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)
  57. if err := remoteStorage.ApplyConfig(&cfg); err != nil {
  58. return err
  59. }
  60. scrapeManager, err := scrape.NewManager(nil, logger, fanoutStorage, prometheus.DefaultRegisterer)
  61. if err != nil {
  62. return err
  63. }
  64. if err = scrapeManager.ApplyConfig(&cfg); err != nil {
  65. return err
  66. }
  67. scraper.Set(scrapeManager)
  68. db, err := agent.Open(logger, prometheus.DefaultRegisterer, remoteStorage, *flags.WalDir, opts)
  69. if err != nil {
  70. return err
  71. }
  72. localStorage.Set(db, 0)
  73. db.SetWriteNotified(remoteStorage)
  74. tch := make(chan map[string][]*targetgroup.Group, 1)
  75. tch <- map[string][]*targetgroup.Group{
  76. jobName: {
  77. &targetgroup.Group{
  78. Targets: []model.LabelSet{
  79. {
  80. model.InstanceLabel: model.LabelValue(machineId),
  81. model.AddressLabel: model.LabelValue(*flags.ListenAddress),
  82. },
  83. },
  84. Labels: model.LabelSet{model.JobLabel: jobName},
  85. },
  86. },
  87. }
  88. go func() {
  89. if err = scrapeManager.Run(tch); err != nil {
  90. klog.Errorln(err)
  91. }
  92. }()
  93. return nil
  94. }