agent.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package prom
import (
	"fmt"
	"time"

	"github.com/coroot/coroot-node-agent/common"
	"github.com/coroot/coroot-node-agent/flags"
	"github.com/prometheus/client_golang/prometheus"
	promConfig "github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/scrape"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/agent"
	"k8s.io/klog/v2"
)
  18. const (
  19. RemoteFlushDeadline = time.Minute
  20. jobName = "coroot-node-agent"
  21. RemoteWriteTimeout = 30 * time.Second
  22. )
  23. func StartAgent(machineId string) error {
  24. l := Logger{}
  25. if *flags.MetricsEndpoint == nil {
  26. return nil
  27. }
  28. klog.Infoln("Metrics remote write endpoint:", (*flags.MetricsEndpoint).String())
  29. cfg := config.DefaultConfig
  30. cfg.GlobalConfig.ScrapeInterval = model.Duration(*flags.ScrapeInterval)
  31. cfg.GlobalConfig.ScrapeTimeout = model.Duration(*flags.ScrapeInterval)
  32. cfg.RemoteWriteConfigs = append(cfg.RemoteWriteConfigs,
  33. &config.RemoteWriteConfig{
  34. URL: &promConfig.URL{URL: *flags.MetricsEndpoint},
  35. Headers: common.AuthHeaders(),
  36. RemoteTimeout: model.Duration(RemoteWriteTimeout),
  37. QueueConfig: config.DefaultQueueConfig,
  38. },
  39. )
  40. cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, &config.ScrapeConfig{
  41. JobName: jobName,
  42. HonorLabels: true,
  43. ScrapeClassicHistograms: true,
  44. MetricsPath: "/metrics",
  45. Scheme: "http",
  46. EnableCompression: false,
  47. })
  48. opts := agent.DefaultOptions()
  49. localStorage := &readyStorage{stats: tsdb.NewDBStats()}
  50. scraper := &readyScrapeManager{}
  51. remoteStorage := remote.NewStorage(l, prometheus.DefaultRegisterer, localStorage.StartTime, *flags.WalDir, RemoteFlushDeadline, scraper)
  52. fanoutStorage := storage.NewFanout(l, localStorage, remoteStorage)
  53. if err := remoteStorage.ApplyConfig(&cfg); err != nil {
  54. return err
  55. }
  56. scrapeManager, err := scrape.NewManager(nil, l, fanoutStorage, prometheus.DefaultRegisterer)
  57. if err != nil {
  58. return err
  59. }
  60. if err = scrapeManager.ApplyConfig(&cfg); err != nil {
  61. return err
  62. }
  63. scraper.Set(scrapeManager)
  64. db, err := agent.Open(l, prometheus.DefaultRegisterer, remoteStorage, *flags.WalDir, opts)
  65. if err != nil {
  66. return err
  67. }
  68. localStorage.Set(db, 0)
  69. db.SetWriteNotified(remoteStorage)
  70. tch := make(chan map[string][]*targetgroup.Group, 1)
  71. tch <- map[string][]*targetgroup.Group{
  72. jobName: {
  73. &targetgroup.Group{
  74. Targets: []model.LabelSet{
  75. {
  76. model.InstanceLabel: model.LabelValue(machineId),
  77. model.AddressLabel: model.LabelValue(*flags.ListenAddress),
  78. },
  79. },
  80. Labels: model.LabelSet{model.JobLabel: jobName},
  81. },
  82. },
  83. }
  84. go func() {
  85. if err = scrapeManager.Run(tch); err != nil {
  86. klog.Errorln(err)
  87. }
  88. }()
  89. return nil
  90. }