Skip to content

Commit

Permalink
snapshot_backup: deny executing tidb-lightning import while running s…
Browse files Browse the repository at this point in the history
…napshot_backup (pingcap#47001) (pingcap#47342)

ref pingcap#46850
  • Loading branch information
ti-chi-bot authored Oct 16, 2023
1 parent c14b794 commit a56af1b
Show file tree
Hide file tree
Showing 15 changed files with 518 additions and 41 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4194,8 +4194,8 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sum = "h1:dO0YtekJ9IKiO9dXMZz0rNHPBiGCo+c37RUsnshoLwo=",
version = "v0.0.0-20230519121736-d15a686a670e",
sum = "h1:pKIvLrqEmTMw+J8wwdv4r2QTp+cjW9giiOIyaRy9mLM=",
version = "v0.0.0-20230905092614-113cdedbebb6",
)
go_repository(
name = "com_github_timakin_bodyclose",
Expand Down Expand Up @@ -6193,8 +6193,8 @@ def go_deps():
name = "org_uber_go_automaxprocs",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/automaxprocs",
sum = "h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=",
version = "v1.5.3",
sum = "h1:CpDZl6aOlLhReez+8S3eEotD7Jx0Os++lemPlMULQP0=",
version = "v1.4.0",
)
go_repository(
name = "org_uber_go_dig",
Expand Down
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
NewBackupCommand(),
NewRestoreCommand(),
NewStreamCommand(),
newOpeartorCommand(),
newOperatorCommand(),
)
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)
Expand Down
19 changes: 12 additions & 7 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/spf13/cobra"
)

func newOpeartorCommand() *cobra.Command {
func newOperatorCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "operator <subcommand>",
Short: "utilities for operators like tidb-operator.",
Expand All @@ -26,24 +26,29 @@ func newOpeartorCommand() *cobra.Command {
},
Hidden: true,
}
cmd.AddCommand(newPauseGcCommand())
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"pause-gc-and-schedulers",
"(Will be replaced with `prepare-for-snapshot-backup`) pause gc, schedulers and importing until the program exits."))
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"prepare-for-snapshot-backup",
"pause gc, schedulers and importing until the program exits, for snapshot backup."))
return cmd
}

func newPauseGcCommand() *cobra.Command {
func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command {
cmd := &cobra.Command{
Use: "pause-gc-and-schedulers",
Short: "pause gc and schedulers to the ts until the program exits.",
Use: use,
Short: short,
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.PauseGcConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.PauseGCAndScheduler(ctx, &cfg)
return operator.AdaptEnvForSnapshotBackup(ctx, &cfg)
},
}
operator.DefineFlagsForPauseGcConfig(cmd.Flags())
operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
return cmd
}
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,6 @@ var (
ErrKVDownloadFailed = errors.Normalize("download sst failed", errors.RFCCodeText("BR:KV:ErrKVDownloadFailed"))
// ErrKVIngestFailed indicates a generic, retryable ingest error.
ErrKVIngestFailed = errors.Normalize("ingest sst failed", errors.RFCCodeText("BR:KV:ErrKVIngestFailed"))

ErrPossibleInconsistency = errors.Normalize("the cluster state might be inconsistent", errors.RFCCodeText("BR:KV:ErrPossibleInconsistency"))
)
18 changes: 18 additions & 0 deletions br/pkg/task/backup_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -144,6 +145,16 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
if e != nil {
return errors.Trace(err)
}
denyLightning := utils.NewSuspendImporting("backup_ebs_command", mgr.StoreManager)
_, err := denyLightning.DenyAllStores(ctx, utils.DefaultBRGCSafePointTTL)
if err != nil {
return errors.Annotate(err, "lightning from running")
}
go func() {
if err := denyLightning.Keeper(ctx, utils.DefaultBRGCSafePointTTL); err != nil {
log.Warn("cannot keep deny importing, the backup archive may not be useable if there were importing.", logutil.ShortError(err))
}
}()
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
Expand All @@ -155,6 +166,13 @@ func RunBackupEBS(c context.Context, g glue.Glue, cfg *BackupConfig) error {
if restoreE := restoreFunc(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
log.Warn("failed to restore importing, you may need to wait until you are able to start importing", zap.Duration("wait_for", utils.DefaultBRGCSafePointTTL))
}
if err := denyLightning.ConsistentWithPrev(res); err != nil {
log.Warn("lightning hasn't been denied, the backup archive may not be usable.", logutil.ShortError(err))
}
}()
}

Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
],
Expand Down
124 changes: 104 additions & 20 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"context"
"crypto/tls"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
)

func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error) {
Expand All @@ -34,59 +37,140 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
return mgr, nil
}

func cleanUpWith(f func(ctx context.Context)) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
_ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
defer cancel()
f(ctx)
return f(ctx)
}

// PauseGCAndScheduler blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
type AdaptEnvForSnapshotBackupContext struct {
context.Context

pdMgr *pdutil.PdController
kvMgr *utils.StoreManager
cfg PauseGcConfig

rdGrp sync.WaitGroup
runGrp *errgroup.Group
}

func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
cx.rdGrp.Done()
}

