wrappers.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package prom
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "sync"
  8. "k8s.io/klog/v2"
  9. "github.com/prometheus/prometheus/config"
  10. "github.com/prometheus/prometheus/model/exemplar"
  11. "github.com/prometheus/prometheus/model/histogram"
  12. "github.com/prometheus/prometheus/model/labels"
  13. "github.com/prometheus/prometheus/model/metadata"
  14. "github.com/prometheus/prometheus/scrape"
  15. "github.com/prometheus/prometheus/storage"
  16. "github.com/prometheus/prometheus/tsdb"
  17. "github.com/prometheus/prometheus/tsdb/agent"
  18. )
  19. // copy-pasted from Prometheus' main.go
  20. type readyStorage struct {
  21. mtx sync.RWMutex
  22. db storage.Storage
  23. startTimeMargin int64
  24. stats *tsdb.DBStats
  25. }
  26. func (s *readyStorage) ApplyConfig(conf *config.Config) error {
  27. return nil
  28. }
  29. func (s *readyStorage) Set(db storage.Storage, startTimeMargin int64) {
  30. s.mtx.Lock()
  31. defer s.mtx.Unlock()
  32. s.db = db
  33. s.startTimeMargin = startTimeMargin
  34. }
  35. func (s *readyStorage) get() storage.Storage {
  36. s.mtx.RLock()
  37. x := s.db
  38. s.mtx.RUnlock()
  39. return x
  40. }
  41. func (s *readyStorage) getStats() *tsdb.DBStats {
  42. s.mtx.RLock()
  43. x := s.stats
  44. s.mtx.RUnlock()
  45. return x
  46. }
  47. func (s *readyStorage) StartTime() (int64, error) {
  48. if x := s.get(); x != nil {
  49. switch db := x.(type) {
  50. case *agent.DB:
  51. return db.StartTime()
  52. default:
  53. panic(fmt.Sprintf("unknown storage type %T", db))
  54. }
  55. }
  56. return math.MaxInt64, tsdb.ErrNotReady
  57. }
  58. func (s *readyStorage) Querier(mint, maxt int64) (storage.Querier, error) {
  59. if x := s.get(); x != nil {
  60. return x.Querier(mint, maxt)
  61. }
  62. return nil, tsdb.ErrNotReady
  63. }
  64. func (s *readyStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
  65. if x := s.get(); x != nil {
  66. return x.ChunkQuerier(mint, maxt)
  67. }
  68. return nil, tsdb.ErrNotReady
  69. }
  70. func (s *readyStorage) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
  71. if x := s.get(); x != nil {
  72. switch db := x.(type) {
  73. case *agent.DB:
  74. return nil, agent.ErrUnsupported
  75. default:
  76. panic(fmt.Sprintf("unknown storage type %T", db))
  77. }
  78. }
  79. return nil, tsdb.ErrNotReady
  80. }
  81. func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
  82. if x := s.get(); x != nil {
  83. return x.Appender(ctx)
  84. }
  85. return notReadyAppender{}
  86. }
  87. type notReadyAppender struct{}
  88. func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
  89. return 0, tsdb.ErrNotReady
  90. }
  91. func (n notReadyAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
  92. return 0, tsdb.ErrNotReady
  93. }
  94. func (n notReadyAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
  95. return 0, tsdb.ErrNotReady
  96. }
  97. func (n notReadyAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
  98. return 0, tsdb.ErrNotReady
  99. }
  100. func (n notReadyAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
  101. return 0, tsdb.ErrNotReady
  102. }
  103. func (n notReadyAppender) Commit() error { return tsdb.ErrNotReady }
  104. func (n notReadyAppender) Rollback() error { return tsdb.ErrNotReady }
  105. func (s *readyStorage) Close() error {
  106. if x := s.get(); x != nil {
  107. return x.Close()
  108. }
  109. return nil
  110. }
  111. func (s *readyStorage) CleanTombstones() error {
  112. if x := s.get(); x != nil {
  113. switch db := x.(type) {
  114. case *agent.DB:
  115. return agent.ErrUnsupported
  116. default:
  117. panic(fmt.Sprintf("unknown storage type %T", db))
  118. }
  119. }
  120. return tsdb.ErrNotReady
  121. }
  122. func (s *readyStorage) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
  123. if x := s.get(); x != nil {
  124. switch db := x.(type) {
  125. case *agent.DB:
  126. return agent.ErrUnsupported
  127. default:
  128. panic(fmt.Sprintf("unknown storage type %T", db))
  129. }
  130. }
  131. return tsdb.ErrNotReady
  132. }
  133. func (s *readyStorage) Snapshot(dir string, withHead bool) error {
  134. if x := s.get(); x != nil {
  135. switch db := x.(type) {
  136. case *agent.DB:
  137. return agent.ErrUnsupported
  138. default:
  139. panic(fmt.Sprintf("unknown storage type %T", db))
  140. }
  141. }
  142. return tsdb.ErrNotReady
  143. }
  144. func (s *readyStorage) Stats(statsByLabelName string, limit int) (*tsdb.Stats, error) {
  145. if x := s.get(); x != nil {
  146. switch db := x.(type) {
  147. case *agent.DB:
  148. return nil, agent.ErrUnsupported
  149. default:
  150. panic(fmt.Sprintf("unknown storage type %T", db))
  151. }
  152. }
  153. return nil, tsdb.ErrNotReady
  154. }
  155. func (s *readyStorage) WALReplayStatus() (tsdb.WALReplayStatus, error) {
  156. if x := s.getStats(); x != nil {
  157. return x.Head.WALReplayStatus.GetWALReplayStatus(), nil
  158. }
  159. return tsdb.WALReplayStatus{}, tsdb.ErrNotReady
  160. }
  161. var ErrNotReady = errors.New("Scrape manager not ready")
  162. type readyScrapeManager struct {
  163. mtx sync.RWMutex
  164. m *scrape.Manager
  165. }
  166. func (rm *readyScrapeManager) Set(m *scrape.Manager) {
  167. rm.mtx.Lock()
  168. defer rm.mtx.Unlock()
  169. rm.m = m
  170. }
  171. func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
  172. rm.mtx.RLock()
  173. defer rm.mtx.RUnlock()
  174. if rm.m != nil {
  175. return rm.m, nil
  176. }
  177. return nil, ErrNotReady
  178. }
  179. type Logger struct{}
  180. func (l Logger) Log(v ...interface{}) error {
  181. klog.Infoln(v...)
  182. return nil
  183. }