diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go index 55086f17d99d4..5366b60150d40 100644 --- a/br/pkg/stream/rewrite_meta_rawkv.go +++ b/br/pkg/stream/rewrite_meta_rawkv.go @@ -23,6 +23,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/restore/ingestrec" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/pkg/kv" @@ -705,10 +706,11 @@ func (sr *SchemasReplace) restoreFromHistory(job *model.Job, isSubJob bool) erro } func (sr *SchemasReplace) deleteRange(job *model.Job) error { + lctx := logutil.ContextWithField(context.Background(), logutil.RedactAny("category", "ddl: rewrite delete range")) dbReplace, exist := sr.DbMap[job.SchemaID] if !exist { // skip this mddljob, the same below - log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) + logutil.CL(lctx).Warn("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID)) return nil } @@ -744,14 +746,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { newTableIDs := make([]int64, 0, len(tableIDs)) for tableID, tableReplace := range dbReplace.TableMap { if _, exist := argsSet[tableID]; !exist { - log.Debug("DropSchema: record a table, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a table, but it doesn't exist in job args", zap.Int64("oldTableID", tableID)) continue } newTableIDs = append(newTableIDs, tableReplace.TableID) for partitionID, newPartitionID := range tableReplace.PartitionMap { if _, exist := argsSet[partitionID]; !exist { - log.Debug("DropSchema: record a partition, but it doesn't exist in job args", + logutil.CL(lctx).Warn("DropSchema: record a partition, but it doesn't exist in job args", zap.Int64("oldPartitionID", partitionID)) continue } @@ -760,7 +762,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } if len(newTableIDs) != len(tableIDs) { - log.Debug( + logutil.CL(lctx).Warn( "DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace") // only drop newTableIDs' ranges } @@ -774,7 +776,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTable, model.ActionTruncateTable: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -787,18 +789,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } if len(physicalTableIDs) > 0 { - // delete partition id instead of table id - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + // delete partition id + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + logutil.CL(lctx).Warn("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID", + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil } @@ -808,7 +811,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropTablePartition, model.ActionTruncateTablePartition: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil @@ -818,18 +821,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { return errors.Trace(err) } - for i := 0; i < len(physicalTableIDs); i++ { - newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]] + newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs)) + for _, oldPid := range physicalTableIDs { + newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID", - zap.Int64("oldPartitionID", physicalTableIDs[i])) + zap.Int64("oldPartitionID", oldPid)) continue } - physicalTableIDs[i] = newPid + newPhysicalTableIDs = append(newPhysicalTableIDs, newPid) } - if len(physicalTableIDs) > 0 { - sr.insertDeleteRangeForTable(newJobID, physicalTableIDs) + if len(newPhysicalTableIDs) > 0 { + sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs) } return nil // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. @@ -837,7 +841,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { // iff job.State = model.JobStateRollbackDone tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", + logutil.CL(lctx).Warn("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -856,7 +860,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug( + logutil.CL(lctx).Warn( "AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue @@ -871,7 +875,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { case model.ActionDropIndex, model.ActionDropPrimaryKey: tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -890,7 +894,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } // len(indexIDs) = 1 @@ -913,7 +917,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -922,7 +926,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -942,7 +946,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -951,7 +955,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -972,7 +976,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { if len(indexIDs) > 0 { tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -981,7 +985,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) @@ -1001,7 +1005,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { } tableReplace, exist := dbReplace.TableMap[job.TableID] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID)) return nil } @@ -1010,7 +1014,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error { for _, oldPid := range partitionIDs { newPid, exist := tableReplace.PartitionMap[oldPid] if !exist { - log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) + logutil.CL(lctx).Warn("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid)) continue } sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs) diff --git a/br/pkg/stream/util_test.go b/br/pkg/stream/util_test.go index 2562c9ce15840..6dda62a04ad60 100644 --- a/br/pkg/stream/util_test.go +++ b/br/pkg/stream/util_test.go @@ -23,6 +23,10 @@ func TestDateFormat(t *testing.T) { 434605479096221697, "2022-07-15 20:32:12.734 +0800", }, + { + 434605478903808000, + "2022-07-15 20:32:12 +0800", + }, } timeZone, _ := time.LoadLocation("Asia/Shanghai") diff --git a/br/tests/br_pitr/incremental_data/delete_range.sql b/br/tests/br_pitr/incremental_data/delete_range.sql new file mode 100644 index 0000000000000..f5afde943649e --- /dev/null +++ b/br/tests/br_pitr/incremental_data/delete_range.sql @@ -0,0 +1,25 @@ +-- 1. Drop Schema +drop database db_to_be_dropped; +-- 2. Drop/Truncate Table +drop table table_to_be_dropped_or_truncated.t0_dropped; +drop table table_to_be_dropped_or_truncated.t1_dropped; +truncate table table_to_be_dropped_or_truncated.t0_truncated; +truncate table table_to_be_dropped_or_truncated.t1_truncated; +-- 3. Drop/Truncate Table Partition +alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0; +alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0; +-- 4. Drop Table Index/PrimaryKey +alter table index_or_primarykey_to_be_dropped.t0 drop index k1; +alter table index_or_primarykey_to_be_dropped.t1 drop index k1; +alter table index_or_primarykey_to_be_dropped.t0 drop primary key; +alter table index_or_primarykey_to_be_dropped.t1 drop primary key; +-- 5. Drop Table Indexes +alter table indexes_to_be_dropped.t0 drop index k1, drop index k2; +alter table indexes_to_be_dropped.t1 drop index k1, drop index k2; +-- 6. Drop Table Column/Columns +alter table column_s_to_be_dropped.t0_column drop column name; +alter table column_s_to_be_dropped.t1_column drop column name; +alter table column_s_to_be_dropped.t0_columns drop column name, drop column c; +alter table column_s_to_be_dropped.t1_columns drop column name, drop column c; +-- 7. Modify Table Column +alter table column_to_be_modified.t0 modify column name varchar(25); diff --git a/br/tests/br_pitr/prepare_data/delete_range.sql b/br/tests/br_pitr/prepare_data/delete_range.sql new file mode 100644 index 0000000000000..e2a20be9e45fa --- /dev/null +++ b/br/tests/br_pitr/prepare_data/delete_range.sql @@ -0,0 +1,124 @@ +-- 1. Drop Schema +create database db_to_be_dropped; +create table db_to_be_dropped.t0(id int primary key, c int, name char(20)); +create table db_to_be_dropped.t1(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on db_to_be_dropped.t0 (name); +create index k2 on db_to_be_dropped.t0(c); +create index k1 on db_to_be_dropped.t1(name); +create index k2 on db_to_be_dropped.t1(c); +create index k3 on db_to_be_dropped.t1 (id, c); + +insert into db_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into db_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 2. Drop/Truncate Table +create database table_to_be_dropped_or_truncated; +create table table_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table table_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table table_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on table_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on table_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on table_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on table_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on table_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on table_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on table_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on table_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into table_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into table_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into table_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); + +-- 3. Drop/Truncate Table Partition +create database partition_to_be_dropped_or_truncated; +create table partition_to_be_dropped_or_truncated.t0_dropped(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_dropped(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table partition_to_be_dropped_or_truncated.t0_truncated(id int primary key, c int, name char(20)); +create table partition_to_be_dropped_or_truncated.t1_truncated(id int primary key, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on partition_to_be_dropped_or_truncated.t0_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_dropped (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_dropped (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_dropped (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_dropped (id, c); + +create index k1 on partition_to_be_dropped_or_truncated.t0_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t0_truncated (c); +create index k1 on partition_to_be_dropped_or_truncated.t1_truncated (name); +create index k2 on partition_to_be_dropped_or_truncated.t1_truncated (c); +create index k3 on partition_to_be_dropped_or_truncated.t1_truncated (id, c); + +insert into partition_to_be_dropped_or_truncated.t0_dropped values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_dropped values (1, 2, "123"), (2, 3, "123"); + +insert into partition_to_be_dropped_or_truncated.t0_truncated values (1, 2, "123"), (2, 3, "123"); +insert into partition_to_be_dropped_or_truncated.t1_truncated values (1, 2, "123"), (2, 3, "123"); +-- 4. Drop Table Index/PrimaryKey +create database index_or_primarykey_to_be_dropped; +create table index_or_primarykey_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table index_or_primarykey_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on index_or_primarykey_to_be_dropped.t0 (name); +create index k2 on index_or_primarykey_to_be_dropped.t0 (c); +create index k1 on index_or_primarykey_to_be_dropped.t1 (name); +create index k2 on index_or_primarykey_to_be_dropped.t1 (c); +create index k3 on index_or_primarykey_to_be_dropped.t1 (id, c); + +insert into index_or_primarykey_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into index_or_primarykey_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 5. Drop Table INDEXES +create database indexes_to_be_dropped; +create table indexes_to_be_dropped.t0(id int primary key nonclustered, c int, name char(20)); +create table indexes_to_be_dropped.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on indexes_to_be_dropped.t0 (name); +create index k2 on indexes_to_be_dropped.t0 (c); +create index k1 on indexes_to_be_dropped.t1 (name); +create index k2 on indexes_to_be_dropped.t1 (c); +create index k3 on indexes_to_be_dropped.t1 (id, c); + +insert into indexes_to_be_dropped.t0 values (1, 2, "123"), (2, 3, "123"); +insert into indexes_to_be_dropped.t1 values (1, 2, "123"), (2, 3, "123"); +-- 6. Drop Table Column/Columns +create database column_s_to_be_dropped; +create table column_s_to_be_dropped.t0_column(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_column(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); +create table column_s_to_be_dropped.t0_columns(id int primary key nonclustered, c int, name char(20)); +create table column_s_to_be_dropped.t1_columns(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_s_to_be_dropped.t0_column (name); +create index k2 on column_s_to_be_dropped.t0_column (c); +create index k1 on column_s_to_be_dropped.t1_column (name); +create index k2 on column_s_to_be_dropped.t1_column (c); +create index k3 on column_s_to_be_dropped.t1_column (id, c); + +create index k1 on column_s_to_be_dropped.t0_columns (name); +create index k2 on column_s_to_be_dropped.t0_columns (c); +create index k1 on column_s_to_be_dropped.t1_columns (name); +create index k2 on column_s_to_be_dropped.t1_columns (c); +-- create index k3 on column_s_to_be_dropped.t1_columns (id, c); + +insert into column_s_to_be_dropped.t0_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_column values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t0_columns values (1, 2, "123"), (2, 3, "123"); +insert into column_s_to_be_dropped.t1_columns values (1, 2, "123"), (2, 3, "123"); +-- 7. Modify Table Column +create database column_to_be_modified; +create table column_to_be_modified.t0(id int primary key nonclustered, c int, name char(20)); +create table column_to_be_modified.t1(id int primary key nonclustered, c int, name char(20)) PARTITION BY RANGE(id) ( PARTITION p0 VALUES LESS THAN (0), PARTITION p1 VALUES LESS THAN (10), PARTITION p2 VALUES LESS THAN MAXVALUE ); + +create index k1 on column_to_be_modified.t0 (name); +create index k2 on column_to_be_modified.t0 (c); +create index k1 on column_to_be_modified.t1 (name); +create index k2 on column_to_be_modified.t1 (c); +create index k3 on column_to_be_modified.t1 (id, c); + +insert into column_to_be_modified.t0 values (1, 2, "123"), (2, 3, "123"); +insert into column_to_be_modified.t1 values (1, 2, "123"), (2, 3, "123"); diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh new file mode 100644 index 0000000000000..25a7fda5588f2 --- /dev/null +++ b/br/tests/br_pitr/run.sh @@ -0,0 +1,100 @@ +#!/bin/bash +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +. run_services +CUR=$(cd `dirname $0`; pwd) + +# const value +PREFIX="pitr_backup" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*. +res_file="$TEST_DIR/sql_res.$TEST_NAME.txt" + +# start a new cluster +echo "restart a services" +restart_services + +# prepare the data +echo "prepare the data" +run_sql_file $CUR/prepare_data/delete_range.sql +# ... + +# start the log backup task +echo "start log task" +run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DIR/$PREFIX/log" + +# run snapshot backup +echo "run snapshot backup" +run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full" + +# load the incremental data +echo "load the incremental data" +run_sql_file $CUR/incremental_data/delete_range.sql +# ... + +# wait checkpoint advance +echo "wait checkpoint advance" +sleep 10 +current_ts=$(echo $(($(date +%s%3N) << 18))) +echo "current ts: $current_ts" +i=0 +while true; do + # extract the checkpoint ts of the log backup task. If there is some error, the checkpoint ts should be empty + log_backup_status=$(unset BR_LOG_TO_TERM && run_br --pd $PD_ADDR log status --task-name integration_test --json 2>/dev/null) + echo "log backup status: $log_backup_status" + checkpoint_ts=$(echo "$log_backup_status" | head -n 1 | jq 'if .[0].last_errors | length == 0 then .[0].checkpoint else empty end') + echo "checkpoint ts: $checkpoint_ts" + + # check whether the checkpoint ts is a number + if [ $checkpoint_ts -gt 0 ] 2>/dev/null; then + # check whether the checkpoint has advanced + if [ $checkpoint_ts -gt $current_ts ]; then + echo "the checkpoint has advanced" + break + fi + # the checkpoint hasn't advanced + echo "the checkpoint hasn't advanced" + i=$((i+1)) + if [ "$i" -gt 50 ]; then + echo 'the checkpoint lag is too large' + exit 1 + fi + sleep 10 + else + # unknown status, maybe somewhere is wrong + echo "TEST: [$TEST_NAME] failed to wait checkpoint advance!" + exit 1 + fi +done + +# dump some info from upstream cluster +# ... + +# start a new cluster +echo "restart a services" +restart_services + +# PITR restore +echo "run pitr" +run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1 + +# check something in downstream cluster +echo "check br log" +check_contains "restore log success summary" +# check_not_contains "rewrite delete range" +echo "" > $res_file +echo "check sql result" +run_sql "select count(*) DELETE_RANGE_CNT from mysql.gc_delete_range group by ts order by DELETE_RANGE_CNT desc limit 1;" +check_contains "DELETE_RANGE_CNT: 46" diff --git a/br/tests/run_group.sh b/br/tests/run_group.sh index 39068ab078427..8da15cab19a30 100755 --- a/br/tests/run_group.sh +++ b/br/tests/run_group.sh @@ -23,7 +23,7 @@ groups=( ["G00"]="br_300_small_tables br_backup_empty br_backup_version br_cache_table br_case_sensitive br_charset_gbk br_check_new_collocation_enable" ["G01"]="br_autoid br_crypter2 br_db br_db_online br_db_online_newkv br_db_skip br_debug_meta br_ebs br_foreign_key br_full" ["G02"]="br_full_cluster_restore br_full_ddl br_full_index br_gcs br_history" - ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index' + ["G03"]='br_incompatible_tidb_config br_incremental br_incremental_ddl br_incremental_index br_pitr' ["G04"]='br_incremental_only_ddl br_incremental_same_table br_insert_after_restore br_key_locked br_log_test br_move_backup br_mv_index br_other br_partition_add_index' ["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table' ["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn'