-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a fs watcher based reloader for PathOrContent #17
base: main
Are you sure you want to change the base?
Changes from all commits
67b6c3d
483a406
0318b76
c6239ee
84b4d91
3ffd730
c5c3a31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,139 @@ | ||||||
// Copyright (c) The EfficientGo Authors. | ||||||
// Licensed under the Apache License 2.0. | ||||||
|
||||||
// Taken from Thanos project. | ||||||
// | ||||||
// Copyright (c) The Thanos Authors. | ||||||
// Licensed under the Apache License 2.0. | ||||||
|
||||||
package extkingpin | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"fmt" | ||||||
"io/ioutil" | ||||||
"path" | ||||||
"path/filepath" | ||||||
"time" | ||||||
|
||||||
"github.com/fsnotify/fsnotify" | ||||||
"github.com/pkg/errors" | ||||||
) | ||||||
|
||||||
// logger is an interface compatible with go-kit/logger. | ||||||
type logger interface { | ||||||
Log(keyvals ...interface{}) error | ||||||
} | ||||||
|
||||||
// pathOrContent is an interface compatible with PathOrContent. | ||||||
type pathOrContent interface { | ||||||
Content() ([]byte, error) | ||||||
Path() string | ||||||
} | ||||||
|
||||||
// PathContentReloader starts a file watcher that monitors the file indicated by pathOrContent.Path() and runs | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make a note this can run forever if not cancelled? Also I wonder if it wouldn't be cleaner to make it sync. Otherwise it looks like a function that just leaks things when you are not careful. So either make it sync so others would run it in go routine or return closer? |
||||||
// reloadFunc whenever a change is detected. | ||||||
// A debounce timer can be configured via function args to handle situations where many events that would trigger | ||||||
// a reload are receive in a short period of time. Files will be effectively reloaded at the latest after 2 times | ||||||
// the debounce timer. By default the debouncer timer is 1 second. | ||||||
// To ensure renames and deletes are properly handled, the file watcher is put at the file's parent folder. See | ||||||
// https://github.com/fsnotify/fsnotify/issues/214 for more details. | ||||||
func PathContentReloader(ctx context.Context, fileContent pathOrContent, debugLogger logger, errorLogger logger, reloadFunc func(), debounceTime time.Duration) error { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I think we need verb to tell what's going on 🤔 |
||||||
filePath, err := filepath.Abs(fileContent.Path()) | ||||||
if err != nil { | ||||||
return errors.Wrap(err, "getting absolute file path") | ||||||
} | ||||||
|
||||||
watcher, err := fsnotify.NewWatcher() | ||||||
if filePath == "" { | ||||||
_ = debugLogger.Log("msg", "no path detected for config reload") | ||||||
} | ||||||
if err != nil { | ||||||
return errors.Wrap(err, "creating file watcher") | ||||||
} | ||||||
go func() { | ||||||
var reloadTimer *time.Timer | ||||||
if debounceTime != 0 { | ||||||
reloadTimer = time.AfterFunc(debounceTime, func() { | ||||||
reloadFunc() | ||||||
_ = debugLogger.Log("msg", "configuration reloaded after debouncing") | ||||||
}) | ||||||
} | ||||||
defer watcher.Close() | ||||||
for { | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
if reloadTimer != nil { | ||||||
reloadTimer.Stop() | ||||||
} | ||||||
return | ||||||
case event := <-watcher.Events: | ||||||
// fsnotify sometimes sends a bunch of events without name or operation. | ||||||
// It's unclear what they are and why they are sent - filter them out. | ||||||
if event.Name == "" { | ||||||
break | ||||||
} | ||||||
// We are watching the file's parent folder (more details on why this is done can be found below), but | ||||||
// we are only interested in changes to the target file. Discard every other file as quickly as possible. | ||||||
if event.Name != filePath { | ||||||
break | ||||||
} | ||||||
// We only react to files being written or created. | ||||||
// On "chmod" or "remove" we have nothing to do. | ||||||
// On "rename" we have the old file name (not useful). A "create" event for the new file will come later. | ||||||
if !event.Op.Has(fsnotify.Write) || !event.Op.Has(fsnotify.Create) { | ||||||
break | ||||||
} | ||||||
_ = debugLogger.Log("msg", fmt.Sprintf("change detected for %s", filePath), "eventName", event.Name, "eventOp", event.Op) | ||||||
if reloadTimer != nil { | ||||||
reloadTimer.Reset(debounceTime) | ||||||
} | ||||||
case err := <-watcher.Errors: | ||||||
_ = errorLogger.Log("msg", "watcher error", "error", err) | ||||||
} | ||||||
} | ||||||
}() | ||||||
// We watch the file's parent folder and not the file itself to better handle DELETE and RENAME events. Check | ||||||
// https://github.com/fsnotify/fsnotify/issues/214 for more details. | ||||||
if err := watcher.Add(path.Dir(filePath)); err != nil { | ||||||
return errors.Wrapf(err, "adding path %s to file watcher", filePath) | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
// StaticPathContent serves the contents of a given file through the pathOrContent interface. It's useful for tests | ||||||
// that rely on such interface. | ||||||
type StaticPathContent struct { | ||||||
content []byte | ||||||
path string | ||||||
} | ||||||
|
||||||
var _ pathOrContent = (*StaticPathContent)(nil) | ||||||
|
||||||
// Content returns the static content. | ||||||
func (t *StaticPathContent) Content() ([]byte, error) { | ||||||
return t.content, nil | ||||||
} | ||||||
|
||||||
// Path returns the path to the file that contains the content. | ||||||
func (t *StaticPathContent) Path() string { | ||||||
return t.path | ||||||
} | ||||||
|
||||||
// NewStaticPathContent creates a new content that can be used to serve a static configuration. | ||||||
func NewStaticPathContent(fromPath string) (*StaticPathContent, error) { | ||||||
content, err := ioutil.ReadFile(fromPath) | ||||||
|
||||||
if err != nil { | ||||||
return nil, errors.Wrapf(err, "could not load test content: %s", fromPath) | ||||||
} | ||||||
return &StaticPathContent{content, fromPath}, nil | ||||||
} | ||||||
|
||||||
// Rewrite rewrites the file backing this StaticPathContent and swaps the local content cache. The file writing | ||||||
// is needed to trigger the file system monitor. | ||||||
func (t *StaticPathContent) Rewrite(newContent []byte) error { | ||||||
t.content = newContent | ||||||
// Write the file to ensure possible file watcher reloaders get triggered. | ||||||
return ioutil.WriteFile(t.path, newContent, 0666) | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
// Copyright (c) The EfficientGo Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
// Taken from Thanos project. | ||
// | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package extkingpin | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"path" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/efficientgo/core/testutil" | ||
) | ||
|
||
func TestPathContentReloader(t *testing.T) { | ||
type args struct { | ||
runSteps func(t *testing.T, testFile string, pathContent *StaticPathContent) | ||
} | ||
tests := []struct { | ||
name string | ||
args args | ||
wantReloads int | ||
}{ | ||
{ | ||
name: "Many operations, only rewrite triggers one reload", | ||
args: args{ | ||
runSteps: func(t *testing.T, testFile string, pathContent *StaticPathContent) { | ||
testutil.Ok(t, os.Chmod(testFile, 0777)) | ||
testutil.Ok(t, os.Remove(testFile)) | ||
testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) | ||
}, | ||
}, | ||
wantReloads: 1, | ||
}, | ||
{ | ||
name: "Many operations, only rename triggers one reload", | ||
args: args{ | ||
runSteps: func(t *testing.T, testFile string, pathContent *StaticPathContent) { | ||
testutil.Ok(t, os.Chmod(testFile, 0777)) | ||
testutil.Ok(t, os.Rename(testFile, testFile+".tmp")) | ||
testutil.Ok(t, os.Rename(testFile+".tmp", testFile)) | ||
}, | ||
}, | ||
wantReloads: 1, | ||
}, | ||
{ | ||
name: "Many operations, two rewrites trigger two reloads", | ||
args: args{ | ||
runSteps: func(t *testing.T, testFile string, pathContent *StaticPathContent) { | ||
testutil.Ok(t, os.Chmod(testFile, 0777)) | ||
testutil.Ok(t, os.Remove(testFile)) | ||
testutil.Ok(t, pathContent.Rewrite([]byte("test modified"))) | ||
time.Sleep(2 * time.Second) | ||
testutil.Ok(t, pathContent.Rewrite([]byte("test modified again"))) | ||
}, | ||
}, | ||
wantReloads: 1, | ||
}, | ||
{ | ||
name: "Chmod doesn't trigger reload", | ||
args: args{ | ||
runSteps: func(t *testing.T, testFile string, pathContent *StaticPathContent) { | ||
testutil.Ok(t, os.Chmod(testFile, 0777)) | ||
}, | ||
}, | ||
wantReloads: 0, | ||
}, | ||
{ | ||
name: "Remove doesn't trigger reload", | ||
args: args{ | ||
runSteps: func(t *testing.T, testFile string, pathContent *StaticPathContent) { | ||
testutil.Ok(t, os.Remove(testFile)) | ||
}, | ||
}, | ||
wantReloads: 0, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
testFile := path.Join(t.TempDir(), "test") | ||
testutil.Ok(t, ioutil.WriteFile(testFile, []byte("test"), 0666)) | ||
pathContent, err := NewStaticPathContent(testFile) | ||
testutil.Ok(t, err) | ||
|
||
wg := &sync.WaitGroup{} | ||
wg.Add(tt.wantReloads) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
reloadCount := 0 | ||
err = PathContentReloader(ctx, pathContent, newTestLogger("debug"), newTestLogger("error"), func() { | ||
reloadCount++ | ||
wg.Done() | ||
}, 100*time.Millisecond) | ||
testutil.Ok(t, err) | ||
|
||
tt.args.runSteps(t, testFile, pathContent) | ||
wg.Wait() | ||
testutil.Equals(t, tt.wantReloads, reloadCount) | ||
}) | ||
} | ||
} | ||
|
||
type testLogger struct { | ||
prefix string | ||
} | ||
|
||
func newTestLogger(prefix string) testLogger { | ||
return testLogger{prefix: prefix} | ||
} | ||
|
||
func (t testLogger) Log(keyvals ...interface{}) error { | ||
_, err := fmt.Printf("[%s] %s", t.prefix, keyvals) | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?