Skip to content

Commit

Permalink
br/restore: use a more relax check for changefeeds when restoring (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen authored May 15, 2024
1 parent 4538a21 commit b9b330a
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 106 deletions.
8 changes: 8 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,14 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return errors.Trace(err)
}
if cfg.CheckRequirements {
err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient())
log.Info("Checking incompatible TiCDC changefeeds before restoring.",
logutil.ShortError(err), zap.Uint64("restore-ts", backupMeta.EndVersion))
if err != nil {
return errors.Trace(err)
}
}

backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,15 +1116,16 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.
"create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name)
}

// check cdc changefeed
if cfg.CheckRequirements {
nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, etcdCLI)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser())
}
return nil
}

func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error {
nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser())
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e
errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, ci.etcdCli)
nameSet, err := cdcutil.GetRunningChangefeeds(ctx, ci.etcdCli)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs(fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, cli.GetClient())
nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli.GetClient())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/util/cdcutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/util/cdcutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/mathutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@io_etcd_go_etcd_client_v3//:client",
Expand All @@ -16,10 +17,13 @@ go_library(
go_test(
name = "cdcutil_test",
timeout = "short",
srcs = ["cdc_test.go"],
srcs = [
"cdc_test.go",
"export_for_test.go",
],
embed = [":cdcutil"],
flaky = True,
deps = [
":cdcutil",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
Expand Down
Loading

0 comments on commit b9b330a

Please sign in to comment.