Skip to content

Commit

Permalink
br/operator: fix adapt env for snapshot backup stuck when encountered…
Browse files Browse the repository at this point in the history
… error (pingcap#52607) (pingcap#52969) (pingcap#88)

close pingcap#52846

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
2 people authored and GitHub Enterprise committed May 7, 2024
1 parent a22e2c9 commit 7fc6448
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 12 deletions.
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/metapb",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
brpb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -453,6 +454,9 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
failpoint.Inject("PrepareConnectionsErr", func() {
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
})
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
21 changes: 9 additions & 12 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ package operator
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"os"
"runtime/debug"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
preparesnap "github.com/pingcap/tidb/br/pkg/backup/prepare_snap"
berrors "github.com/pingcap/tidb/br/pkg/errors"
Expand Down Expand Up @@ -140,12 +138,19 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
<-initChan
select {
case <-initChan:
case <-cx.Done():
return cx.Err()
}
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
go func() {
failpoint.Inject("SkipReadyHint", func() {
failpoint.Return()
})
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
cfg.OnAllReady()
Expand Down Expand Up @@ -194,14 +199,6 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectio
return nil
}

// getCallerName builds an identifier string for the current operator process.
//
// The result has the form "operator@<host>T<unix-seconds>#<pid>". When the
// hostname cannot be obtained, "UNKNOWN-<random int63>" is used in its place.
func getCallerName() string {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		// No hostname available: substitute a random placeholder.
		host = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
	}
	nowUnix := time.Now().Unix()
	pid := os.Getpid()
	return fmt.Sprintf("operator@%sT%d#%d", host, nowUnix, pid)
}

func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
Expand Down
31 changes: 31 additions & 0 deletions tests/realtikvtest/brietest/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/br/pkg/task"
Expand Down Expand Up @@ -241,3 +242,33 @@ func TestOperator(t *testing.T) {
verifySchedulerNotStopped(req, cfg)
verifyGCNotStopped(req, cfg)
}

// TestFailure checks that operator.AdaptEnvForSnapshotBackup returns an error
// when the PrepareConnectionsErr failpoint is enabled, and that the GC and
// scheduler "not stopped" verifications pass both before and after the call.
func TestFailure(t *testing.T) {
	const (
		prepareConnErrFP = "github.com/pingcap/tidb/br/pkg/backup/prepare_snap/PrepareConnectionsErr"
		skipReadyHintFP  = "github.com/pingcap/tidb/br/pkg/task/operator/SkipReadyHint"
	)
	req := require.New(t)

	req.NoError(failpoint.Enable(prepareConnErrFP, "return()"))
	// Make goleak happy.
	req.NoError(failpoint.Enable(skipReadyHintFP, "return()"))
	defer func() {
		// Disable in the same order the failpoints were enabled.
		for _, fp := range []string{prepareConnErrFP, skipReadyHintFP} {
			req.NoError(failpoint.Disable(fp))
		}
	}()

	pdCfg := task.Config{
		PD: []string{"127.0.0.1:2379"},
	}
	cfg := operator.PauseGcConfig{
		Config:    pdCfg,
		TTL:       5 * time.Minute,
		SafePoint: oracle.GoTimeToTS(time.Now()),
	}

	// The same pair of checks runs before and after the failing call.
	assertNothingStopped := func() {
		verifyGCNotStopped(req, cfg)
		verifySchedulerNotStopped(req, cfg)
	}

	assertNothingStopped()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	adaptErr := operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
	req.Error(adaptErr)

	assertNothingStopped()
}

0 comments on commit 7fc6448

Please sign in to comment.