Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/restore: use a more relax check for changefeeds when restoring #53121

Merged
merged 9 commits into from
May 15, 2024
Merged
8 changes: 8 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,14 @@ func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
if err != nil {
return errors.Trace(err)
}
if cfg.CheckRequirements {
err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient())
log.Info("Checking incompatible TiCDC changefeeds before restoring.",
logutil.ShortError(err), zap.Uint64("restore-ts", backupMeta.EndVersion))
if err != nil {
return errors.Trace(err)
}
}

backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if cfg.CheckRequirements && backupVersion != nil {
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,15 +1116,16 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig, etcdCLI *clientv3.
"create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name)
}

// check cdc changefeed
if cfg.CheckRequirements {
nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, etcdCLI)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser())
}
return nil
}

func checkIncompatibleChangefeed(ctx context.Context, backupTS uint64, etcdCLI *clientv3.Client) error {
nameSet, err := cdcutil.GetIncompatibleChangefeedsWithSafeTS(ctx, etcdCLI, backupTS)
if err != nil {
return err
}
if !nameSet.Empty() {
return errors.Errorf("%splease remove changefeed(s) before restore", nameSet.MessageToUser())
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e
errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, ci.etcdCli)
nameSet, err := cdcutil.GetRunningChangefeeds(ctx, ci.etcdCli)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (*LoadDataController) checkCDCPiTRTasks(ctx context.Context) error {
return exeerrors.ErrLoadDataPreCheckFailed.FastGenByArgs(fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

nameSet, err := cdcutil.GetCDCChangefeedNameSet(ctx, cli.GetClient())
nameSet, err := cdcutil.GetRunningChangefeeds(ctx, cli.GetClient())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/util/cdcutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/util/cdcutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/mathutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_log//:log",
"@io_etcd_go_etcd_client_v3//:client",
Expand All @@ -16,10 +17,13 @@ go_library(
go_test(
name = "cdcutil_test",
timeout = "short",
srcs = ["cdc_test.go"],
srcs = [
"cdc_test.go",
"export_for_test.go",
],
embed = [":cdcutil"],
flaky = True,
deps = [
":cdcutil",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
Expand Down
Loading