journald_reader.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package logs
  2. import (
  3. "fmt"
  4. "github.com/coroot/coroot-node-agent/utils"
  5. "github.com/coroot/coroot-node-agent/utils/try"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/coreos/go-systemd/v22/sdjournal"
  10. "github.com/coroot/logparser"
  11. klog "github.com/sirupsen/logrus"
  12. )
  13. type JournaldReader struct {
  14. journal *sdjournal.Journal
  15. subscribers map[string]chan<- logparser.LogEntry
  16. until chan time.Time
  17. lock sync.Mutex
  18. }
  19. func NewJournaldReader(journalPaths ...string) (*JournaldReader, error) {
  20. r := &JournaldReader{
  21. until: make(chan time.Time),
  22. subscribers: map[string]chan<- logparser.LogEntry{},
  23. }
  24. var err error
  25. for _, journalPath := range journalPaths {
  26. if r.journal, err = sdjournal.NewJournalFromDir(journalPath); err != nil {
  27. continue
  28. }
  29. usage, err := r.journal.GetUsage()
  30. if err != nil {
  31. continue
  32. }
  33. if usage == 0 {
  34. r.journal = nil
  35. continue
  36. }
  37. if err = r.journal.SeekRealtimeUsec(uint64(time.Now().Add(time.Millisecond).UnixNano() / 1000)); err != nil {
  38. return nil, err
  39. }
  40. //klog.Infof("systemd journal found in %s", journalPath)
  41. break
  42. }
  43. if r.journal == nil {
  44. return nil, fmt.Errorf("systemd journal not found in %s", strings.Join(journalPaths, ","))
  45. }
  46. //go r.follow()
  47. try.Go(r.follow, utils.CatchFn)
  48. return r, nil
  49. }
  50. func (r *JournaldReader) follow() {
  51. for {
  52. c, err := r.journal.Next()
  53. if err != nil {
  54. klog.Errorln("failed to read journal:", err)
  55. return
  56. }
  57. if c <= 0 {
  58. r.journal.Wait(time.Millisecond * 100)
  59. continue
  60. }
  61. e, err := r.journal.GetEntry()
  62. if err != nil {
  63. klog.Errorf("failed to read entry from journal")
  64. return
  65. }
  66. msg := e.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
  67. if msg == "" {
  68. continue
  69. }
  70. le := logparser.LogEntry{
  71. Timestamp: time.UnixMicro(int64(e.RealtimeTimestamp)),
  72. Content: msg,
  73. Level: logparser.LevelByPriority(e.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]),
  74. }
  75. r.lock.Lock()
  76. ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP]]
  77. r.lock.Unlock()
  78. if !ok {
  79. continue
  80. }
  81. ch <- le
  82. }
  83. }
  84. func (r *JournaldReader) Subscribe(cgroup string, ch chan<- logparser.LogEntry) error {
  85. r.lock.Lock()
  86. defer r.lock.Unlock()
  87. if _, ok := r.subscribers[cgroup]; ok {
  88. return fmt.Errorf(`duplicate subscriber for cgroup %s`, cgroup)
  89. }
  90. r.subscribers[cgroup] = ch
  91. return nil
  92. }
  93. func (r *JournaldReader) Unsubscribe(cgroup string) {
  94. r.lock.Lock()
  95. defer r.lock.Unlock()
  96. if _, ok := r.subscribers[cgroup]; !ok {
  97. klog.Warning("unknown subscriber for cgroup", cgroup)
  98. return
  99. }
  100. delete(r.subscribers, cgroup)
  101. }
  102. func (r *JournaldReader) Close() {
  103. _ = r.journal.Close()
  104. }