diff --git a/filewatcher/filewatcher.go b/filewatcher/filewatcher.go index 3b7f0c40f..75cae4658 100644 --- a/filewatcher/filewatcher.go +++ b/filewatcher/filewatcher.go @@ -2,29 +2,22 @@ package filewatcher import ( "context" - "errors" - "fmt" "io" "io/fs" - "os" - "path/filepath" - "sync" - "sync/atomic" "time" - "github.com/fsnotify/fsnotify" - - "github.com/reddit/baseplate.go/internal/limitopen" + v2 "github.com/reddit/baseplate.go/filewatcher/v2" + "github.com/reddit/baseplate.go/filewatcher/v2/fwtest" "github.com/reddit/baseplate.go/log" ) // DefaultFSEventsDelay is the default FSEventsDelay used when creating a new // FileWatcher. -const DefaultFSEventsDelay = 1 * time.Second +const DefaultFSEventsDelay = v2.DefaultFSEventsDelay // DefaultPollingInterval is the default PollingInterval used when creating a // new FileWatcher. -const DefaultPollingInterval = 30 * time.Second +const DefaultPollingInterval = v2.DefaultPollingInterval // FileWatcher loads and parses data from a file or directory, and watches for // changes in order to refresh its stored data. @@ -47,14 +40,14 @@ type FileWatcher interface { // // It's intentionally defined as a variable instead of constant, so that the // caller can tweak its value when needed. -var InitialReadInterval = time.Second / 2 +var InitialReadInterval = v2.DefaultInitialReadInterval // DefaultMaxFileSize is the default MaxFileSize used when it's <= 0. // // It's 10 MiB, with hard limit multiplier of 10. const ( - DefaultMaxFileSize = 10 << 20 - HardLimitMultiplier = 10 + DefaultMaxFileSize = v2.DefaultMaxFileSize + HardLimitMultiplier = v2.HardLimitMultiplier ) // A Parser is a callback function to be called when a watched file has its @@ -62,7 +55,7 @@ const ( // // Please note that Parser should always return the consistent type. // Inconsistent type will cause panic, as does returning nil data and nil error. -type Parser func(f io.Reader) (data any, err error) +type Parser = v2.Parser[any] // A DirParser is a callback function that will be called when the watched // directory has its content changed or is read for the first time. @@ -72,26 +65,19 @@ type Parser func(f io.Reader) (data any, err error) // as does returning nil data and nil error. // // Use WrapDirParser to wrap it into a Parser to be used with FileWatcher. -type DirParser func(dir fs.FS) (data any, err error) +type DirParser = v2.DirParser[any] // WrapDirParser wraps a DirParser for a directory to a Parser. // // When using FileWatcher to watch a directory instead of a single file, // you MUST use WrapDirParser instead of any other Parser implementations. func WrapDirParser(dp DirParser) Parser { - return func(r io.Reader) (data any, err error) { - path := string(r.(fakeDirectoryReader)) - dir := os.DirFS(path) - return dp(dir) - } + return v2.WrapDirParser(dp) } // Result is the return type of New. Use Get function to get the actual data. type Result struct { - data atomic.Value - - ctx context.Context - cancel context.CancelFunc + result *v2.Result[any] } // Get returns the latest parsed data from the FileWatcher. @@ -99,7 +85,7 @@ type Result struct { // Although the type is any, // it's guaranteed to be whatever actual type is implemented inside Parser. func (r *Result) Get() any { - return r.data.Load().(*atomicData).data + return r.result.Get() } // Stop stops the FileWatcher. @@ -110,132 +96,7 @@ func (r *Result) Get() any { // It's OK to call Stop multiple times. // Calls after the first one are essentially no-op. func (r *Result) Stop() { - r.cancel() -} - -func getMtime(path string) (time.Time, error) { - stat, err := os.Stat(path) - if err != nil { - return time.Time{}, err - } - return stat.ModTime(), nil -} - -func (r *Result) watcherLoop( - watcher *fsnotify.Watcher, - path string, - parser Parser, - softLimit, hardLimit int64, - logger log.Wrapper, - pollingInterval time.Duration, - fsEventsDelay time.Duration, -) { - var lock sync.Mutex - forceReload := func() { - // make sure we don't run forceReload concurrently - lock.Lock() - defer lock.Unlock() - - d, mtime, files, err := openAndParse(path, parser, softLimit, hardLimit) - if err != nil { - logger.Log(context.Background(), err.Error()) - } else { - r.data.Store(&atomicData{ - data: d, - mtime: mtime, - }) - // remove all previously watched files - for _, path := range watcher.WatchList() { - watcher.Remove(path) - } - // then read all new files to watch - for _, path := range files { - if err := watcher.Add(path); err != nil { - logger.Log(context.Background(), fmt.Sprintf( - "filewatcher: failed to watch file %q: %v", - path, - err, - )) - } - } - } - } - - reload := func() { - mtime, err := getMtime(path) - if err != nil { - logger.Log(context.Background(), fmt.Sprintf( - "filewatcher: failed to get mtime for %q: %v", - path, - err, - )) - return - } - if r.data.Load().(*atomicData).mtime.Before(mtime) { - forceReload() - } - } - - var tickerChan <-chan time.Time - if pollingInterval > 0 { - ticker := time.NewTicker(pollingInterval) - defer ticker.Stop() - tickerChan = ticker.C - } - var timer *time.Timer - for { - select { - case <-r.ctx.Done(): - watcher.Close() - return - - case err := <-watcher.Errors: - logger.Log(context.Background(), "filewatcher: watcher error: "+err.Error()) - - case ev := <-watcher.Events: - // When both r.ctx.Done() and watcher.Events are unblocked, there's no - // guarantee which case would be picked, so do an additional ctx check - // here to make sure we don't spam the log with i/o errors (which mainly - // happen in tests) - if r.ctx.Err() != nil { - continue - } - - switch ev.Op { - default: - // Ignore uninterested events. - case fsnotify.Create, fsnotify.Write, fsnotify.Rename, fsnotify.Remove: - // Use fsEventDelay to avoid calling forceReload repetively when a burst - // of fs events happens (for example, when multiple files within the - // directory changed). - if timer == nil { - timer = time.AfterFunc(fsEventsDelay, forceReload) - } else { - timer.Reset(fsEventsDelay) - } - } - - case <-tickerChan: - // When both r.ctx.Done() and tickerChan are unblocked, there's no - // guarantee which case would be picked, so do an additional ctx check - // here to make sure we don't spam the log with i/o errors (which mainly - // happen in tests) - if r.ctx.Err() != nil { - continue - } - - reload() - } - } -} - -// The actual data held in Result.data. -type atomicData struct { - // actual parsed data - data any - - // other metadata - mtime time.Time + r.result.Close() } var ( @@ -256,10 +117,14 @@ type Config struct { // either returned by parser or by the underlying file system watcher. // Please note that this does not include errors returned by the first parser // call, which will be returned directly. + // + // Deprecated: Errors will be logged via slog at error level instead. Logger log.Wrapper `yaml:"logger"` // Optional. When <=0 DefaultMaxFileSize will be used instead. // + // This is completely ignored when DirParser is used. + // // This is the soft limit, // we will also auto add a hard limit which is 10x (see HardLimitMultiplier) // of soft limit. @@ -294,54 +159,6 @@ type Config struct { FSEventsDelay time.Duration `yaml:"fsEventsDelay"` } -type fakeDirectoryReader string - -func (fakeDirectoryReader) Read([]byte) (int, error) { - return 0, errors.New("filewatcher: you are most likely watching a directory without using DirParser") -} - -func openAndParse(path string, parser Parser, limit, hardLimit int64) (data any, mtime time.Time, files []string, _ error) { - stats, err := os.Stat(path) - if err != nil { - return nil, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) - } - mtime = stats.ModTime() - files = []string{ - // Note: We need to also watch the parent directory, - // because only watching the file won't give us CREATE events, - // which will happen with atomic renames. - filepath.Dir(path), - path, - } - - var reader io.Reader - if stats.IsDir() { - reader = fakeDirectoryReader(path) - if err := filepath.Walk(path, func(p string, _ fs.FileInfo, err error) error { - if err == nil { - files = append(files, p) - } - return nil - }); err != nil { - return nil, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) - } - } else { - // file - f, err := limitopen.OpenWithLimit(path, limit, hardLimit) - if err != nil { - return nil, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) - } - defer f.Close() - reader = f - } - - d, err := parser(reader) - if err != nil { - return nil, time.Time{}, nil, fmt.Errorf("filewatcher: parser error: %w", err) - } - return d, mtime, files, nil -} - // New creates a new FileWatcher. // // If the path is not available at the time of calling, @@ -353,91 +170,23 @@ func openAndParse(path string, parser Parser, limit, hardLimit int64) (data any, // Please note that this does not include errors returned by the first parser // call, which will be returned directly. func New(ctx context.Context, cfg Config) (*Result, error) { - limit := cfg.MaxFileSize - if limit <= 0 { - limit = DefaultMaxFileSize + opts := []v2.Option{ + v2.WithInitialReadInterval(InitialReadInterval), } - hardLimit := limit * HardLimitMultiplier - - var data any - var mtime time.Time - var files []string - - var lastErr error - for { - select { - default: - case <-ctx.Done(): - return nil, fmt.Errorf( - "filewatcher: context canceled while waiting for file(s) under %q to load: %w, last err: %w", - cfg.Path, - ctx.Err(), - lastErr, - ) - } - - var err error - data, mtime, files, err = openAndParse(cfg.Path, cfg.Parser, limit, hardLimit) - if errors.Is(err, fs.ErrNotExist) { - lastErr = err - time.Sleep(InitialReadInterval) - continue - } - if err != nil { - return nil, err - } - break + if cfg.MaxFileSize > 0 { + opts = append(opts, v2.WithFileSizeLimit(cfg.MaxFileSize)) } - - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err + if cfg.PollingInterval != 0 { + opts = append(opts, v2.WithPollingInterval(cfg.PollingInterval)) } - for _, path := range files { - if err := watcher.Add(path); err != nil { - return nil, fmt.Errorf( - "filewatcher: failed to watch %q: %w", - path, - err, - ) - } + if cfg.FSEventsDelay > 0 { + opts = append(opts, v2.WithFSEventsDelay(cfg.FSEventsDelay)) } - - res := new(Result) - res.data.Store(&atomicData{ - data: data, - mtime: mtime, - }) - res.ctx, res.cancel = context.WithCancel(context.Background()) - - if cfg.PollingInterval == 0 { - cfg.PollingInterval = DefaultPollingInterval - } - if cfg.FSEventsDelay <= 0 { - cfg.FSEventsDelay = DefaultFSEventsDelay - } - go res.watcherLoop( - watcher, - cfg.Path, - cfg.Parser, - limit, - hardLimit, - cfg.Logger, - cfg.PollingInterval, - cfg.FSEventsDelay, - ) - - return res, nil -} - -// NewMockFilewatcher returns a pointer to a new MockFileWatcher object -// initialized with the given io.Reader and Parser. -func NewMockFilewatcher(r io.Reader, parser Parser) (*MockFileWatcher, error) { - fw := &MockFileWatcher{parser: parser} - if err := fw.Update(r); err != nil { + result, err := v2.New(ctx, cfg.Path, cfg.Parser, opts...) + if err != nil { return nil, err } - return fw, nil + return &Result{result: result}, nil } // MockFileWatcher is an implementation of FileWatcher that does not actually read @@ -445,8 +194,17 @@ func NewMockFilewatcher(r io.Reader, parser Parser) (*MockFileWatcher, error) { // with NewMockFilewatcher. It provides an additional Update method that allows // you to update this data after it has been created. type MockFileWatcher struct { - data atomic.Value - parser Parser + fake *fwtest.FakeFileWatcher[any] +} + +// NewMockFilewatcher returns a pointer to a new MockFileWatcher object +// initialized with the given io.Reader and Parser. +func NewMockFilewatcher(r io.Reader, parser Parser) (*MockFileWatcher, error) { + fake, err := fwtest.NewFakeFilewatcher(r, parser) + if err != nil { + return nil, err + } + return &MockFileWatcher{fake: fake}, nil } // Update updates the data of the MockFileWatcher using the given io.Reader and @@ -454,17 +212,12 @@ type MockFileWatcher struct { // // This method is not threadsafe. func (fw *MockFileWatcher) Update(r io.Reader) error { - data, err := fw.parser(r) - if err != nil { - return err - } - fw.data.Store(data) - return nil + return fw.fake.Update(r) } // Get returns the parsed data. func (fw *MockFileWatcher) Get() any { - return fw.data.Load() + return fw.fake.Get() } // Stop is a no-op. @@ -473,8 +226,7 @@ func (fw *MockFileWatcher) Stop() {} // MockDirWatcher is an implementation of FileWatcher for testing with watching // directories. type MockDirWatcher struct { - data atomic.Value - parser DirParser + fake *fwtest.FakeDirWatcher[any] } // NewMockDirWatcher creates a MockDirWatcher with the initial data and the @@ -482,26 +234,21 @@ type MockDirWatcher struct { // // It provides Update function to update the data after it's been created. func NewMockDirWatcher(dir fs.FS, parser DirParser) (*MockDirWatcher, error) { - dw := &MockDirWatcher{parser: parser} - if err := dw.Update(dir); err != nil { + fake, err := fwtest.NewFakeDirWatcher(dir, parser) + if err != nil { return nil, err } - return dw, nil + return &MockDirWatcher{fake: fake}, nil } // Update updates the data stored in this MockDirWatcher. func (dw *MockDirWatcher) Update(dir fs.FS) error { - data, err := dw.parser(dir) - if err != nil { - return err - } - dw.data.Store(data) - return nil + return dw.fake.Update(dir) } // Get implements FileWatcher by returning the last updated data. func (dw *MockDirWatcher) Get() any { - return dw.data.Load() + return dw.fake.Get() } // Stop is a no-op. diff --git a/filewatcher/filewatcher_test.go b/filewatcher/filewatcher_test.go index dc5870e93..c5828ce7d 100644 --- a/filewatcher/filewatcher_test.go +++ b/filewatcher/filewatcher_test.go @@ -17,7 +17,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/reddit/baseplate.go/filewatcher" - "github.com/reddit/baseplate.go/log" ) const fsEventsDelayForTests = 10 * time.Millisecond @@ -107,7 +106,6 @@ func TestFileWatcher(t *testing.T) { filewatcher.Config{ Path: path, Parser: parser, - Logger: log.TestWrapper(t), PollingInterval: c.interval, FSEventsDelay: fsEventsDelayForTests, }, @@ -152,7 +150,6 @@ func TestFileWatcherTimeout(t *testing.T) { filewatcher.Config{ Path: path, Parser: parser, - Logger: log.TestWrapper(t), FSEventsDelay: fsEventsDelayForTests, }, ) @@ -196,7 +193,6 @@ func TestFileWatcherRename(t *testing.T) { filewatcher.Config{ Path: path, Parser: parser, - Logger: log.TestWrapper(t), PollingInterval: writeDelay, FSEventsDelay: fsEventsDelayForTests, }, @@ -230,11 +226,6 @@ func TestParserFailure(t *testing.T) { } return value, nil } - var loggerCalled atomic.Int64 - logger := func(_ context.Context, msg string) { - loggerCalled.Store(1) - t.Log(msg) - } dir := t.TempDir() path := filepath.Join(dir, "foo") @@ -247,7 +238,6 @@ func TestParserFailure(t *testing.T) { filewatcher.Config{ Path: path, Parser: parser, - Logger: logger, PollingInterval: -1, // disable polling as we need exact numbers of parser calls in this test FSEventsDelay: fsEventsDelayForTests, }, @@ -270,9 +260,6 @@ func TestParserFailure(t *testing.T) { } // Give it some time to handle the file content change time.Sleep(500 * time.Millisecond) - if loggerCalled.Load() == 0 { - t.Error("Expected logger being called") - } value = data.Get().(int64) if value != expected { t.Errorf("data.Get().(int64) expected %d, got %d", expected, value) @@ -363,7 +350,6 @@ func TestFileWatcherDir(t *testing.T) { filewatcher.Config{ Path: dir, Parser: filewatcher.WrapDirParser(parser), - Logger: log.TestWrapper(t), PollingInterval: -1, // disable polling FSEventsDelay: fsEventsDelayForTests, }, @@ -430,22 +416,6 @@ func limitedParser(t *testing.T, expectedSize int64) filewatcher.Parser { } } -type logWrapper struct { - called atomic.Int64 -} - -func (w *logWrapper) wrapper(tb testing.TB) log.Wrapper { - return func(_ context.Context, msg string) { - tb.Helper() - tb.Logf("logger called with msg: %q", msg) - w.called.Add(1) - } -} - -func (w *logWrapper) getCalled() int64 { - return w.called.Load() -} - func TestParserSizeLimit(t *testing.T) { interval := fsEventsDelayForTests backupInitialReadInterval := filewatcher.InitialReadInterval @@ -477,13 +447,11 @@ func TestParserSizeLimit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), timeout) t.Cleanup(cancel) - var wrapper logWrapper data, err := filewatcher.New( ctx, filewatcher.Config{ Path: path, Parser: limitedParser(t, size), - Logger: wrapper.wrapper(t), MaxFileSize: limit, PollingInterval: writeDelay, FSEventsDelay: fsEventsDelayForTests, @@ -501,13 +469,6 @@ func TestParserSizeLimit(t *testing.T) { // We expect the second parse would fail because of the data is beyond the // hard limit, so the data should still be expectedPayload compareBytesData(t, data.Get(), expectedPayload) - // Since we expect the second parse would fail, we also expect the logger to - // be called at least once. - // The logger could be called twice because of reload triggered by polling. - const expectedCalledMin = 1 - if called := wrapper.getCalled(); called < expectedCalledMin { - t.Errorf("Expected log.Wrapper to be called at least %d times, actual %d", expectedCalledMin, called) - } } func TestMockFileWatcher(t *testing.T) { diff --git a/filewatcher/v2/doc.go b/filewatcher/v2/doc.go new file mode 100644 index 000000000..6b9b12d69 --- /dev/null +++ b/filewatcher/v2/doc.go @@ -0,0 +1,3 @@ +// Package filewatcher provides a go implementation of baseplate's FileWatcher: +// https://baseplate.readthedocs.io/en/stable/api/baseplate/lib/file_watcher.html +package filewatcher diff --git a/filewatcher/v2/doc_test.go b/filewatcher/v2/doc_test.go new file mode 100644 index 000000000..80fc71c4e --- /dev/null +++ b/filewatcher/v2/doc_test.go @@ -0,0 +1,41 @@ +package filewatcher_test + +import ( + "context" + "encoding/json" + "io" + "time" + + "github.com/reddit/baseplate.go/filewatcher/v2" + "github.com/reddit/baseplate.go/log" +) + +// This example demonstrates how to use filewatcher. +func Example() { + const ( + // The path to the file. + path = "/opt/data.json" + // Timeout on the initial read. + timeout = time.Second * 30 + ) + + // The type of the parsed data + type dataType map[string]any + + // Wrap a json decoder as parser + parser := func(f io.Reader) (dataType, error) { + var data dataType + err := json.NewDecoder(f).Decode(&data) + return data, err + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + data, err := filewatcher.New(ctx, path, parser) + if err != nil { + log.Fatal(err) + } + + // Whenever you need to use the parsed data, just call data.Get(): + _ = data.Get() +} diff --git a/filewatcher/v2/filewatcher.go b/filewatcher/v2/filewatcher.go new file mode 100644 index 000000000..1eff8819d --- /dev/null +++ b/filewatcher/v2/filewatcher.go @@ -0,0 +1,444 @@ +package filewatcher + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log/slog" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/fsnotify/fsnotify" + + "github.com/reddit/baseplate.go/internal/limitopen" +) + +// Default option values +const ( + // DefaultFSEventsDelay is the default FSEventsDelay used when creating a new + // FileWatcher. + DefaultFSEventsDelay = 1 * time.Second + + // DefaultPollingInterval is the default PollingInterval used when creating a + // new FileWatcher. + DefaultPollingInterval = 30 * time.Second + + // DefaultInitialReadInterval is the default InitialReadInterval used when + // creating a new FileWatcher. + DefaultInitialReadInterval = time.Second / 2 + + // DefaultMaxFileSize is the default MaxFileSize used when it's <= 0. + // + // It's 10 MiB, with hard limit multiplier of 10. + DefaultMaxFileSize = 10 << 20 + HardLimitMultiplier = 10 +) + +// FileWatcher loads and parses data from a file or directory, and watches for +// changes in order to refresh its stored data. +type FileWatcher[T any] interface { + // Close stops the FileWatcher + // + // After Close is called you won't get any updates on the file content, + // but you can still call Get to get the last content before stopping. + // + // It's OK to call Close multiple times. + // Calls after the first one are essentially no-op. + // + // Close never return an error. + io.Closer + + // Get returns the latest, parsed data from the FileWatcher. + Get() T +} + +// A Parser is a callback function to be called when a watched file has its +// content changed, or is read for the first time. +type Parser[T any] func(f io.Reader) (data T, err error) + +// A DirParser is a callback function that will be called when the watched +// directory has its content changed or is read for the first time. +// +// Use WrapDirParser to wrap it into a Parser to be used with FileWatcher. +type DirParser[T any] func(dir fs.FS) (data T, err error) + +// WrapDirParser wraps a DirParser for a directory to a Parser. +// +// When using FileWatcher to watch a directory instead of a single file, +// you MUST use WrapDirParser instead of any other Parser implementations. +func WrapDirParser[T any](dp DirParser[T]) Parser[T] { + return func(r io.Reader) (data T, err error) { + path := string(r.(fakeDirectoryReader)) + dir := os.DirFS(path) + return dp(dir) + } +} + +// Result is the return type of New. Use Get function to get the actual data. +type Result[T any] struct { + data atomic.Pointer[dataAt[T]] + + ctx context.Context + cancel context.CancelFunc +} + +// Get returns the latest parsed data from the FileWatcher. +func (r *Result[T]) Get() T { + return r.data.Load().data +} + +// Close stops the FileWatcher. +// +// After Close is called you won't get any updates on the file content, +// but you can still call Get to get the last content before stopping. +// +// It's OK to call Close multiple times. +// Calls after the first one are essentially no-op. +// +// Close never returns an error. +func (r *Result[T]) Close() error { + r.cancel() + return nil +} + +func getMtime(path string) (time.Time, error) { + stat, err := os.Stat(path) + if err != nil { + return time.Time{}, err + } + return stat.ModTime(), nil +} + +func (r *Result[T]) watcherLoop( + watcher *fsnotify.Watcher, + path string, + parser Parser[T], + softLimit, hardLimit int64, + pollingInterval time.Duration, + fsEventsDelay time.Duration, +) { + var lock sync.Mutex + forceReload := func() { + // make sure we don't run forceReload concurrently + lock.Lock() + defer lock.Unlock() + + d, mtime, files, err := openAndParse(path, parser, softLimit, hardLimit) + if err != nil { + slog.ErrorContext(r.ctx, "filewatcher: openAndParse returned error", "err", err) + return + } + r.data.Store(&dataAt[T]{ + data: d, + mtime: mtime, + }) + // remove all previously watched files + for _, path := range watcher.WatchList() { + watcher.Remove(path) + } + // then read all new files to watch + for _, path := range files { + if err := watcher.Add(path); err != nil { + slog.ErrorContext(r.ctx, "filewatcher: failed to watch file", "err", err, "path", path) + } + } + } + + reload := func() { + mtime, err := getMtime(path) + if err != nil { + slog.ErrorContext(r.ctx, "filewatcher: failed to get mtime for file", "err", err, "path", path) + return + } + if r.data.Load().mtime.Before(mtime) { + forceReload() + } + } + + var tickerChan <-chan time.Time + if pollingInterval > 0 { + ticker := time.NewTicker(pollingInterval) + defer ticker.Stop() + tickerChan = ticker.C + } + var timer *time.Timer // only populated with time.AfterFunc + for { + select { + case <-r.ctx.Done(): + watcher.Close() + return + + case err := <-watcher.Errors: + slog.ErrorContext(r.ctx, "filewatcher: watcher error", "err", err) + + case ev := <-watcher.Events: + // When both r.ctx.Done() and watcher.Events are unblocked, there's no + // guarantee which case would be picked, so do an additional ctx check + // here to make sure we don't spam the log with i/o errors (which mainly + // happen in tests) + if r.ctx.Err() != nil { + continue + } + + switch ev.Op { + default: + // Ignore uninterested events. + case fsnotify.Create, fsnotify.Write, fsnotify.Rename, fsnotify.Remove: + // Use fsEventDelay to avoid calling forceReload repetively when a burst + // of fs events happens (for example, when multiple files within the + // directory changed). + if timer == nil { + timer = time.AfterFunc(fsEventsDelay, forceReload) + } else { + // Appropriate here without the additional check because timer was + // created via time.AfterFunc not time.NewTimer. + // See the discussion here: + // https://github.com/reddit/baseplate.go/pull/654#discussion_r1610983058 + timer.Reset(fsEventsDelay) + } + } + + case <-tickerChan: + // When both r.ctx.Done() and tickerChan are unblocked, there's no + // guarantee which case would be picked, so do an additional ctx check + // here to make sure we don't spam the log with i/o errors (which mainly + // happen in tests) + if r.ctx.Err() != nil { + continue + } + + reload() + } + } +} + +// The actual data and mtime held in Result.data. +type dataAt[T any] struct { + // actual parsed data + data T + + // other metadata + mtime time.Time +} + +var ( + _ FileWatcher[any] = (*Result[any])(nil) +) + +type fakeDirectoryReader string + +func (fakeDirectoryReader) Read([]byte) (int, error) { + return 0, errors.New("filewatcher: you are most likely watching a directory without using DirParser") +} + +func openAndParse[T any](path string, parser Parser[T], limit, hardLimit int64) (data T, mtime time.Time, files []string, _ error) { + var zero T + stats, err := os.Stat(path) + if err != nil { + return zero, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) + } + mtime = stats.ModTime() + files = []string{ + // Note: We need to also watch the parent directory, + // because only watching the file won't give us CREATE events, + // which will happen with atomic renames. + filepath.Dir(path), + path, + } + + var reader io.Reader + if stats.IsDir() { + reader = fakeDirectoryReader(path) + if err := filepath.Walk(path, func(p string, _ fs.FileInfo, err error) error { + if err == nil { + files = append(files, p) + } + return nil + }); err != nil { + return zero, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) + } + } else { + // file + f, err := limitopen.OpenWithLimit(path, limit, hardLimit) + if err != nil { + return zero, time.Time{}, nil, fmt.Errorf("filewatcher: i/o error: %w", err) + } + defer f.Close() + reader = f + } + + d, err := parser(reader) + if err != nil { + return zero, time.Time{}, nil, fmt.Errorf("filewatcher: parser error: %w", err) + } + return d, mtime, files, nil +} + +type opts struct { + fsEventsDelay time.Duration + pollingInterval time.Duration + initialReadInterval time.Duration + + fileSizeLimit int64 +} + +// Option used in New. +type Option func(*opts) + +// WithOptions is a sugar to curry zero or more options. +func WithOptions(options ...Option) Option { + return func(o *opts) { + for _, opt := range options { + opt(o) + } + } +} + +// WithFSEventsDelay sets the delay between receiving the fs events and actually +// reading and parsing the changes. +// +// It's used to avoid short bursts of fs events (for example, when watching a +// directory) causing reading and parsing repetively. +// +// Defaut to DefaultFSEventsDelay. +func WithFSEventsDelay(delay time.Duration) Option { + return func(o *opts) { + o.fsEventsDelay = delay + } +} + +// WithPollingInterval sets the interval to check file changes proactively. +// +// Default to DefaultPollingInterval. +// To disable polling completely, set it to a negative value. +// +// Without polling, filewatcher relies solely on fs events from the parent +// directory. This works for most cases but will not work in the cases that +// the parent directory will be remount upon change +// (for example, k8s ConfigMap). +func WithPollingInterval(interval time.Duration) Option { + return func(o *opts) { + o.pollingInterval = interval + } +} + +// WithInitialReadInterval sets the interval to keep retrying to open the file +// when creating a new FileWatcher, when the file was not initially available. +// +// Default to DefaultInitialReadInterval. +func WithInitialReadInterval(interval time.Duration) Option { + return func(o *opts) { + o.initialReadInterval = interval + } +} + +// WithFileSizeLimit sets the soft file size limit, with the hard limit being +// 10x (see HardLimitMultiplier) of the set soft limit. +// +// This is completely ignored when DirParser is used. +// +// If the soft limit is violated, +// the violation will be reported via slog at error level and prometheus +// counter of limitopen_softlimit_violation_total, +// but it does not stop the normal parsing process. +// +// If the hard limit is violated, +// The loading of the file will fail immediately. +// +// Default to DefaultMaxFileSize. +func WithFileSizeLimit(limit int64) Option { + return func(o *opts) { + o.fileSizeLimit = limit + } +} + +func defaultOptions() Option { + return WithOptions( + WithFSEventsDelay(DefaultFSEventsDelay), + WithPollingInterval(DefaultPollingInterval), + WithInitialReadInterval(DefaultInitialReadInterval), + WithFileSizeLimit(DefaultMaxFileSize), + ) +} + +// New creates a new FileWatcher. +// +// If the path is not available at the time of calling, +// it blocks until the file becomes available, or context is cancelled, +// whichever comes first. +func New[T any](ctx context.Context, path string, parser Parser[T], options ...Option) (*Result[T], error) { + var opt opts + WithOptions( + defaultOptions(), + WithOptions(options...), + )(&opt) + hardLimit := opt.fileSizeLimit * HardLimitMultiplier + + var data T + var mtime time.Time + var files []string + + var lastErr error + for { + select { + default: + case <-ctx.Done(): + return nil, fmt.Errorf( + "filewatcher: context canceled while waiting for file(s) under %q to load: %w, last err: %w", + path, + ctx.Err(), + lastErr, + ) + } + + var err error + data, mtime, files, err = openAndParse(path, parser, opt.fileSizeLimit, hardLimit) + if errors.Is(err, fs.ErrNotExist) { + lastErr = err + time.Sleep(opt.initialReadInterval) + continue + } + if err != nil { + return nil, err + } + break + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + for _, path := range files { + if err := watcher.Add(path); err != nil { + return nil, fmt.Errorf( + "filewatcher: failed to watch %q: %w", + path, + err, + ) + } + } + + res := new(Result[T]) + res.data.Store(&dataAt[T]{ + data: data, + mtime: mtime, + }) + res.ctx, res.cancel = context.WithCancel(context.Background()) + + go res.watcherLoop( + watcher, + path, + parser, + opt.fileSizeLimit, + hardLimit, + opt.pollingInterval, + opt.fsEventsDelay, + ) + + return res, nil +} diff --git a/filewatcher/v2/filewatcher_test.go b/filewatcher/v2/filewatcher_test.go new file mode 100644 index 000000000..03a31c59a --- /dev/null +++ b/filewatcher/v2/filewatcher_test.go @@ -0,0 +1,524 @@ +package filewatcher_test + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "log/slog" + "os" + "path/filepath" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/reddit/baseplate.go/filewatcher/v2" +) + +const fsEventsDelayForTests = 10 * time.Millisecond + +func parser(f io.Reader) ([]byte, error) { + return io.ReadAll(f) +} + +// Fails the test when called with level >= error +type failSlogHandler struct { + slog.Handler + + tb testing.TB +} + +func (fsh failSlogHandler) Handle(ctx context.Context, r slog.Record) error { + if r.Level >= slog.LevelError { + fsh.tb.Errorf("slog called at %v level with: %q", r.Level, r.Message) + } else { + fsh.tb.Logf("slog called at %v level with: %q", r.Level, r.Message) + } + return nil +} + +// Counts the number of calls +type countingSlogHandler struct { + slog.Handler + + tb testing.TB + count atomic.Int64 +} + +func (csh *countingSlogHandler) Handle(ctx context.Context, r slog.Record) error { + csh.count.Add(1) + if csh.tb != nil { + csh.tb.Logf("slog called at %v level with: %q", r.Level, r.Message) + return nil + } + return csh.Handler.Handle(ctx, r) +} + +func swapSlog(tb testing.TB, logger *slog.Logger) { + backupLogger := slog.Default() + tb.Cleanup(func() { + slog.SetDefault(backupLogger) + }) + slog.SetDefault(logger) +} + +// writeFile does atomic write/overwrite (write to a tmp file, then use rename +// to overwrite the desired path) instead of in-pleace write/overwrite +// (open/truncate open the file, write to it, close the file). +// +// filewatcher is designed to handle atomic writes/overwrites, not in-place +// ones. Doing in-place write will cause the filewatcher to be triggered twice +// (once the file is created/truncated, once when closing the file), which would +// cause some of the tests to fail flakily on CI. +func writeFile(tb testing.TB, path string, content []byte) { + tb.Helper() + + tmpPath := filepath.Join(tb.TempDir(), "file") + if err := os.WriteFile(tmpPath, content, 0644); err != nil { + tb.Fatalf("Unable to write file: %v", err) + } + if err := os.Rename(tmpPath, path); err != nil { + tb.Fatalf("Unable to rename file: %v", err) + } +} + +func compareBytesData(t *testing.T, data []byte, expected []byte) { + t.Helper() + + if string(data) != string(expected) { + t.Errorf("*data expected to be %q, got %q", expected, data) + } +} + +func TestFileWatcher(t *testing.T) { + for _, c := range []struct { + label string + interval time.Duration + }{ + { + label: "with-polling", + interval: filewatcher.DefaultPollingInterval, + }, + { + label: "no-polling", + interval: -1, + }, + } { + t.Run(c.label, func(t *testing.T) { + swapSlog(t, slog.New(failSlogHandler{ + tb: t, + Handler: slog.Default().Handler(), + })) + + interval := fsEventsDelayForTests + writeDelay := interval * 10 + timeout := writeDelay * 20 + + payload1 := []byte("Hello, world!") + payload2 := []byte("Bye, world!") + payload3 := []byte("Hello, world, again!") + + dir := t.TempDir() + path := filepath.Join(dir, "foo") + + // Delay writing the file + go func() { + time.Sleep(writeDelay) + writeFile(t, path, payload1) + }() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + data, err := filewatcher.New( + ctx, + path, + parser, + filewatcher.WithPollingInterval(c.interval), + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + filewatcher.WithInitialReadInterval(interval), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + data.Close() + }) + compareBytesData(t, data.Get(), payload1) + + writeFile(t, path, payload2) + // Give it some time to handle the file content change + time.Sleep(500 * time.Millisecond) + compareBytesData(t, data.Get(), payload2) + + writeFile(t, path, payload3) + // Give it some time to handle the file content change + time.Sleep(500 * time.Millisecond) + compareBytesData(t, data.Get(), payload3) + }) + } +} + +func TestFileWatcherTimeout(t *testing.T) { + swapSlog(t, slog.New(failSlogHandler{ + tb: t, + Handler: slog.Default().Handler(), + })) + + interval := fsEventsDelayForTests + round := interval * 20 + timeout := round * 4 + + dir := t.TempDir() + path := filepath.Join(dir, "foo") + + before := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + data, err := filewatcher.New( + ctx, + path, + parser, + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + filewatcher.WithInitialReadInterval(interval), + ) + if err == nil { + t.Error("Expected context cancellation error, got nil.") + t.Cleanup(func() { + data.Close() + }) + } + duration := time.Since(before) + if duration.Round(round) > timeout.Round(round) { + t.Errorf("Timeout took %v instead of %v", duration, timeout) + } else { + t.Logf("Timeout took %v, set at %v", duration, timeout) + } +} + +func TestFileWatcherRename(t *testing.T) { + swapSlog(t, slog.New(failSlogHandler{ + tb: t, + Handler: slog.Default().Handler(), + })) + + interval := fsEventsDelayForTests + writeDelay := interval * 10 + timeout := writeDelay * 20 + + payload1 := []byte("Hello, world!") + payload2 := []byte("Bye, world!") + + dir := t.TempDir() + path := filepath.Join(dir, "foo") + + // Delay writing the file + go func() { + time.Sleep(writeDelay) + writeFile(t, path, payload1) + }() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + data, err := filewatcher.New( + ctx, + path, + parser, + filewatcher.WithPollingInterval(writeDelay), + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + filewatcher.WithInitialReadInterval(interval), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + data.Close() + }) + compareBytesData(t, data.Get(), payload1) + + func() { + newpath := path + ".bar" + writeFile(t, newpath, payload2) + if err := os.Rename(newpath, path); err != nil { + t.Fatal(err) + } + }() + // Give it some time to handle the file content change + time.Sleep(writeDelay * 10) + compareBytesData(t, data.Get(), payload2) +} + +func TestParserFailure(t *testing.T) { + counter := countingSlogHandler{ + Handler: slog.Default().Handler(), + tb: t, + } + swapSlog(t, slog.New(&counter)) + + errParser := errors.New("parser failed") + var n atomic.Int64 + parser := func(io.Reader) (int64, error) { + // This parser implementation fails every other call + value := n.Add(1) + if value%2 == 0 { + return 0, errParser + } + return value, nil + } + + dir := t.TempDir() + path := filepath.Join(dir, "foo") + + writeFile(t, path, nil) + + // Initial call to parser should return 1, nil + data, err := filewatcher.New( + context.Background(), + path, + parser, + filewatcher.WithPollingInterval(-1), // disable polling as we need exact numbers of parser calls in this test + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + data.Close() + }) + expected := int64(1) + if value := data.Get(); value != expected { + t.Errorf("data.Get() expected %d, got %d", expected, value) + } + + // Next call to parser should return nil, err + newpath := path + ".bar" + writeFile(t, newpath, nil) + if err := os.Rename(newpath, path); err != nil { + t.Fatal(err) + } + // Give it some time to handle the file content change + time.Sleep(500 * time.Millisecond) + if counter.count.Load() == 0 { + t.Error("Expected logger being called") + } + if value := data.Get(); value != expected { + t.Errorf("data.Get() expected %d, got %d", expected, value) + } + + // Next call to parser should return 3, nil + writeFile(t, newpath, nil) + if err := os.Rename(newpath, path); err != nil { + t.Fatal(err) + } + // Give it some time to handle the file content change + time.Sleep(500 * time.Millisecond) + if got, want := data.Get(), int64(3); got != want { + t.Errorf("data.Get() got %d, want %d", got, want) + } +} + +func updateDirWithContents(tb testing.TB, dst string, contents map[string]string) { + tb.Helper() + + root := tb.TempDir() + dir := filepath.Join(root, "dir") + + if err := os.Mkdir(dir, 0777); err != nil { + tb.Fatalf("Failed to create directory %q: %v", dir, err) + } + for p, content := range contents { + path := filepath.Join(dir, p) + parent := filepath.Dir(path) + if err := os.Mkdir(parent, 0777); err != nil && !errors.Is(err, fs.ErrExist) { + tb.Fatalf("Failed to create directory %q for %q: %v", parent, path, err) + } + if err := os.WriteFile(path, []byte(content), 0666); err != nil { + tb.Fatalf("Failed to write file %q: %v", path, err) + } + } + if err := os.RemoveAll(dst); err != nil && !errors.Is(err, fs.ErrNotExist) { + tb.Fatalf("Failed to remove %q: %v", dst, err) + } + if err := os.Rename(dir, dst); err != nil { + tb.Fatalf("Failed to rename from %q to %q: %v", dir, dst, err) + } +} + +func TestFileWatcherDir(t *testing.T) { + swapSlog(t, slog.New(failSlogHandler{ + tb: t, + Handler: slog.Default().Handler(), + })) + + root := t.TempDir() + dir := filepath.Join(root, "dir") + if err := os.Mkdir(dir, 0777); err != nil { + t.Fatalf("Failed to create directory %q: %v", dir, err) + } + var parserCalled atomic.Int64 + parser := filewatcher.WrapDirParser(func(dir fs.FS) (map[string]string, error) { + parserCalled.Add(1) + m := make(map[string]string) + if err := fs.WalkDir(dir, ".", func(path string, de fs.DirEntry, err error) error { + if err != nil { + return nil // skip to the next file + } + if de.IsDir() { + return nil + } + f, err := dir.Open(path) + if err != nil { + return fmt.Errorf("failed to open %q: %w", path, err) + } + defer f.Close() + content, err := io.ReadAll(f) + if err != nil { + return fmt.Errorf("failed to read %q: %w", path, err) + } + m[path] = string(content) + return nil + }); err != nil { + return nil, err + } + return m, nil + }) + + content1 := map[string]string{ + "foo": "hello, world!", + "bar/fizz": "bye, world!", + } + updateDirWithContents(t, dir, content1) + data, err := filewatcher.New( + context.Background(), + dir, + parser, + filewatcher.WithPollingInterval(-1), // disable polling + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + ) + if err != nil { + t.Fatalf("Failed to create filewatcher: %v", err) + } + t.Cleanup(func() { + data.Close() + }) + + if diff := cmp.Diff(data.Get(), content1); diff != "" { + t.Errorf("unexpected result (-got, +want):\n%s", diff) + } + if got, want := parserCalled.Load(), int64(1); got != want { + t.Errorf("Got %d parser called, want %d", got, want) + } + + content2 := map[string]string{ + "foo/buzz": "hello, world!", + "bar": "bye, world!", + } + updateDirWithContents(t, dir, content2) + time.Sleep(fsEventsDelayForTests * 5) + if diff := cmp.Diff(data.Get(), content2); diff != "" { + t.Errorf("unexpected result (-got, +want):\n%s", diff) + } + if got, want := parserCalled.Load(), int64(2); got != want { + t.Errorf("Got %d parser called, want %d", got, want) + } + + content3 := map[string]string{ + "foo": "hello, world!", + "bar": "bye, world!", + } + updateDirWithContents(t, dir, content3) + time.Sleep(fsEventsDelayForTests * 5) + if diff := cmp.Diff(data.Get(), content3); diff != "" { + t.Errorf("unexpected result (-got, +want):\n%s", diff) + } + if got, want := parserCalled.Load(), int64(3); got != want { + t.Errorf("Got %d parser called, want %d", got, want) + } +} + +func limitedParser(t *testing.T, expectedSize int64) filewatcher.Parser[[]byte] { + return func(f io.Reader) ([]byte, error) { + var buf bytes.Buffer + size, err := io.Copy(&buf, f) + if err != nil { + t.Error(err) + return nil, err + } + if size != expectedSize { + t.Errorf( + "Expected size of %d, got %d, data %q", + expectedSize, + size, + buf.Bytes(), + ) + } + return buf.Bytes(), nil + } +} + +func TestParserSizeLimit(t *testing.T) { + counter := countingSlogHandler{ + Handler: slog.Default().Handler(), + tb: t, + } + swapSlog(t, slog.New(&counter)) + + interval := fsEventsDelayForTests + writeDelay := interval * 10 + timeout := writeDelay * 20 + + const ( + content1 = "Hello, world!" + content2 = "Bye bye, world!" + limit = int64(len(content1)) + ) + payload1 := bytes.Repeat([]byte(content1), filewatcher.HardLimitMultiplier) + size := int64(len(payload1)) + expectedPayload := payload1 + payload2 := bytes.Repeat([]byte(content2), filewatcher.HardLimitMultiplier) + + dir := t.TempDir() + path := filepath.Join(dir, "foo") + + // Delay writing the file + go func() { + time.Sleep(writeDelay) + writeFile(t, path, payload1) + }() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + t.Cleanup(cancel) + data, err := filewatcher.New( + ctx, + path, + limitedParser(t, size), + filewatcher.WithFileSizeLimit(limit), + filewatcher.WithPollingInterval(writeDelay), + filewatcher.WithFSEventsDelay(fsEventsDelayForTests), + filewatcher.WithInitialReadInterval(interval), + ) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + data.Close() + }) + compareBytesData(t, data.Get(), expectedPayload) + + writeFile(t, path, payload2) + // Give it some time to handle the file content change + time.Sleep(writeDelay * 10) + // We expect the second parse would fail because of the data is beyond the + // hard limit, so the data should still be expectedPayload + compareBytesData(t, data.Get(), expectedPayload) + // Since we expect the second parse would fail, we also expect the logger to + // be called at least once. + // The logger could be called twice because of reload triggered by polling. + const expectedCalledMin = 1 + if called := counter.count.Load(); called < expectedCalledMin { + t.Errorf("Expected log.Wrapper to be called at least %d times, actual %d", expectedCalledMin, called) + } +} diff --git a/filewatcher/v2/fwtest/doc.go b/filewatcher/v2/fwtest/doc.go new file mode 100644 index 000000000..53d567a65 --- /dev/null +++ b/filewatcher/v2/fwtest/doc.go @@ -0,0 +1,2 @@ +// Package fwtest provides test helpers for filewatcher v2. +package fwtest diff --git a/filewatcher/v2/fwtest/fake.go b/filewatcher/v2/fwtest/fake.go new file mode 100644 index 000000000..c3276e7b9 --- /dev/null +++ b/filewatcher/v2/fwtest/fake.go @@ -0,0 +1,95 @@ +package fwtest + +import ( + "io" + "io/fs" + "sync/atomic" + + "github.com/reddit/baseplate.go/filewatcher/v2" +) + +// FakeFileWatcher is an implementation of FileWatcher that does not actually +// read from a file, it simply returns the data given to it when it was +// initialized with NewFakeFilewatcher. It provides an additional Update method +// that allows you to update this data after it has been created. +type FakeFileWatcher[T any] struct { + data atomic.Value // type: T + parser filewatcher.Parser[T] +} + +// NewFakeFilewatcher returns a pointer to a new FakeFileWatcher object +// initialized with the given io.Reader and Parser. +func NewFakeFilewatcher[T any](r io.Reader, parser filewatcher.Parser[T]) (*FakeFileWatcher[T], error) { + fw := &FakeFileWatcher[T]{parser: parser} + if err := fw.Update(r); err != nil { + return nil, err + } + return fw, nil +} + +// Update updates the data of the FakeFileWatcher using the given io.Reader and +// the Parser used to initialize the FileWatcher. +// +// This method is not threadsafe. +func (fw *FakeFileWatcher[T]) Update(r io.Reader) error { + data, err := fw.parser(r) + if err != nil { + return err + } + fw.data.Store(data) + return nil +} + +// Get returns the parsed data. +func (fw *FakeFileWatcher[T]) Get() T { + return fw.data.Load().(T) +} + +// Close is a no-op. +func (fw *FakeFileWatcher[T]) Close() error { + return nil +} + +// FakeDirWatcher is an implementation of FileWatcher for testing with watching +// directories. +type FakeDirWatcher[T any] struct { + data atomic.Value // type: T + parser filewatcher.DirParser[T] +} + +// NewFakeDirWatcher creates a FakeDirWatcher with the initial data and the +// given DirParser. +// +// It provides Update function to update the data after it's been created. +func NewFakeDirWatcher[T any](dir fs.FS, parser filewatcher.DirParser[T]) (*FakeDirWatcher[T], error) { + dw := &FakeDirWatcher[T]{parser: parser} + if err := dw.Update(dir); err != nil { + return nil, err + } + return dw, nil +} + +// Update updates the data stored in this FakeDirWatcher. +func (dw *FakeDirWatcher[T]) Update(dir fs.FS) error { + data, err := dw.parser(dir) + if err != nil { + return err + } + dw.data.Store(data) + return nil +} + +// Get implements FileWatcher by returning the last updated data. +func (dw *FakeDirWatcher[T]) Get() T { + return dw.data.Load().(T) +} + +// Close is a no-op. +func (dw *FakeDirWatcher[T]) Close() error { + return nil +} + +var ( + _ filewatcher.FileWatcher[any] = (*FakeFileWatcher[any])(nil) + _ filewatcher.FileWatcher[any] = (*FakeDirWatcher[any])(nil) +) diff --git a/filewatcher/v2/fwtest/fake_test.go b/filewatcher/v2/fwtest/fake_test.go new file mode 100644 index 000000000..b2822976c --- /dev/null +++ b/filewatcher/v2/fwtest/fake_test.go @@ -0,0 +1,98 @@ +package fwtest_test + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" + + "github.com/reddit/baseplate.go/filewatcher/v2/fwtest" +) + +func TestFakeFileWatcher(t *testing.T) { + t.Parallel() + + const ( + foo = "foo" + bar = "bar" + ) + + r := strings.NewReader(foo) + fw, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + var buf bytes.Buffer + _, err := io.Copy(&buf, r) + if err != nil { + return "", err + } + return buf.String(), nil + }) + if err != nil { + t.Fatal(err) + } + + t.Run( + "get", + func(t *testing.T) { + data := fw.Get() + if strings.Compare(data, foo) != 0 { + t.Fatalf("%q does not match %q", data, foo) + } + }, + ) + + t.Run( + "update", + func(t *testing.T) { + if err := fw.Update(strings.NewReader(bar)); err != nil { + t.Fatal(err) + } + + data := fw.Get() + if strings.Compare(data, bar) != 0 { + t.Fatalf("%q does not match %q", data, foo) + } + }, + ) + + t.Run( + "errors", + func(t *testing.T) { + t.Run( + "NewFakeFilewatcher", + func(t *testing.T) { + if _, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + return "", errors.New("test") + }); err == nil { + t.Fatal("expected an error, got nil") + } + }, + ) + + t.Run( + "update", + func(t *testing.T) { + fw, err := fwtest.NewFakeFilewatcher(r, func(r io.Reader) (string, error) { + var buf bytes.Buffer + _, err := io.Copy(&buf, r) + if err != nil { + return "", err + } + data := buf.String() + if strings.Compare(data, bar) == 0 { + return "", errors.New("test") + } + return data, nil + }) + if err != nil { + t.Fatal(err) + } + + if err := fw.Update(strings.NewReader(bar)); err == nil { + t.Fatal("expected an error, got nil") + } + }, + ) + }, + ) +} diff --git a/internal/limitopen/limitopen.go b/internal/limitopen/limitopen.go index 2a4f36ac2..71e873e92 100644 --- a/internal/limitopen/limitopen.go +++ b/internal/limitopen/limitopen.go @@ -1,10 +1,10 @@ package limitopen import ( - "context" "fmt" "io" "io/fs" + "log/slog" "os" "path/filepath" @@ -12,7 +12,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/reddit/baseplate.go/internal/prometheusbpint" - "github.com/reddit/baseplate.go/log" ) const ( @@ -91,7 +90,7 @@ type readCloser struct { // It always reports the size of the path as a prometheus gauge of // "limitopen_file_size_bytes". // When softLimit > 0 and the size of the path as reported by the os is larger, -// it will also use log.DefaultWrapper to report it and increase prometheus +// it will also use slog at error level to report it and increase prometheus // counter of limitopen_softlimit_violation_total. // When hardLimit > 0 and the size of the path as reported by the os is larger, // it will close the file and return an error directly. @@ -108,13 +107,12 @@ func OpenWithLimit(path string, softLimit, hardLimit int64) (io.ReadCloser, erro sizeGauge.With(labels).Set(float64(size)) if softLimit > 0 && size > softLimit { - msg := fmt.Sprintf( - "limitopen.OpenWithLimit: file size > soft limit, path=%q size=%d limit=%d", - path, - size, - softLimit, + slog.Error( + "limitopen.OpenWithLimit: file size > soft limit", + "path", path, + "size", size, + "limit", softLimit, ) - log.DefaultWrapper.Log(context.Background(), msg) softLimitCounter.With(labels).Inc() }