agent.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package prom
  import (
	"fmt"
	"time"

	"github.com/coroot/coroot-node-agent/common"
	"github.com/coroot/coroot-node-agent/flags"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	promConfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/agent"
	"k8s.io/klog/v2"
)
  // Identity and timing settings for the embedded Prometheus agent.
const (
	// jobName is the name of the agent's single scrape job; it is also set as
	// the job label on the agent's only target.
	jobName = "coroot-node-agent"

	// RemoteWriteTimeout is used as the remote_timeout of the remote-write
	// configuration pointing at the metrics endpoint.
	RemoteWriteTimeout = 30 * time.Second

	// RemoteFlushDeadline is handed to remote.NewStorage as its flush deadline
	// (presumably how long queued samples may take to flush on shutdown —
	// confirm against the vendored Prometheus version).
	RemoteFlushDeadline = time.Minute
)
  24. func StartAgent(machineId string) error {
  25. logger := level.NewFilter(Logger{}, level.AllowInfo())
  26. if *flags.MetricsEndpoint == nil {
  27. return nil
  28. }
  29. klog.Infoln("metrics remote write endpoint:", (*flags.MetricsEndpoint).String())
  30. cfg := config.DefaultConfig
  31. cfg.GlobalConfig.ScrapeInterval = model.Duration(*flags.ScrapeInterval)
  32. cfg.GlobalConfig.ScrapeTimeout = model.Duration(*flags.ScrapeInterval)
  33. cfg.RemoteWriteConfigs = append(cfg.RemoteWriteConfigs,
  34. &config.RemoteWriteConfig{
  35. URL: &promConfig.URL{URL: *flags.MetricsEndpoint},
  36. Headers: common.AuthHeaders(),
  37. RemoteTimeout: model.Duration(RemoteWriteTimeout),
  38. QueueConfig: config.DefaultQueueConfig,
  39. },
  40. )
  41. cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, &config.ScrapeConfig{
  42. JobName: jobName,
  43. HonorLabels: true,
  44. ScrapeClassicHistograms: true,
  45. MetricsPath: "/metrics",
  46. Scheme: "http",
  47. EnableCompression: false,
  48. })
  49. opts := agent.DefaultOptions()
  50. localStorage := &readyStorage{stats: tsdb.NewDBStats()}
  51. scraper := &readyScrapeManager{}
  52. remoteStorage := remote.NewStorage(logger, prometheus.DefaultRegisterer, localStorage.StartTime, *flags.WalDir, RemoteFlushDeadline, scraper)
  53. fanoutStorage := storage.NewFanout(logger, localStorage, remoteStorage)
  54. if err := remoteStorage.ApplyConfig(&cfg); err != nil {
  55. return err
  56. }
  57. scrapeManager, err := scrape.NewManager(nil, logger, fanoutStorage, prometheus.DefaultRegisterer)
  58. if err != nil {
  59. return err
  60. }
  61. if err = scrapeManager.ApplyConfig(&cfg); err != nil {
  62. return err
  63. }
  64. scraper.Set(scrapeManager)
  65. db, err := agent.Open(logger, prometheus.DefaultRegisterer, remoteStorage, *flags.WalDir, opts)
  66. if err != nil {
  67. return err
  68. }
  69. localStorage.Set(db, 0)
  70. db.SetWriteNotified(remoteStorage)
  71. tch := make(chan map[string][]*targetgroup.Group, 1)
  72. tch <- map[string][]*targetgroup.Group{
  73. jobName: {
  74. &targetgroup.Group{
  75. Targets: []model.LabelSet{
  76. {
  77. model.InstanceLabel: model.LabelValue(machineId),
  78. model.AddressLabel: model.LabelValue(*flags.ListenAddress),
  79. },
  80. },
  81. Labels: model.LabelSet{model.JobLabel: jobName},
  82. },
  83. },
  84. }
  85. go func() {
  86. if err = scrapeManager.Run(tch); err != nil {
  87. klog.Errorln(err)
  88. }
  89. }()
  90. return nil
  91. }