journald_reader.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package logs
  2. import (
  3. "fmt"
  4. "github.com/coreos/go-systemd/v22/sdjournal"
  5. "github.com/coroot/logparser"
  6. "k8s.io/klog/v2"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. type JournaldReader struct {
  12. journal *sdjournal.Journal
  13. subscribers map[string]chan<- logparser.LogEntry
  14. until chan time.Time
  15. lock sync.Mutex
  16. }
  17. func NewJournaldReader(journalPaths ...string) (*JournaldReader, error) {
  18. r := &JournaldReader{
  19. until: make(chan time.Time),
  20. subscribers: map[string]chan<- logparser.LogEntry{},
  21. }
  22. var err error
  23. for _, journalPath := range journalPaths {
  24. if r.journal, err = sdjournal.NewJournalFromDir(journalPath); err != nil {
  25. continue
  26. }
  27. usage, err := r.journal.GetUsage()
  28. if err != nil {
  29. continue
  30. }
  31. if usage == 0 {
  32. r.journal = nil
  33. continue
  34. }
  35. if err = r.journal.SeekRealtimeUsec(uint64(time.Now().Add(time.Millisecond).UnixNano() / 1000)); err != nil {
  36. return nil, err
  37. }
  38. //klog.Infof("systemd journal found in %s", journalPath)
  39. break
  40. }
  41. if r.journal == nil {
  42. return nil, fmt.Errorf("systemd journal not found in %s", strings.Join(journalPaths, ","))
  43. }
  44. go r.follow()
  45. return r, nil
  46. }
  47. func (r *JournaldReader) follow() {
  48. for {
  49. c, err := r.journal.Next()
  50. if err != nil {
  51. klog.Errorln("failed to read journal:", err)
  52. return
  53. }
  54. if c <= 0 {
  55. r.journal.Wait(time.Millisecond * 100)
  56. continue
  57. }
  58. e, err := r.journal.GetEntry()
  59. if err != nil {
  60. klog.Errorf("failed to read entry from journal")
  61. return
  62. }
  63. msg := e.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
  64. if msg == "" {
  65. continue
  66. }
  67. le := logparser.LogEntry{
  68. Content: msg,
  69. Level: logparser.LevelByPriority(e.Fields[sdjournal.SD_JOURNAL_FIELD_PRIORITY]),
  70. }
  71. r.lock.Lock()
  72. ch, ok := r.subscribers[e.Fields[sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP]]
  73. r.lock.Unlock()
  74. if !ok {
  75. continue
  76. }
  77. ch <- le
  78. }
  79. }
  80. func (r *JournaldReader) Subscribe(cgroup string, ch chan<- logparser.LogEntry) error {
  81. r.lock.Lock()
  82. defer r.lock.Unlock()
  83. if _, ok := r.subscribers[cgroup]; ok {
  84. return fmt.Errorf(`duplicate subscriber for cgroup %s`, cgroup)
  85. }
  86. r.subscribers[cgroup] = ch
  87. return nil
  88. }
  89. func (r *JournaldReader) Unsubscribe(cgroup string) {
  90. r.lock.Lock()
  91. defer r.lock.Unlock()
  92. if _, ok := r.subscribers[cgroup]; !ok {
  93. klog.Warning("unknown subscriber for cgroup", cgroup)
  94. return
  95. }
  96. delete(r.subscribers, cgroup)
  97. }
  98. func (r *JournaldReader) Close() {
  99. _ = r.journal.Close()
  100. }