tail_reader.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package logs
  2. import (
  3. "bufio"
  4. "context"
  5. "io"
  6. "os"
  7. "strings"
  8. "time"
  9. "github.com/coroot/logparser"
  10. "k8s.io/klog/v2"
  11. )
  12. var (
  13. tailPollInterval = time.Second
  14. )
  15. type TailReader struct {
  16. fileName string
  17. ch chan<- logparser.LogEntry
  18. file *os.File
  19. info os.FileInfo
  20. reader *bufio.Reader
  21. stop context.CancelFunc
  22. stopped chan struct{}
  23. }
  24. func NewTailReader(fileName string, ch chan<- logparser.LogEntry) (*TailReader, error) {
  25. ctx, cancel := context.WithCancel(context.Background())
  26. r := &TailReader{
  27. fileName: fileName,
  28. ch: ch,
  29. stop: cancel,
  30. stopped: make(chan struct{}),
  31. }
  32. var err error
  33. if r.file, err = os.Open(fileName); err != nil {
  34. return nil, err
  35. }
  36. if r.info, err = r.file.Stat(); err != nil {
  37. return nil, err
  38. }
  39. if _, err = r.file.Seek(0, io.SeekEnd); err != nil {
  40. return nil, err
  41. }
  42. r.reader = bufio.NewReader(r.file)
  43. go func() {
  44. var prefix string
  45. for {
  46. select {
  47. case <-ctx.Done():
  48. r.stopped <- struct{}{}
  49. return
  50. default:
  51. line, err := r.reader.ReadString('\n')
  52. if err != nil {
  53. prefix = line
  54. r.poll(ctx)
  55. continue
  56. }
  57. if prefix != "" {
  58. line = prefix + line
  59. prefix = ""
  60. }
  61. r.ch <- logparser.LogEntry{
  62. Timestamp: time.Now(),
  63. Content: strings.TrimSuffix(line, "\n"),
  64. Level: logparser.LevelUnknown,
  65. }
  66. }
  67. }
  68. }()
  69. return r, nil
  70. }
  71. func (r *TailReader) Stop() {
  72. klog.Infoln("stopping tail reader for", r.fileName)
  73. r.stop()
  74. <-r.stopped
  75. if r.file != nil {
  76. _ = r.file.Close()
  77. }
  78. }
  79. func (r *TailReader) poll(ctx context.Context) {
  80. ticker := time.NewTicker(tailPollInterval)
  81. defer ticker.Stop()
  82. for {
  83. select {
  84. case <-ctx.Done():
  85. return
  86. case <-ticker.C:
  87. if info, err := os.Stat(r.fileName); err != nil {
  88. if r.file != nil {
  89. _ = r.file.Close()
  90. r.file = nil
  91. }
  92. } else {
  93. if r.file == nil {
  94. f, err := os.Open(r.fileName)
  95. if err != nil {
  96. continue
  97. }
  98. r.file = f
  99. r.info = info
  100. r.reader = bufio.NewReader(r.file)
  101. return
  102. }
  103. if r.moved(info) || r.truncated(info) || r.appended(info) {
  104. r.info = info
  105. return
  106. }
  107. }
  108. }
  109. }
  110. }
  111. func (r *TailReader) moved(info os.FileInfo) bool {
  112. if !os.SameFile(r.info, info) {
  113. f, err := os.Open(r.fileName)
  114. if err != nil {
  115. r.file = nil
  116. return false
  117. }
  118. _ = r.file.Close()
  119. r.file = f
  120. r.reader = bufio.NewReader(r.file)
  121. return true
  122. }
  123. return false
  124. }
  125. func (r *TailReader) truncated(info os.FileInfo) bool {
  126. if r.file == nil {
  127. return false
  128. }
  129. if info.Size() < r.info.Size() {
  130. if _, err := r.file.Seek(0, io.SeekStart); err == nil {
  131. return true
  132. }
  133. }
  134. return false
  135. }
  136. func (r *TailReader) appended(info os.FileInfo) bool {
  137. if r.file == nil {
  138. return false
  139. }
  140. if info.Size() > r.info.Size() {
  141. return true
  142. }
  143. return false
  144. }