From 0c711226bc7b1c05bd1426ad1ef0609b4a4a0714 Mon Sep 17 00:00:00 2001 From: heylongdacoder Date: Mon, 2 May 2022 21:19:23 +0800 Subject: [PATCH] reloader: Force trigger reload when config rollbacked Signed-off-by: heylongdacoder --- CHANGELOG.md | 1 + pkg/reloader/reloader.go | 5 +- pkg/reloader/reloader_test.go | 113 ++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8baa7ac1232..5f60ae70c33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#5281](https://github.com/thanos-io/thanos/pull/5281) Blocks: Use correct separators for filesystem paths and object storage paths respectively. - [#5300](https://github.com/thanos-io/thanos/pull/5300) Query: Ignore cache on queries with deduplication off. +- [#5324](https://github.com/thanos-io/thanos/pull/5324) Reloader: Force trigger reload when config rollbacked ### Added diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 6303bba5647..9437907cb8f 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -96,6 +96,7 @@ type Reloader struct { lastCfgHash []byte lastWatchedDirsHash []byte + forceReload bool reloads prometheus.Counter reloadErrors prometheus.Counter @@ -352,7 +353,7 @@ func (r *Reloader) apply(ctx context.Context) error { watchedDirsHash = h.Sum(nil) } - if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { + if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { // Nothing to do. return nil } @@ -363,11 +364,13 @@ func (r *Reloader) apply(ctx context.Context) error { } r.reloads.Inc() if err := r.triggerReload(ctx); err != nil { + r.forceReload = true r.reloadErrors.Inc() r.lastReloadSuccess.Set(0) return errors.Wrap(err, "trigger reload") } + r.forceReload = false r.lastCfgHash = cfgHash r.lastWatchedDirsHash = watchedDirsHash level.Info(r.logger).Log( diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 9367ebfe3bb..b469446e0de 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -167,6 +167,119 @@ config: testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2")) } +func TestReloader_ConfigRollback(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + l, err := net.Listen("tcp", "localhost:0") + testutil.Ok(t, err) + + correctConfig := []byte(` +config: + a: 1 +`) + faultyConfig := []byte(` +faulty_config: + a: 1 +`) + + dir, err := ioutil.TempDir("", "reloader-cfg-test") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm)) + testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm)) + + var ( + input = filepath.Join(dir, "in", "cfg.yaml.tmpl") + output = filepath.Join(dir, "out", "cfg.yaml") + ) + + reloads := &atomic.Value{} + reloads.Store(0) + srv := &http.Server{} + + srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + + if string(f) == string(faultyConfig) { + resp.WriteHeader(http.StatusServiceUnavailable) + return + } + + reloads.Store(reloads.Load().(int) + 1) // The only writer. + resp.WriteHeader(http.StatusOK) + }) + go func() { _ = srv.Serve(l) }() + defer func() { testutil.Ok(t, srv.Close()) }() + + reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) + testutil.Ok(t, err) + + reloader := New(nil, nil, &Options{ + ReloadURL: reloadURL, + CfgFile: input, + CfgOutputFile: output, + WatchedDirs: nil, + WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick + RetryInterval: 100 * time.Millisecond, + DelayInterval: 1 * time.Millisecond, + }) + + testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm)) + + rctx, cancel2 := context.WithCancel(ctx) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer g.Done() + testutil.Ok(t, reloader.Watch(rctx)) + }() + + reloadsSeen := 0 + i := 0 + + for { + select { + case <-ctx.Done(): + t.Fatalf("Timeout with i = %d, reloadsSeen = %d", i, reloadsSeen) + case <-time.After(300 * time.Millisecond): + } + + rel := reloads.Load().(int) + reloadsSeen = rel + + if reloadsSeen == 1 && i == 0 { + // Initial apply seen (without doing nothing). + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(correctConfig), string(f)) + + // Change to a faulty config + testutil.Ok(t, ioutil.WriteFile(input, faultyConfig, os.ModePerm)) + i++ + } else if reloadsSeen == 1 && i == 1 { + // Faulty config will trigger a reload, but reload failed + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(faultyConfig), string(f)) + + // Rollback config + testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm)) + } else if reloadsSeen == 2 { + // Rollback to previous config should trigger a reload + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(correctConfig), string(f)) + + break + } + } + cancel2() + g.Wait() +} + func TestReloader_DirectoriesApply(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err)