This is an automated cherry-pick of pingcap#51823
Signed-off-by: ti-chi-bot <[email protected]>
YuJuncen authored and ti-chi-bot committed Apr 12, 2024
1 parent 2ecc47d commit 02e0781
Showing 4 changed files with 92 additions and 3 deletions.
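
The change gates the scheduler-pause step on a signal that fires only once every store connection is established, instead of pausing the scheduler as soon as the operator starts. A minimal, self-contained Go sketch of that gating pattern (simplified names; not the actual BR types):

package main

import "fmt"

func main() {
	// The prepare side closes this channel once all connections are up.
	initChan := make(chan struct{})
	done := make(chan struct{})

	// Scheduler-pause side: block until the signal arrives.
	go func() {
		<-initChan // a receive on a closed channel returns immediately
		fmt.Println("pausing scheduler only after connections are up")
		close(done)
	}()

	// Stand-in for the AfterConnectionsEstablished hook in the diff below.
	afterConnectionsEstablished := func() { close(initChan) }

	// ... establish connections to all stores ...
	afterConnectionsEstablished()
	<-done
}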
4 changes: 4 additions & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
@@ -35,7 +35,11 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 9,
deps = [
":prepare_snap",
"//br/pkg/utils",
6 changes: 6 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
@@ -91,6 +91,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers. Initialize them before starting. */
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
@@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
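
A hypothetical caller sets the observer before starting the loop, since the field is read without further synchronization. A usage sketch built on the API shown above (error handling abbreviated):

prep := preparesnap.New(env)
prep.AfterConnectionsEstablished = func() {
	log.Info("all store connections established")
}
if err := prep.DriveLoopAndWaitPrepare(ctx); err != nil {
	return errors.Annotate(err, "failed to prepare")
}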
69 changes: 69 additions & 0 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -21,6 +21,7 @@ import (
"io"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

@@ -456,3 +457,71 @@ func TestSplitEnv(t *testing.T) {
require.Equal(t, cc.PrepareClient.(*counterClient).send, 1)
require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions)
}

func TestConnectionDelay(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
called := 0
delayConn := make(chan struct{})
blocked := make(chan struct{}, 64)
ms.connectDelay = func(i uint64) <-chan struct{} {
called += 1
if called == 2 {
blocked <- struct{}{}
return delayConn
}
return nil
}
ctx := context.Background()
prep := New(ms)
connectionPrepareResult := make(chan error)
go func() {
connectionPrepareResult <- prep.PrepareConnections(ctx)
}()
<-blocked
ms.mu.Lock()
nonNilStore := 0
for id, store := range ms.stores {
// No lease should have been created yet (i.e. no TiKV is rejecting admin commands at this point).
if store != nil {
req.True(store.leaseUntil.Before(time.Now()), "%d->%s", id, store.leaseUntil)
nonNilStore += 1
}
}
req.GreaterOrEqual(nonNilStore, 2)
ms.mu.Unlock()
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}

func TestHooks(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
pauseWaitApply := make(chan struct{})
ms := newTestEnv(pdc)
ms.onCreateStore = func(ms *mockStore) {
ms.onWaitApply = func(r *metapb.Region) error {
<-pauseWaitApply
return nil
}
}
adv := New(ms)
connectionsEstablished := new(atomic.Bool)
adv.AfterConnectionsEstablished = func() {
connectionsEstablished.Store(true)
}
errCh := make(chan error, 1)
go func() {
errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
}()
req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
close(pauseWaitApply)
req.NoError(<-errCh)
ms.AssertSafeForBackup(t)
req.NoError(adv.Finalize(context.Background()))
ms.AssertIsNormalMode(t)
}
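
The new TestHooks polls an atomic flag flipped by the hook rather than sleeping for a fixed interval. A standalone sketch of that pattern (assumes testify; TestHookFires and the inline goroutine are illustrative, not part of the diff):

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestHookFires(t *testing.T) {
	fired := new(atomic.Bool)
	hook := func() { fired.Store(true) } // stand-in for AfterConnectionsEstablished

	go hook() // stand-in for the prepare loop invoking the observer

	// Poll instead of time.Sleep: passes as soon as the flag flips,
	// fails with a clear message if it never does within a second.
	require.Eventually(t, fired.Load, time.Second, 10*time.Millisecond)
}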
16 changes: 13 additions & 3 deletions br/pkg/task/operator/cmd.go
@@ -135,9 +135,15 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
}
defer cx.Close()

initChan := make(chan struct{})
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error { return pauseSchedulerKeeper(cx) })
cx.run(func() error { return pauseAdminAndWaitApply(cx) })
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
<-initChan
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
go func() {
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
@@ -154,7 +160,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
return eg.Wait()
}

func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error {
env := preparesnap.CliEnv{
Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()),
Mgr: cx.kvMgr,
@@ -164,6 +170,10 @@
begin := time.Now()
prep := preparesnap.New(retryEnv)
prep.LeaseDuration = cx.cfg.TTL
prep.AfterConnectionsEstablished = func() {
log.Info("All connections are stablished.")
close(afterConnectionsEstablished)
}

defer cx.cleanUpWith(func(ctx context.Context) {
if err := prep.Finalize(ctx); err != nil {
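
Two details in this hunk are worth noting: initChan is closed rather than sent to, so the signal is a one-shot broadcast that cannot be lost even if the scheduler-keeper goroutine has not yet reached <-initChan; and pauseGCKeeper stays ungated, so GC pausing still begins immediately while only the scheduler pause waits for the connections.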
