From 576655e70fa6dc60babeeffd3119f166d1820cbd Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Thu, 9 Feb 2023 11:38:00 +0800
Subject: [PATCH] pitr: prevent from restore point to cluster running log
 backup (#40871)

ref pingcap/tidb#40797
---
 br/pkg/backup/push.go                      |  3 +-
 br/pkg/restore/client.go                   |  9 ++--
 br/pkg/task/restore.go                     |  7 +++
 br/pkg/task/stream.go                      | 34 +++++++++++--
 br/tests/br_restore_log_task_enable/run.sh | 56 ++++++++++++++++++++++
 5 files changed, 99 insertions(+), 10 deletions(-)
 create mode 100644 br/tests/br_restore_log_task_enable/run.sh

diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go
index 2ffffe690ffe5..ed929a12895a4 100644
--- a/br/pkg/backup/push.go
+++ b/br/pkg/backup/push.go
@@ -218,10 +218,9 @@ func (push *pushDown) pushBackup(
 					if len(errMsg) <= 0 {
 						errMsg = errPb.Msg
 					}
-					return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s %s",
+					return errors.Annotatef(berrors.ErrKVStorage, "error happen in store %v at %s: %s",
 						store.GetId(),
 						redact.String(store.GetAddress()),
-						req.StorageBackend.String(),
 						errMsg,
 					)
 				}
diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index c818c479e1fca..a783fed862d29 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -2111,15 +2111,16 @@ func (rc *Client) RestoreKVFiles(
 		return errors.Trace(err)
 	})
 
+	if err = eg.Wait(); err != nil {
+		summary.CollectFailureUnit("file", err)
+		log.Error("restore files failed", zap.Error(err))
+	}
+
 	log.Info("total skip files due to table id not matched", zap.Int("count", skipFile))
 	if skipFile > 0 {
 		log.Debug("table id in full backup storage", zap.Any("tables", rules))
 	}
 
-	if err = eg.Wait(); err != nil {
-		summary.CollectFailureUnit("file", err)
-		log.Error("restore files failed", zap.Error(err))
-	}
 	return errors.Trace(err)
 }
 
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 3993d2fa88543..2a5c58a9febf3 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -490,13 +490,20 @@ func IsStreamRestore(cmdName string) bool {
 
 // RunRestore starts a restore task inside the current goroutine.
 func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
+	if err := checkTaskExists(c, cfg); err != nil {
+		return errors.Annotate(err, "failed to check task exits")
+	}
+
 	config.UpdateGlobal(func(conf *config.Config) {
 		conf.KeyspaceName = cfg.KeyspaceName
 	})
 	if IsStreamRestore(cmdName) {
 		return RunStreamRestore(c, g, cmdName, cfg)
 	}
+	return runRestore(c, g, cmdName, cfg)
+}
 
+func runRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
 	cfg.Adjust()
 	defer summary.Summary(cmdName)
 	ctx, cancel := context.WithCancel(c)
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 2ffa7bc7dd9af..40f851048da37 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -868,8 +868,8 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 	return nil
 }
 
-func checkConfigForStatus(cfg *StreamConfig) error {
-	if len(cfg.PD) == 0 {
+func checkConfigForStatus(pd []string) error {
+	if len(pd) == 0 {
 		return errors.Annotatef(berrors.ErrInvalidArgument,
 			"the command needs access to PD, please specify `-u` or `--pd`")
 	}
@@ -919,7 +919,7 @@ func RunStreamStatus(
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
 
-	if err := checkConfigForStatus(cfg); err != nil {
+	if err := checkConfigForStatus(cfg.PD); err != nil {
 		return err
 	}
 	ctl, err := makeStatusController(ctx, cfg, g)
@@ -1034,6 +1034,32 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 	return nil
 }
 
+// checkTaskExists checks whether there is a log backup task running.
+// If so, return an error.
+func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error {
+	if err := checkConfigForStatus(cfg.PD); err != nil {
+		return err
+	}
+	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
+	if err != nil {
+		return err
+	}
+	cli := streamhelper.NewMetaDataClient(etcdCLI)
+	defer func() {
+		if err := cli.Close(); err != nil {
+			log.Error("failed to close the etcd client", zap.Error(err))
+		}
+	}()
+	tasks, err := cli.GetAllTasks(ctx)
+	if err != nil {
+		return err
+	}
+	if len(tasks) > 0 {
+		return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name)
+	}
+	return nil
+}
+
 // RunStreamRestore restores stream log.
 func RunStreamRestore(
 	c context.Context,
@@ -1095,7 +1121,7 @@ func RunStreamRestore(
 		logStorage := cfg.Config.Storage
 		cfg.Config.Storage = cfg.FullBackupStorage
 		// TiFlash replica is restored to down-stream on 'pitr' currently.
-		if err = RunRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
+		if err = runRestore(ctx, g, FullRestoreCmd, cfg); err != nil {
 			return errors.Trace(err)
 		}
 		cfg.Config.Storage = logStorage
diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh
new file mode 100644
index 0000000000000..923f8fe7c2b33
--- /dev/null
+++ b/br/tests/br_restore_log_task_enable/run.sh
@@ -0,0 +1,56 @@
+#!/bin/sh
+#
+# Copyright 2022 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eux
+DB="$TEST_NAME"
+TABLE="usertable"
+
+# start log task
+run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR
+
+run_sql "CREATE DATABASE $DB;"
+run_sql "CREATE TABLE $DB.$TABLE (id int);"
+run_sql "INSERT INTO $DB.$TABLE VALUES (1), (2), (3);"
+
+# backup full
+run_br backup full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR
+
+# clean db
+run_sql "DROP DATABASE $DB;"
+
+# restore full (should be failed)
+run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1
+
+# restore point (should be failed)
+run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1
+
+# pause log task
+run_br log pause --task-name 1234 --pd $PD_ADDR
+
+# restore full (should be failed)
+run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1
+
+# restore point (should be failed)
+run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1
+
+# stop log task
+run_br log stop --task-name 1234 --pd $PD_ADDR
+
+# restore full (should be success)
+run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR
+
+# clean db
+run_sql "DROP DATABASE $DB"