This repository has been archived by the owner on Nov 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtail.go
370 lines (332 loc) · 8.15 KB
/
tail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Package tail implements file tailing with fsnotify.
package tail
import (
"bufio"
"context"
"io"
"os"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
"github.com/go-faster/errors"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// errStop is an internal sentinel signalling that tailing should stop
// without reporting a failure. waitForChanges returns it when the file is
// deleted or no longer exists, and Tail translates it into a nil error.
var errStop = errors.New("tail should now stop")
// Line is a single line observed in the tailed file.
type Line struct {
	// Data holds the bytes of the line. The backing buffer is reused while
	// reading the file, so it must not be retained.
	Data []byte
	// Offset is always the offset of the line from the start of the file.
	Offset int64
}

// isBlank reports whether the line is nil or carries no data.
func (l *Line) isBlank() bool {
	return l == nil || len(l.Data) == 0
}

// final reports whether the line is non-blank and its last byte is a newline.
func (l *Line) final() bool {
	return !l.isBlank() && l.Data[len(l.Data)-1] == '\n'
}

// Location returns the Location that corresponds to the line's Offset,
// expressed relative to the start of the file. A nil Line yields the zero
// Location.
//
// It is mostly a convenience helper for filling Config.Location.
func (l *Line) Location() Location {
	var loc Location
	if l != nil {
		loc.Offset = l.Offset
		loc.Whence = io.SeekStart
	}
	return loc
}

// Location is the pair of arguments accepted by an io.Seeker's Seek method:
// an offset and the reference point (whence) it is relative to.
//
// See https://golang.org/pkg/io/#SectionReader.Seek
type Location struct {
	Offset int64
	Whence int
}
// Config is used to specify how a file must be tailed.
type Config struct {
	// Location sets starting file location.
	// When nil, Tail seeks by zero bytes relative to the current position
	// of the freshly opened file.
	Location *Location
	// NotifyTimeout enables additional timeout for file changes waiting.
	// Can be used to ensure that we never miss an event even if the watcher
	// fails to deliver it: when the timeout elapses, Tail retries reading.
	// Optional; zero disables the timeout.
	NotifyTimeout time.Duration
	// Follow file after reaching io.EOF, waiting for new lines.
	Follow bool
	// Initial internal buffer size, optional.
	// Values less than or equal to minBufSize are replaced by
	// defaultBufSize in File.
	BufferSize int
	// Logger to use, optional. File substitutes a no-op logger when nil.
	Logger *zap.Logger
	// Tracker is an optional custom Tracker implementation.
	// File substitutes defaultTracker when nil.
	Tracker Tracker
}
// Handler is called on each log line.
//
// Implementation should not retain Line or Line.Data, because Tail reuses
// the same Line value and its buffer for subsequent lines.
// A non-nil error stops Tail, which returns that error wrapped.
type Handler func(ctx context.Context, l *Line) error
// Tracker tracks file changes.
//
// All methods are unexported, so the interface can only be implemented
// inside this package. Each method is keyed by a file name.
// NOTE(review): the exact semantics of each method are defined by the
// implementation, which is not part of this file — confirm there.
type Tracker interface {
	watchFile(name string) error
	watchCreate(name string) error
	removeWatchName(name string) error
	removeWatchCreate(name string) error
	listenEvents(name string) <-chan fsnotify.Event
}
// Tailer implements file tailing.
//
// Use Tail() to start.
type Tailer struct {
	// cfg is the configuration with defaults already applied by File.
	cfg Config
	// name is the path of the file being tailed.
	name string
	// file is the currently opened file, or nil when nothing is open.
	file *os.File
	// reader is a buffered reader on top of proxy; see resetReader.
	reader *bufio.Reader
	// proxy wraps file and counts the bytes read from it; see offset.
	proxy *offsetProxy
	// watcher is used to wait for the file to appear and to change.
	watcher *watcher
	// lg is the logger taken from cfg.Logger.
	lg *zap.Logger
}
// Buffer size limits applied by File to Config.BufferSize.
const (
	// minBufSize is the threshold for Config.BufferSize: values less than
	// or equal to it are replaced by defaultBufSize.
	minBufSize = 128 // 128 bytes
	// defaultBufSize is the buffer size used when none (or a too small one)
	// is configured.
	defaultBufSize = 1024 * 50 // 50kb
)
// File builds a new *Tailer for filename without starting it.
//
// Missing configuration is filled in with defaults: a no-op logger, the
// default buffer size (also used when the requested size does not exceed
// the minimum) and the default tracker.
//
// Call Tailer.Tail() on the result to start tailing the file.
func File(filename string, cfg Config) *Tailer {
	lg := cfg.Logger
	if lg == nil {
		lg = zap.NewNop()
		cfg.Logger = lg
	}
	if cfg.BufferSize <= minBufSize {
		cfg.BufferSize = defaultBufSize
	}
	if cfg.Tracker == nil {
		cfg.Tracker = defaultTracker
	}

	tailer := &Tailer{
		cfg:  cfg,
		name: filename,
		lg:   lg,
	}
	tailer.watcher = newWatcher(lg.Named("watch"), cfg.Tracker, filename)
	return tailer
}
// offsetProxy is an io.Reader wrapper that keeps a running count of the
// bytes read through it.
type offsetProxy struct {
	// Reader is the underlying source of data.
	Reader io.Reader
	// Offset is advanced by the number of bytes returned by every Read.
	Offset int64
}

// Read forwards the call to the underlying Reader and adds the number of
// bytes read to Offset, whether or not the underlying Read also reports an
// error.
func (o *offsetProxy) Read(p []byte) (int, error) {
	read, readErr := o.Reader.Read(p)
	o.Offset += int64(read)
	return read, readErr
}
// offset reports the position in the file of the next byte that will be
// handed out by the buffered reader: the number of bytes consumed from the
// file so far minus those still sitting unread in the reader's buffer.
func (t *Tailer) offset() int64 {
	unread := int64(t.reader.Buffered())
	return t.proxy.Offset - unread
}
// closeFile closes the currently opened file, if any, and forgets it.
// The error returned by Close is deliberately ignored: the file is only
// read from, and there is nothing useful to do on failure.
func (t *Tailer) closeFile() {
	if f := t.file; f != nil {
		t.file = nil
		_ = f.Close()
	}
}
// openFile (re)opens the tailed file and positions it at loc.
//
// Any previously opened file is closed first. If the file does not exist,
// openFile blocks on the watcher until it appears (or ctx is done) and then
// retries. After a successful seek, a fresh offsetProxy is installed whose
// Offset is the absolute position returned by Seek.
//
// Errors are wrapped with "wait exists", "open" or "seek" depending on the
// failing step.
func (t *Tailer) openFile(ctx context.Context, loc Location) error {
	t.closeFile()

	for {
		f, openErr := os.Open(t.name)
		t.file = f

		switch {
		case openErr == nil:
			// Opened successfully, proceed to seeking below.
		case os.IsNotExist(openErr):
			if entry := t.lg.Check(zapcore.DebugLevel, "File does not exists"); entry != nil {
				entry.Write(
					zap.Error(openErr),
					zap.String("tail.file", t.name),
				)
			}
			if waitErr := t.watcher.WaitExists(ctx); waitErr != nil {
				return errors.Wrap(waitErr, "wait exists")
			}
			// The file should exist now, try opening it again.
			continue
		default:
			return errors.Wrap(openErr, "open")
		}

		pos, seekErr := t.file.Seek(loc.Offset, loc.Whence)
		if seekErr != nil {
			return errors.Wrap(seekErr, "seek")
		}
		t.proxy = &offsetProxy{
			Reader: t.file,
			Offset: pos,
		}
		return nil
	}
}
// readLine reads one complete line from the buffered reader, appending it
// to buf and returning the extended slice.
//
// Lines longer than the reader's buffer arrive from bufio.Reader.ReadLine in
// several chunks (isPrefix == true); they are concatenated here. As with
// ReadLine, the returned line does not include the end-of-line bytes.
//
// On error the returned slice is empty but keeps buf's capacity, so the
// caller can keep reusing its pre-allocated buffer (e.g. after io.EOF while
// following a file) instead of reallocating it.
func (t *Tailer) readLine(buf []byte) ([]byte, error) {
	for {
		// ReadLine returns either a non-nil chunk or an error, never both,
		// so the error can be checked before appending.
		chunk, isPrefix, err := t.reader.ReadLine()
		if err != nil {
			return buf[:0], err
		}
		buf = append(buf, chunk...)
		if !isPrefix {
			return buf, nil
		}
	}
}
// Tail opens file and starts tailing it, reporting observed lines to Handler.
//
// Tail is blocking while calling Handler to reuse internal buffer and
// reduce allocations.
// Tail will call Handler in same sequence as lines are observed.
// See Handler for more info.
//
// Tail returns:
//   - nil when the end of file is reached and Config.Follow is false, or
//     when the followed file is deleted;
//   - ctx.Err() when ctx is done;
//   - a wrapped error when opening, reading, waiting for changes or the
//     Handler fails.
//
// Can be called multiple times, but not concurrently.
func (t *Tailer) Tail(ctx context.Context, h Handler) error {
	if t == nil {
		return errors.New("incorrect Tailer call: Tailer is nil")
	}
	defer t.closeFile()

	// openFile performs the requested seek and records the resulting
	// absolute position in t.proxy. The file must not be sought again here:
	// a relative location would be applied twice and the offsets reported
	// in Line.Offset would no longer match the real file position.
	loc := Location{
		Offset: 0,
		Whence: io.SeekCurrent,
	}
	if t.cfg.Location != nil {
		loc = *t.cfg.Location
	}
	if err := t.openFile(ctx, loc); err != nil {
		return errors.Wrap(err, "openFile")
	}
	t.resetReader()

	t.lg.Debug("Opened")
	defer t.lg.Debug("Done")

	// Reading line-by-line.
	line := &Line{
		// Pre-allocate some buffer.
		// TODO(ernado): Limit buffer growth to prevent OOM
		Data: make([]byte, 0, t.cfg.BufferSize),
	}

	// Reduce lock contention: mirror context cancellation into an atomic
	// flag that is cheap to poll on every iteration.
	// The helper goroutine is also released when Tail returns, so it does
	// not leak if ctx is never cancelled.
	var done atomic.Bool
	finished := make(chan struct{})
	defer close(finished)
	go func() {
		select {
		case <-ctx.Done():
			done.Store(true)
		case <-finished:
		}
	}()

	debugEnabled := t.lg.Core().Enabled(zapcore.DebugLevel)
	for {
		if done.Load() {
			return ctx.Err()
		}

		// Grab the offset in case we need to back up in the event of a half-line.
		offset := t.offset()
		line.Offset = offset
		var readErr error
		if debugEnabled {
			t.lg.Debug("Reading line", zap.Int64("offset", offset))
		}
		line.Data, readErr = t.readLine(line.Data)
		switch readErr {
		case io.EOF:
			if debugEnabled {
				t.lg.Debug("Got EOF")
			}
			if line.final() {
				// Reporting only final lines, i.e. those ending with newline.
				// Line can become final later.
				if err := h(ctx, line); err != nil {
					return errors.Wrap(err, "handle")
				}
				line.Data = line.Data[:0] // reset buffer
			}
			if !t.cfg.Follow {
				// End of file reached, but not following.
				// Stopping.
				if !line.isBlank() && !line.final() {
					// Reporting non-final line because we are not following
					// and there are no chances for it to become final.
					if err := h(ctx, line); err != nil {
						return errors.Wrap(err, "handle")
					}
				}
				return nil
			}
			if debugEnabled {
				t.lg.Debug("Waiting for changes")
			}
			if err := t.waitForChanges(ctx, offset); err != nil {
				if errors.Is(err, errStop) {
					// File is gone: stop gracefully.
					return nil
				}
				if errors.Is(err, context.DeadlineExceeded) {
					// Notification timeout elapsed: try reading again.
					// Cancellation of ctx itself is caught at the loop top.
					continue
				}
				return errors.Wrap(err, "wait")
			}
		case nil:
			if err := h(ctx, line); err != nil {
				return errors.Wrap(err, "handle")
			}
			line.Data = line.Data[:0] // reset buffer
		default:
			return errors.Wrap(readErr, "read")
		}
	}
}
// waitForChanges blocks until the watcher reports that the file has been
// appended to, deleted, moved or truncated, starting from position pos.
//
// When Config.NotifyTimeout is non-zero, the wait is additionally bounded
// by that timeout. Truncated files are always reopened from the start.
// errStop is returned when the file is deleted or no longer exists; any
// other watcher error is wrapped with "watch".
func (t *Tailer) waitForChanges(ctx context.Context, pos int64) error {
	if timeout := t.cfg.NotifyTimeout; timeout != 0 {
		// Additional safeguard to ensure that we don't hang forever.
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	// onEvent reacts to a single file event reported by the watcher.
	onEvent := func(ctx context.Context, e event) error {
		switch e {
		case evTruncated:
			t.lg.Info("Re-opening truncated file")
			start := Location{
				Offset: 0,
				Whence: io.SeekStart,
			}
			if err := t.openFile(ctx, start); err != nil {
				return errors.Wrap(err, "open file")
			}
			t.resetReader()
			return nil
		case evDeleted:
			t.lg.Debug("Stopping: deleted")
			return errStop
		case evModified:
			t.lg.Debug("Modified")
			return nil
		}
		return errors.Errorf("invalid event %v", e)
	}

	watchErr := t.watcher.WatchEvents(ctx, pos, onEvent)
	switch {
	case watchErr == nil:
		return nil
	case os.IsNotExist(watchErr) || errors.Is(watchErr, syscall.ENOENT):
		// The file vanished while we were watching it.
		return errStop
	default:
		return errors.Wrap(watchErr, "watch")
	}
}
// resetReader replaces the buffered reader with a fresh one that reads
// through the current offset proxy, sized according to the configured
// buffer size. Anything buffered by the previous reader is dropped.
func (t *Tailer) resetReader() {
	size := t.cfg.BufferSize
	t.reader = bufio.NewReaderSize(t.proxy, size)
}