tail_reader.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package logs
  2. import (
  3. "bufio"
  4. "context"
  5. "github.com/coroot/logparser"
  6. "io"
  7. "k8s.io/klog/v2"
  8. "os"
  9. "strings"
  10. "time"
  11. )
  12. var (
  13. tailPollInterval = time.Second
  14. )
  15. type TailReader struct {
  16. fileName string
  17. ch chan<- logparser.LogEntry
  18. file *os.File
  19. info os.FileInfo
  20. reader *bufio.Reader
  21. stop context.CancelFunc
  22. stopped chan struct{}
  23. }
  24. func NewTailReader(fileName string, ch chan<- logparser.LogEntry) (*TailReader, error) {
  25. ctx, cancel := context.WithCancel(context.Background())
  26. r := &TailReader{
  27. fileName: fileName,
  28. ch: ch,
  29. stop: cancel,
  30. stopped: make(chan struct{}),
  31. }
  32. var err error
  33. if r.file, err = os.Open(fileName); err != nil {
  34. return nil, err
  35. }
  36. if r.info, err = r.file.Stat(); err != nil {
  37. return nil, err
  38. }
  39. if _, err = r.file.Seek(0, io.SeekEnd); err != nil {
  40. return nil, err
  41. }
  42. r.reader = bufio.NewReader(r.file)
  43. go func() {
  44. var prefix string
  45. for {
  46. select {
  47. case <-ctx.Done():
  48. r.stopped <- struct{}{}
  49. return
  50. default:
  51. line, err := r.reader.ReadString('\n')
  52. if err != nil {
  53. prefix = line
  54. r.poll(ctx)
  55. continue
  56. }
  57. if prefix != "" {
  58. line = prefix + line
  59. prefix = ""
  60. }
  61. r.ch <- logparser.LogEntry{Content: strings.TrimSuffix(line, "\n"), Level: logparser.LevelUnknown}
  62. }
  63. }
  64. }()
  65. return r, nil
  66. }
  67. func (r *TailReader) Stop() {
  68. klog.Infoln("stopping tail reader for", r.fileName)
  69. r.stop()
  70. <-r.stopped
  71. if r.file != nil {
  72. _ = r.file.Close()
  73. }
  74. }
  75. func (r *TailReader) poll(ctx context.Context) {
  76. ticker := time.NewTicker(tailPollInterval)
  77. defer ticker.Stop()
  78. for {
  79. select {
  80. case <-ctx.Done():
  81. return
  82. case <-ticker.C:
  83. if info, err := os.Stat(r.fileName); err != nil {
  84. if r.file != nil {
  85. _ = r.file.Close()
  86. r.file = nil
  87. }
  88. } else {
  89. if r.file == nil {
  90. f, err := os.Open(r.fileName)
  91. if err != nil {
  92. continue
  93. }
  94. r.file = f
  95. r.info = info
  96. r.reader = bufio.NewReader(r.file)
  97. return
  98. }
  99. if r.moved(info) || r.truncated(info) || r.appended(info) {
  100. r.info = info
  101. return
  102. }
  103. }
  104. }
  105. }
  106. }
  107. func (r *TailReader) moved(info os.FileInfo) bool {
  108. if !os.SameFile(r.info, info) {
  109. f, err := os.Open(r.fileName)
  110. if err != nil {
  111. r.file = nil
  112. return false
  113. }
  114. _ = r.file.Close()
  115. r.file = f
  116. r.reader = bufio.NewReader(r.file)
  117. return true
  118. }
  119. return false
  120. }
  121. func (r *TailReader) truncated(info os.FileInfo) bool {
  122. if r.file == nil {
  123. return false
  124. }
  125. if info.Size() < r.info.Size() {
  126. if _, err := r.file.Seek(0, io.SeekStart); err == nil {
  127. return true
  128. }
  129. }
  130. return false
  131. }
  132. func (r *TailReader) appended(info os.FileInfo) bool {
  133. if r.file == nil {
  134. return false
  135. }
  136. if info.Size() > r.info.Size() {
  137. return true
  138. }
  139. return false
  140. }