func hintAllReady() {
// Hacking: some version of operators using the follow two logs to check whether we are ready...
log.Info("Schedulers are paused.")
log.Info("GC is paused.")
log.Info("All ready.")
}

// AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config.
// This function will block until the context being canceled.
func PauseGCAndScheduler(ctx context.Context, cfg *PauseGcConfig) error {
func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
mgr, err := dialPD(ctx, &cfg.Config)
if err != nil {
return err
return errors.Annotate(err, "failed to dial PD")
}

var tconf *tls.Config
if cfg.TLS.IsEnabled() {
tconf, err = cfg.TLS.ToTLSConfig()
if err != nil {
return errors.Annotate(err, "invalid tls config")
}
}
kvMgr := utils.NewStoreManager(mgr.GetPDClient(), keepalive.ClientParameters{
Time: cfg.Config.GRPCKeepaliveTime,
Timeout: cfg.Config.GRPCKeepaliveTimeout,
}, tconf)
eg, ectx := errgroup.WithContext(ctx)
cx := &AdaptEnvForSnapshotBackupContext{
Context: logutil.ContextWithField(ectx, zap.String("tag", "br_operator")),
pdMgr: mgr,
kvMgr: kvMgr,
cfg: *cfg,
rdGrp: sync.WaitGroup{},
runGrp: eg,
}
cx.rdGrp.Add(3)

eg.Go(func() error { return pauseGCKeeper(ectx, cfg, mgr) })
eg.Go(func() error { return pauseSchedulerKeeper(ectx, mgr) })
eg.Go(func() error { return pauseGCKeeper(cx) })
eg.Go(func() error { return pauseSchedulerKeeper(cx) })
eg.Go(func() error { return pauseImporting(cx) })
go func() {
cx.rdGrp.Wait()
hintAllReady()
}()

return eg.Wait()
}

func pauseGCKeeper(ctx context.Context, cfg *PauseGcConfig, ctl *pdutil.PdController) error {
func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
return errors.Trace(err)
}
cx.ReadyL("pause_lightning")
cx.runGrp.Go(func() error {
err := denyLightning.Keeper(cx, cx.cfg.TTL)
if errors.Cause(err) != context.Canceled {
logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err))
}
return cx.cleanUpWithErr(func(ctx context.Context) error {
for {
if ctx.Err() != nil {
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
// Retry for 10 times.
time.Sleep(cx.cfg.TTL / 10)
continue
}
return denyLightning.ConsistentWithPrev(res)
}
})
})
return nil
}

func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(cfg.TTL.Seconds()),
BackupTS: cfg.SafePoint,
TTL: int64(ctx.cfg.TTL.Seconds()),
BackupTS: ctx.cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := ctl.GetMinResolvedTS(ctx)
rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
if err != nil {
return err
}
log.Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err := utils.StartServiceSafePointKeeper(ctx, ctl.GetPDClient(), sp)
err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
if err != nil {
return err
}
log.Info("GC is paused.", zap.Object("safepoint", sp))
ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
// Note: in fact we can directly return here.
// But the name `keeper` implies once the function exits,
// the GC should be resume, so let's block here.
<-ctx.Done()
return nil
}

func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
undo, err := ctl.RemoveAllPDSchedulers(ctx)
func pauseSchedulerKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
undo, err := ctx.pdMgr.RemoveAllPDSchedulers(ctx)
if undo != nil {
defer cleanUpWith(func(ctx context.Context) {
defer ctx.cleanUpWith(func(ctx context.Context) {
if err := undo(ctx); err != nil {
log.Warn("failed to restore pd scheduler.", logutil.ShortError(err))
}
Expand All @@ -95,7 +179,7 @@ func pauseSchedulerKeeper(ctx context.Context, ctl *pdutil.PdController) error {
if err != nil {
return err
}
log.Info("Schedulers are paused.")
ctx.ReadyL("pause_scheduler")
// Wait until the context canceled.
// So we can properly do the clean up work.
<-ctx.Done()
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type PauseGcConfig struct {
TTL time.Duration `json:"ttl" yaml:"ttl"`
}

func DefineFlagsForPauseGcConfig(f *pflag.FlagSet) {
func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
_ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"schema.go",
"sensitive.go",
"store_manager.go",
"suspend_importing.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/utils",
Expand All @@ -38,6 +39,7 @@ go_library(
"//parser/types",
"//sessionctx",
"//util",
"//util/engine",
"//util/sqlexec",
"@com_github_cheggaaa_pb_v3//:pb",
"@com_github_cznic_mathutil//:mathutil",
Expand All @@ -46,6 +48,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_tikv_client_go_v2//oracle",
Expand Down Expand Up @@ -87,10 +90,11 @@ go_test(
"safe_point_test.go",
"schema_test.go",
"sensitive_test.go",
"suspend_importing_test.go",
],
embed = [":utils"],
flaky = True,
shard_count = 29,
shard_count = 33,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand All @@ -111,13 +115,17 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_multierr//:multierr",
],
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/store_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64)
if addr == "" {
addr = store.GetAddress()
}
log.Info("StoreManager: dialing to store.", zap.String("address", addr), zap.Uint64("store-id", storeID))
conn, err := grpc.DialContext(
ctx,
addr,
Expand Down
Loading

0 comments on commit a56af1b

Please sign in to comment.