journald_reader.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package logs
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/coreos/go-systemd/v22/sdjournal"
  8. "github.com/coroot/logparser"
  9. klog "github.com/sirupsen/logrus"
  10. )
  11. type JournaldReader struct {
  12. journal *sdjournal.Journal
  13. subscribers map[string]chan<- logparser.LogEntry
  14. until chan time.Time
  15. lock sync.Mutex
  16. }
  17. func NewJournaldReader(journalPaths ...string) (*JournaldReader, error) {
  18. r := &JournaldReader{
  19. until: make(chan time.Time),
  20. subscribers: map[string]chan<- logparser.LogEntry{},
  21. }
  22. var err error
  23. for _, journalPath := range journalPaths {
  24. if r.journal, err = sdjournal.NewJournalFromDir(journalPath); err != nil {
  25. continue
  26. }
  27. usage, err := r.journal.GetUsage()
  28. if err != nil {
  29. continue
  30. }
  31. if usage == 0 {
  32. r.journal = nil
  33. continue
  34. }
  35. if err = r.journal.SeekRealtimeUsec(uint64(time.Now().Add(time.Millisecond).UnixNano() / 1000)); err != nil {
  36. return nil, err
  37. }
  38. //klog.Infof("systemd journal found in %s", journalPath)
  39. break
  40. }
  41. if r.journal == nil {
  42. return nil, fmt.Errorf("systemd journal not found in %s", strings.Join(journalPaths, ","))
  43. }
  44. go r.follow()
  45. return r, nil
  46. }
  47. func (r *JournaldReader) follow() {
  48. for {
  49. c, err := r.journal.Next()
  50. if err != nil {
  51. klog.Errorln("failed to read journal:", err)
  52. return
  53. }
  54. if c <= 0 {
  55. r.journal.Wait(time.Millisecond * 100)
  56. continue
  57. }
  58. e, err := r.journal.GetEntry()
  59. if err != nil {
  60. klog.Errorf("failed to read entry from journal")
  61. return
  62. }
  63. msg := e.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
  64. if msg == "" {
  65. continue
  66. }
  67. le := logparser.LogEntry{
  68. Timestamp: time.UnixMicro(int64(e.RealtimeTimestamp)),
  69. Content: msg,
  70. Level: logparser.LevelByPriority(e.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]),
  71. }
  72. r.lock.Lock()
  73. ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP]]
  74. r.lock.Unlock()
  75. if !ok {
  76. continue
  77. }
  78. ch <- le
  79. }
  80. }
  81. func (r *JournaldReader) Subscribe(cgroup string, ch chan<- logparser.LogEntry) error {
  82. r.lock.Lock()
  83. defer r.lock.Unlock()
  84. if _, ok := r.subscribers[cgroup]; ok {
  85. return fmt.Errorf(`duplicate subscriber for cgroup %s`, cgroup)
  86. }
  87. r.subscribers[cgroup] = ch
  88. return nil
  89. }
  90. func (r *JournaldReader) Unsubscribe(cgroup string) {
  91. r.lock.Lock()
  92. defer r.lock.Unlock()
  93. if _, ok := r.subscribers[cgroup]; !ok {
  94. klog.Warning("unknown subscriber for cgroup", cgroup)
  95. return
  96. }
  97. delete(r.subscribers, cgroup)
  98. }
  99. func (r *JournaldReader) Close() {
  100. _ = r.journal.Close()
  101. }