tail_reader.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package logs
  2. import (
  3. "bufio"
  4. "context"
  5. "github.com/coroot/coroot-node-agent/utils"
  6. "github.com/coroot/coroot-node-agent/utils/try"
  7. "io"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/coroot/logparser"
  12. klog "github.com/sirupsen/logrus"
  13. )
  14. var (
  15. tailPollInterval = time.Second
  16. )
  17. type TailReader struct {
  18. fileName string
  19. ch chan<- logparser.LogEntry
  20. file *os.File
  21. info os.FileInfo
  22. reader *bufio.Reader
  23. stop context.CancelFunc
  24. stopped chan struct{}
  25. }
  26. func NewTailReader(fileName string, ch chan<- logparser.LogEntry) (*TailReader, error) {
  27. ctx, cancel := context.WithCancel(context.Background())
  28. r := &TailReader{
  29. fileName: fileName,
  30. ch: ch,
  31. stop: cancel,
  32. stopped: make(chan struct{}),
  33. }
  34. var err error
  35. if r.file, err = os.Open(fileName); err != nil {
  36. return nil, err
  37. }
  38. if r.info, err = r.file.Stat(); err != nil {
  39. return nil, err
  40. }
  41. if _, err = r.file.Seek(0, io.SeekEnd); err != nil {
  42. return nil, err
  43. }
  44. r.reader = bufio.NewReader(r.file)
  45. try.Go(func() {
  46. var prefix string
  47. for {
  48. select {
  49. case <-ctx.Done():
  50. r.stopped <- struct{}{}
  51. return
  52. default:
  53. line, err := r.reader.ReadString('\n')
  54. if err != nil {
  55. prefix = line
  56. r.poll(ctx)
  57. continue
  58. }
  59. if prefix != "" {
  60. line = prefix + line
  61. prefix = ""
  62. }
  63. r.ch <- logparser.LogEntry{
  64. Timestamp: time.Now(),
  65. Content: strings.TrimSuffix(line, "\n"),
  66. Level: logparser.LevelUnknown,
  67. }
  68. }
  69. }
  70. }, utils.CatchFn)
  71. return r, nil
  72. }
  73. func (r *TailReader) Stop() {
  74. klog.Infoln("stopping tail reader for", r.fileName)
  75. r.stop()
  76. <-r.stopped
  77. if r.file != nil {
  78. _ = r.file.Close()
  79. }
  80. }
  81. func (r *TailReader) poll(ctx context.Context) {
  82. ticker := time.NewTicker(tailPollInterval)
  83. defer ticker.Stop()
  84. for {
  85. select {
  86. case <-ctx.Done():
  87. return
  88. case <-ticker.C:
  89. if info, err := os.Stat(r.fileName); err != nil {
  90. if r.file != nil {
  91. _ = r.file.Close()
  92. r.file = nil
  93. }
  94. } else {
  95. if r.file == nil {
  96. f, err := os.Open(r.fileName)
  97. if err != nil {
  98. continue
  99. }
  100. r.file = f
  101. r.info = info
  102. r.reader = bufio.NewReader(r.file)
  103. return
  104. }
  105. if r.moved(info) || r.truncated(info) || r.appended(info) {
  106. r.info = info
  107. return
  108. }
  109. }
  110. }
  111. }
  112. }
  113. func (r *TailReader) moved(info os.FileInfo) bool {
  114. if !os.SameFile(r.info, info) {
  115. f, err := os.Open(r.fileName)
  116. if err != nil {
  117. r.file = nil
  118. return false
  119. }
  120. _ = r.file.Close()
  121. r.file = f
  122. r.reader = bufio.NewReader(r.file)
  123. return true
  124. }
  125. return false
  126. }
  127. func (r *TailReader) truncated(info os.FileInfo) bool {
  128. if r.file == nil {
  129. return false
  130. }
  131. if info.Size() < r.info.Size() {
  132. if _, err := r.file.Seek(0, io.SeekStart); err == nil {
  133. return true
  134. }
  135. }
  136. return false
  137. }
  138. func (r *TailReader) appended(info os.FileInfo) bool {
  139. if r.file == nil {
  140. return false
  141. }
  142. if info.Size() > r.info.Size() {
  143. return true
  144. }
  145. return false
  146. }