From 7248e0544799d96551d63a581f781179a5a854b9 Mon Sep 17 00:00:00 2001
From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Date: Mon, 23 Oct 2023 10:44:00 +0800
Subject: [PATCH] This is an automated cherry-pick of #47740
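
Besides adding the br_pitr integration test, this cherry-pick raises the
delete-range rewrite logs in br/pkg/stream/rewrite_meta_rawkv.go from
log.Debug to logutil.CL(lctx).Warn under a "ddl: rewrite delete range"
category, and makes deleteRange collect remapped partition IDs into a
fresh slice instead of overwriting the input slice in place, so IDs that
have no mapping are skipped rather than deleted under their old values.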

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
---
 br/pkg/stream/rewrite_meta_rawkv.go           |  70 +++++-----
 br/pkg/stream/util_test.go                    |   4 +
 .../br_pitr/incremental_data/delete_range.sql |  25 ++++
 .../br_pitr/prepare_data/delete_range.sql     | 124 ++++++++++++++++++
 br/tests/br_pitr/run.sh                       | 100 ++++++++++++++
 br/tests/run_group.sh                        |   2 +-
 6 files changed, 291 insertions(+), 34 deletions(-)
 create mode 100644 br/tests/br_pitr/incremental_data/delete_range.sql
 create mode 100644 br/tests/br_pitr/prepare_data/delete_range.sql
 create mode 100644 br/tests/br_pitr/run.sh

diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 759549ef07155..a4f41215e6d49 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -23,6 +23,7 @@ import (
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/log"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
+	"github.com/pingcap/tidb/br/pkg/logutil"
 	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
 	"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
 	"github.com/pingcap/tidb/kv"
@@ -708,10 +709,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro
 }
 
 func (sr *SchemasReplace) deleteRange(job *model.Job) error {
+	lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range"))
 	dbReplace, exist := sr.DbMap[job.SchemaID]
 	if !exist {
 		// skip this ddl job, the same below
-		log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
+		logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
 		return nil
 	}
 
@@ -747,14 +749,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		newTableIDs := make([]int64, 0, len(tableIDs))
 		for tableID, tableReplace := range dbReplace.TableMap {
 			if _, exist := argsSet[tableID]; !exist {
-				log.Debug("DropSchema: record a table, but it doesn't exist in job args",
+				logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args",
 					zap.Int64("oldTableID", tableID))
 				continue
 			}
 			newTableIDs = append(newTableIDs, tableReplace.TableID)
 			for partitionID, newPartitionID := range tableReplace.PartitionMap {
 				if _, exist := argsSet[partitionID]; !exist {
-					log.Debug("DropSchema: record a partition, but it doesn't exist in job args",
+					logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args",
 						zap.Int64("oldPartitionID", partitionID))
 					continue
 				}
@@ -763,7 +765,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		}
 
 		if len(newTableIDs) != len(tableIDs) {
-			log.Debug(
+			logutil.CL(lctx).Warn(
 				"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
 			// only drop newTableIDs' ranges
 		}
@@ -777,7 +779,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 	case model.ActionDropTable, model.ActionTruncateTable:
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
+			logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
 				zap.Int64("oldTableID", job.TableID))
 			return nil
 		}
@@ -790,18 +792,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			return errors.Trace(err)
 		}
 		if len(physicalTableIDs) > 0 {
-			// delete partition id instead of table id
-			for i := 0; i < len(physicalTableIDs); i++ {
-				newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
+			newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
+			// delete partition id
+			for _, oldPid := range physicalTableIDs {
+				newPid, exist := tableReplace.PartitionMap[oldPid]
 				if !exist {
-					log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
-						zap.Int64("oldPartitionID", physicalTableIDs[i]))
+					logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
+						zap.Int64("oldPartitionID", oldPid))
 					continue
 				}
-				physicalTableIDs[i] = newPid
+				newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
 			}
-			if len(physicalTableIDs) > 0 {
-				sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
+			if len(newPhysicalTableIDs) > 0 {
+				sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
 			}
 			return nil
 		}
@@ -811,7 +814,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 	case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug(
+			logutil.CL(lctx).Warn(
 				"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
 				zap.Int64("oldTableID", job.TableID))
 			return nil
@@ -821,18 +824,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			return errors.Trace(err)
 		}
 
-		for i := 0; i < len(physicalTableIDs); i++ {
-			newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
+		newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
+		for _, oldPid := range physicalTableIDs {
+			newPid, exist := tableReplace.PartitionMap[oldPid]
 			if !exist {
-				log.Debug(
+				logutil.CL(lctx).Warn(
 					"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
-					zap.Int64("oldPartitionID", physicalTableIDs[i]))
+					zap.Int64("oldPartitionID", oldPid))
 				continue
 			}
-			physicalTableIDs[i] = newPid
+			newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
 		}
-		if len(physicalTableIDs) > 0 {
-			sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
+		if len(newPhysicalTableIDs) > 0 {
+			sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
 		}
 		return nil
 	// ActionAddIndex and ActionAddPrimaryKey need this, because they need to be rolled back when canceled.
@@ -840,7 +844,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		// iff job.State = model.JobStateRollbackDone
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
+			logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
 				zap.Int64("oldTableID", job.TableID))
 			return nil
 		}
@@ -858,7 +862,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			for _, oldPid := range partitionIDs {
 				newPid, exist := tableReplace.PartitionMap[oldPid]
 				if !exist {
-					log.Debug(
+					logutil.CL(lctx).Warn(
 						"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
 						zap.Int64("oldPartitionID", oldPid))
 					continue
@@ -873,7 +877,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 	case model.ActionDropIndex, model.ActionDropPrimaryKey:
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
+			logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
 			return nil
 		}
 
@@ -892,7 +896,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			for _, oldPid := range partitionIDs {
 				newPid, exist := tableReplace.PartitionMap[oldPid]
 				if !exist {
-					log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
+					logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
 					continue
 				}
 				// len(indexIDs) = 1
@@ -915,7 +919,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
+			logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
 			return nil
 		}
 
@@ -924,7 +928,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			for _, oldPid := range partitionIDs {
 				newPid, exist := tableReplace.PartitionMap[oldPid]
 				if !exist {
-					log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
+					logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
 					continue
 				}
 				sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -944,7 +948,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		if len(indexIDs) > 0 {
 			tableReplace, exist := dbReplace.TableMap[job.TableID]
 			if !exist {
-				log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
+				logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
 				return nil
 			}
 
@@ -953,7 +957,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 				for _, oldPid := range partitionIDs {
 					newPid, exist := tableReplace.PartitionMap[oldPid]
 					if !exist {
-						log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
+						logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
 						continue
 					}
 					sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -974,7 +978,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		if len(indexIDs) > 0 {
 			tableReplace, exist := dbReplace.TableMap[job.TableID]
 			if !exist {
-				log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
+				logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
 				return nil
 			}
 
@@ -983,7 +987,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 				for _, oldPid := range partitionIDs {
 					newPid, exist := tableReplace.PartitionMap[oldPid]
 					if !exist {
-						log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
+						logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
 						continue
 					}
 					sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
@@ -1003,7 +1007,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 		}
 		tableReplace, exist := dbReplace.TableMap[job.TableID]
 		if !exist {
-			log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
+			logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
 			return nil
 		}
 
@@ -1012,7 +1016,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
 			for _, oldPid := range partitionIDs {
 				newPid, exist := tableReplace.PartitionMap[oldPid]
 				if !exist {
-					log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
+					logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
 					continue
 				}
 				sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
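
For reviewers: the hunks above share one fix. Instead of writing newPid back
into physicalTableIDs[i], which left the stale old ID in the slice whenever
the PartitionMap lookup missed, the mapped IDs are now collected into
newPhysicalTableIDs and unmapped ones are skipped. A minimal standalone
sketch of the pattern, with illustrative names rather than the BR types:

    // remapIDs returns only those IDs that have a mapping; unmapped IDs
    // are dropped instead of being passed through unchanged.
    func remapIDs(oldIDs []int64, mapping map[int64]int64) []int64 {
    	newIDs := make([]int64, 0, len(oldIDs))
    	for _, oldID := range oldIDs {
    		newID, ok := mapping[oldID]
    		if !ok {
    			continue // no mapping: skip it, don't reuse the old ID
    		}
    		newIDs = append(newIDs, newID)
    	}
    	return newIDs
    }
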
diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go
index 2562c9ce15840..6dda62a04ad60 100644
--- a/br/pkg/stream/util_test.go
+++ b/br/pkg/stream/util_test.go
@@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) {
 			434605479096221697,
 			"2022-07-15 20:32:12.734 +0800",
 		},
+		{
+			434605478903808000,
+			"2022-07-15 20:32:12 +0800",
+		},
 	}
 
 	timeZone, _ := time.LoadLocation("Asia/Shanghai")
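
For reviewers: the new case exercises a TSO whose 18-bit logical part is
zero, so the formatted time carries no fractional milliseconds. A TiDB TSO
packs a physical timestamp in milliseconds above an 18-bit logical counter;
a quick sketch of the decoding (the helper name here is ours, not the
package's):

    package main

    import (
    	"fmt"
    	"time"
    )

    // tsoToTime drops the 18-bit logical counter and interprets the
    // remaining bits as milliseconds since the Unix epoch.
    func tsoToTime(tso uint64) time.Time {
    	return time.UnixMilli(int64(tso >> 18))
    }

    func main() {
    	// 434605478903808000 >> 18 == 1657888332000 ms,
    	// i.e. 2022-07-15 20:32:12 +0800 with no logical component.
    	fmt.Println(tsoToTime(434605478903808000).UTC())
    }

The shell test below builds a TSO the same way in reverse:
$(($(date +%s%3N) << 18)).
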
diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql
new file mode 100644
index 0000000000000..f5afde943649e
--- /dev/null
+++ b/br/tests/br_pitr/incremental_data/delete_range.sql
@@ -0,0 +1,25 @@
+-- 1. Drop Schema
+drop database db_to_be_dropped;
+-- 2. Drop/Truncate Table
+drop table table_to_be_dropped_or_truncated.t0_dropped;
+drop table table_to_be_dropped_or_truncated.t1_dropped;
+truncate table table_to_be_dropped_or_truncated.t0_truncated;
+truncate table table_to_be_dropped_or_truncated.t1_truncated;
+-- 3. Drop/Truncate Table Partition
+alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; 
+alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0;
+-- 4. Drop Table Index/PrimaryKey
+alter table index_or_primarykey_to_be_dropped.t0 drop index k1;
+alter table index_or_primarykey_to_be_dropped.t1 drop index k1;
+alter table index_or_primarykey_to_be_dropped.t0 drop primary key;
+alter table index_or_primarykey_to_be_dropped.t1 drop primary key;
+-- 5. Drop Table Indexes
+alter table indexes_to_be_dropped.t0 drop index k1, drop index k2;
+alter table indexes_to_be_dropped.t1 drop index k1, drop index k2;
+-- 6. Drop Table Column/Columns
+alter table column_s_to_be_dropped.t0_column drop column name;
+alter table column_s_to_be_dropped.t1_column drop column name;
+alter table column_s_to_be_dropped.t0_columns drop column name, drop column c;
+alter table column_s_to_be_dropped.t1_columns drop column name, drop column c;
+-- 7. Modify Table Column
+alter table column_to_be_modified.t0 modify column name varchar(25);
diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql
new file mode 100644
index 0000000000000..e2a20be9e45fa
--- /dev/null
+++ b/br/tests/br_pitr/prepare_data/delete_range.sql
@@ -0,0 +1,124 @@
+-- 1. Drop Schema
+create database db_to_be_dropped;
+create table db_to_be_dropped.t0(id int primary key, c int, name char(20));
+create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+ 
+create index k1 on db_to_be_dropped.t0 (name);
+create index k2 on db_to_be_dropped.t0(c);
+create index k1 on db_to_be_dropped.t1(name);
+create index k2 on db_to_be_dropped.t1(c);
+create index k3 on db_to_be_dropped.t1 (id, c);
+ 
+insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123");
+insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123");
+-- 2. Drop/Truncate Table
+create database table_to_be_dropped_or_truncated;
+create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20));
+create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20));
+create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name);
+create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c);
+create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name);
+create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c);
+create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c);
+
+create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name);
+create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c);
+create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name);
+create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c);
+create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c);
+ 
+insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123");
+insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123");
+
+insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123");
+insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123");
+
+-- 3. Drop/Truncate Table Partition
+create database partition_to_be_dropped_or_truncated;
+create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20));
+create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20));
+create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name);
+create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c);
+create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name);
+create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c);
+create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c);
+
+create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name);
+create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c);
+create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name);
+create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c);
+create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c);
+ 
+insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123");
+insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123");
+
+insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123");
+insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123");
+-- 4. Drop Table Index/PrimaryKey
+create database index_or_primarykey_to_be_dropped;
+create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20));
+create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on index_or_primarykey_to_be_dropped.t0 (name);
+create index k2 on index_or_primarykey_to_be_dropped.t0 (c);
+create index k1 on index_or_primarykey_to_be_dropped.t1 (name);
+create index k2 on index_or_primarykey_to_be_dropped.t1 (c);
+create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c);
+ 
+insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123");
+insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123");
+-- 5. Drop Table Indexes
+create database indexes_to_be_dropped;
+create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20));
+create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on indexes_to_be_dropped.t0 (name);
+create index k2 on indexes_to_be_dropped.t0 (c);
+create index k1 on indexes_to_be_dropped.t1 (name);
+create index k2 on indexes_to_be_dropped.t1 (c);
+create index k3 on indexes_to_be_dropped.t1 (id, c);
+ 
+insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123");
+insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123");
+-- 6. Drop Table Column/Columns
+create database column_s_to_be_dropped;
+create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20));
+create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20));
+create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on column_s_to_be_dropped.t0_column (name);
+create index k2 on column_s_to_be_dropped.t0_column (c);
+create index k1 on column_s_to_be_dropped.t1_column (name);
+create index k2 on column_s_to_be_dropped.t1_column (c);
+create index k3 on column_s_to_be_dropped.t1_column (id, c);
+
+create index k1 on column_s_to_be_dropped.t0_columns (name);
+create index k2 on column_s_to_be_dropped.t0_columns (c);
+create index k1 on column_s_to_be_dropped.t1_columns (name);
+create index k2 on column_s_to_be_dropped.t1_columns (c);
+-- create index k3 on column_s_to_be_dropped.t1_columns (id, c);
+ 
+insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123");
+insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123");
+insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123");
+insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123");
+-- 7. Modify Table Column
+create database column_to_be_modified;
+create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20));
+create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE );
+
+create index k1 on column_to_be_modified.t0 (name);
+create index k2 on column_to_be_modified.t0 (c);
+create index k1 on column_to_be_modified.t1 (name);
+create index k2 on column_to_be_modified.t1 (c);
+create index k3 on column_to_be_modified.t1 (id, c);
+
+insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123");
+insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123");
diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh
new file mode 100644
index 0000000000000..25a7fda5588f2
--- /dev/null
+++ b/br/tests/br_pitr/run.sh
@@ -0,0 +1,100 @@
+#!/bin/bash
+#
+# Copyright 2023 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+# const value
+PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+
+# start a new cluster
+echo "restart a services"
+restart_services
+
+# prepare the data
+echo "prepare the data"
+run_sql_file $CUR/prepare_data/delete_range.sql
+# ...
+
+# start the log backup task
+echo "start log task"
+run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log"
+
+# run snapshot backup
+echo "run snapshot backup"
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
+
+# load the incremental data
+echo "load the incremental data"
+run_sql_file $CUR/incremental_data/delete_range.sql
+# ...
+
+# wait checkpoint advance
+echo "wait checkpoint advance"
+sleep 10
+current_ts=$(echo $(($(date +%s%3N) << 18)))
+echo "current ts: $current_ts"
+i=0
+while true; do
+    # extract the checkpoint ts of the log backup task; if there is any error, the checkpoint ts will be empty
+    log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null)
+    echo "log backup status: $log_backup_status"
+    checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end')
+    echo "checkpoint ts: $checkpoint_ts"
+
+    # check whether the checkpoint ts is a number
+    if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then
+        # check whether the checkpoint has advanced
+        if [ $checkpoint_ts -gt $current_ts ]; then
+            echo "the checkpoint has advanced"
+            break
+        fi
+        # the checkpoint hasn't advanced
+        echo "the checkpoint hasn't advanced"
+        i=$((i+1))
+        if [ "$i" -gt 50 ]; then
+            echo 'the checkpoint lag is too large'
+            exit 1
+        fi
+        sleep 10
+    else
+        # unknown status; something is probably wrong
+        echo "TEST: [$TEST_NAME] failed to wait for the checkpoint to advance!"
+        exit 1
+    fi
+done
+
+# dump some info from upstream cluster
+# ...
+
+# start a new cluster
+echo "restart a services"
+restart_services
+
+# PITR restore
+echo "run pitr"
+run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1
+
+# check something in downstream cluster
+echo "check br log"
+check_contains "restore log success summary"
+# check_not_contains "rewrite delete range"
+echo "" > $res_file
+echo "check sql result"
+run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;"
+check_contains "DELETE_RANGE_CNT: 46"
diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh
index ca66c8d5013ce..717125b6d3552 100755
--- a/br/tests/run_group.sh
+++ b/br/tests/run_group.sh
@@ -24,7 +24,7 @@ groups=(
 	["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable"
 	["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full"
 	["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history"
+<<<<<<< HEAD
         ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index'
+=======
+	["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr'
+>>>>>>> c21a5cfcb33 (br: add integration test for pitr (#47740))
 	["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index'
 	["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
 	["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'