From 68933ecba501ce5bea18d6e00690bb4e5c8242d8 Mon Sep 17 00:00:00 2001 From: Yu Juncen Date: Fri, 15 Mar 2024 15:54:39 +0800 Subject: [PATCH] pause scheduler after all connections established Signed-off-by: Yu Juncen --- br/pkg/backup/prepare_snap/BUILD.bazel | 2 +- br/pkg/backup/prepare_snap/prepare.go | 6 +++++ br/pkg/backup/prepare_snap/prepare_test.go | 30 +++++++++++++++++++++- br/pkg/task/operator/cmd.go | 16 +++++++++--- 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/br/pkg/backup/prepare_snap/BUILD.bazel b/br/pkg/backup/prepare_snap/BUILD.bazel index 65f2772fc3858..f99a723e0a1ad 100644 --- a/br/pkg/backup/prepare_snap/BUILD.bazel +++ b/br/pkg/backup/prepare_snap/BUILD.bazel @@ -35,7 +35,7 @@ go_test( timeout = "short", srcs = ["prepare_test.go"], flaky = True, - shard_count = 8, + shard_count = 9, deps = [ ":prepare_snap", "//br/pkg/utils", diff --git a/br/pkg/backup/prepare_snap/prepare.go b/br/pkg/backup/prepare_snap/prepare.go index e799f8c346444..dfb44300e5bf1 100644 --- a/br/pkg/backup/prepare_snap/prepare.go +++ b/br/pkg/backup/prepare_snap/prepare.go @@ -91,6 +91,9 @@ type Preparer struct { RetryBackoff time.Duration RetryLimit int LeaseDuration time.Duration + + /* Observers. Initialize them before starting.*/ + AfterConnectionsEstablished func() } func New(env Env) *Preparer { @@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error { log.Error("failed to prepare connections", logutil.ShortError(err)) return errors.Annotate(err, "failed to prepare connections") } + if p.AfterConnectionsEstablished != nil { + p.AfterConnectionsEstablished() + } if err := p.AdvanceState(ctx); err != nil { log.Error("failed to check the progress of our work", logutil.ShortError(err)) return errors.Annotate(err, "failed to begin step") diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go index 5f3d2e28d44dc..cfc5b9c33a479 100644 --- a/br/pkg/backup/prepare_snap/prepare_test.go +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -21,6 +21,7 @@ import ( "io" "slices" "sync" + "sync/atomic" "testing" "time" @@ -474,7 +475,6 @@ func TestSplitEnv(t *testing.T) { } func TestConnectionDelay(t *testing.T) { - log.SetLevel(zapcore.DebugLevel) req := require.New(t) pdc := fakeCluster(t, 3, dummyRegions(100)...) ms := newTestEnv(pdc) @@ -510,3 +510,31 @@ func TestConnectionDelay(t *testing.T) { delayConn <- struct{}{} req.NoError(<-connectionPrepareResult) } + +func TestHooks(t *testing.T) { + req := require.New(t) + pdc := fakeCluster(t, 3, dummyRegions(100)...) + pauseWaitApply := make(chan struct{}) + ms := newTestEnv(pdc) + ms.onCreateStore = func(ms *mockStore) { + ms.onWaitApply = func(r *metapb.Region) error { + <-pauseWaitApply + return nil + } + } + adv := New(ms) + connectionsEstablished := new(atomic.Bool) + adv.AfterConnectionsEstablished = func() { + connectionsEstablished.Store(true) + } + errCh := make(chan error, 1) + go func() { + errCh <- adv.DriveLoopAndWaitPrepare(context.Background()) + }() + req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond) + close(pauseWaitApply) + req.NoError(<-errCh) + ms.AssertSafeForBackup(t) + req.NoError(adv.Finalize(context.Background())) + ms.AssertIsNormalMode(t) +} diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 2b0157a699036..2c944203c457f 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -134,9 +134,15 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { } defer cx.Close() + initChan := make(chan struct{}) cx.run(func() error { return pauseGCKeeper(cx) }) - cx.run(func() error { return pauseSchedulerKeeper(cx) }) - cx.run(func() error { return pauseAdminAndWaitApply(cx) }) + cx.run(func() error { + log.Info("Pause scheduler waiting all connections established.") + <-initChan + log.Info("Pause scheduler noticed connections established.") + return pauseSchedulerKeeper(cx) + }) + cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) }) go func() { cx.rdGrp.Wait() if cfg.OnAllReady != nil { @@ -154,7 +160,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { return eg.Wait() } -func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { +func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error { env := preparesnap.CliEnv{ Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()), Mgr: cx.kvMgr, @@ -164,6 +170,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error { begin := time.Now() prep := preparesnap.New(retryEnv) prep.LeaseDuration = cx.cfg.TTL + prep.AfterConnectionsEstablished = func() { + log.Info("All connections are stablished.") + close(afterConnectionsEstablished) + } defer cx.cleanUpWith(func(ctx context.Context) { if err := prep.Finalize(ctx); err != nil {