Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add integration test for pitr #47740

Merged
merged 12 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"@com_github_fatih_color//:color",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
Expand Down
76 changes: 39 additions & 37 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
dbReplace, exist := sr.DbMap[job.SchemaID]
if !exist {
// skip this mddljob, the same below
log.Debug("try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
log.Warn("[rewrite delete range] try to drop a non-existent range, missing oldDBID", zap.Int64("oldDBID", job.SchemaID))
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand Down Expand Up @@ -744,14 +744,14 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
newTableIDs := make([]int64, 0, len(tableIDs))
for tableID, tableReplace := range dbReplace.TableMap {
if _, exist := argsSet[tableID]; !exist {
log.Debug("DropSchema: record a table, but it doesn't exist in job args",
log.Warn("[rewrite delete range] DropSchema: record a table, but it doesn't exist in job args",
zap.Int64("oldTableID", tableID))
continue
}
newTableIDs = append(newTableIDs, tableReplace.TableID)
for partitionID, newPartitionID := range tableReplace.PartitionMap {
if _, exist := argsSet[partitionID]; !exist {
log.Debug("DropSchema: record a partition, but it doesn't exist in job args",
log.Warn("[rewrite delete range] DropSchema: record a partition, but it doesn't exist in job args",
zap.Int64("oldPartitionID", partitionID))
continue
}
Expand All @@ -760,8 +760,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}

if len(newTableIDs) != len(tableIDs) {
log.Debug(
"DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
log.Warn(
"[rewrite delete range] DropSchema: try to drop a non-existent table/partition, whose oldID doesn't exist in tableReplace")
// only drop newTableIDs' ranges
}

Expand All @@ -774,7 +774,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTable, model.ActionTruncateTable:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
log.Warn("[rewrite delete range] DropTable/TruncateTable: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
Expand All @@ -787,18 +787,19 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}
if len(physicalTableIDs) > 0 {
// delete partition id instead of table id
for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
// delete partition id
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", physicalTableIDs[i]))
log.Warn("[rewrite delete range] DropTable/TruncateTable: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add some info logs inside this function to record which id should be deleted.

}
return nil
}
Expand All @@ -808,8 +809,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
log.Warn(
"[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
Expand All @@ -818,26 +819,27 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
return errors.Trace(err)
}

for i := 0; i < len(physicalTableIDs); i++ {
newPid, exist := tableReplace.PartitionMap[physicalTableIDs[i]]
newPhysicalTableIDs := make([]int64, 0, len(physicalTableIDs))
for _, oldPid := range physicalTableIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug(
"DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", physicalTableIDs[i]))
log.Warn(
"[rewrite delete range] DropTablePartition/TruncateTablePartition: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
physicalTableIDs[i] = newPid
newPhysicalTableIDs = append(newPhysicalTableIDs, newPid)
}
if len(physicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, physicalTableIDs)
if len(newPhysicalTableIDs) > 0 {
sr.insertDeleteRangeForTable(newJobID, newPhysicalTableIDs)
}
return nil
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
// iff job.State = model.JobStateRollbackDone
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
log.Warn("[rewrite delete range] AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldTableID",
zap.Int64("oldTableID", job.TableID))
return nil
}
Expand All @@ -856,8 +858,8 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug(
"AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
log.Warn(
"[rewrite delete range] AddIndex/AddPrimaryKey roll-back: try to drop a non-existent table, missing oldPartitionID",
zap.Int64("oldPartitionID", oldPid))
continue
}
Expand All @@ -871,7 +873,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -890,7 +892,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
log.Warn("[rewrite delete range] DropIndex/DropPrimaryKey: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
// len(indexIDs) = 1
Expand All @@ -913,7 +915,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {

tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -922,7 +924,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
log.Warn("[rewrite delete range] DropIndexes: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -942,7 +944,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -951,7 +953,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -972,7 +974,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
if len(indexIDs) > 0 {
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -981,7 +983,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
log.Warn("[rewrite delete range] DropColumns: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand All @@ -1001,7 +1003,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
}
tableReplace, exist := dbReplace.TableMap[job.TableID]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldTableID", zap.Int64("oldTableID", job.TableID))
return nil
}

Expand All @@ -1010,7 +1012,7 @@ func (sr *SchemasReplace) deleteRange(job *model.Job) error {
for _, oldPid := range partitionIDs {
newPid, exist := tableReplace.PartitionMap[oldPid]
if !exist {
log.Debug("DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
log.Warn("[rewrite delete range] DropColumn: try to drop a non-existent table, missing oldPartitionID", zap.Int64("oldPartitionID", oldPid))
continue
}
sr.insertDeleteRangeForIndex(newJobID, &elementID, newPid, indexIDs)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/stream/stream_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"encoding/json"
"fmt"
"io"
"os"
"regexp"
"strconv"
"sync"
"time"

"github.com/fatih/color"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -419,6 +421,20 @@ func (ctl *StatusController) PrintStatusOfTask(ctx context.Context, name string)
if err != nil {
return err
}
failpoint.Inject("only-checkpoint-ts-with-check", func(val failpoint.Value) {
if nowTs, ok := val.(string); ok && len(tasks) > 0 && len(nowTs) > 0 {
checkpointTime := oracle.GetTimeFromTS(tasks[0].globalCheckpoint)
log.Info("get checkpoint time", zap.Time("checkpoint ts", checkpointTime))
nowTime, err := ParseDate(nowTs)
if err != nil {
failpoint.Return(err)
}
if checkpointTime.After(nowTime) {
os.Exit(50)
}
os.Exit(51)
}
})
ctl.printToView(tasks)
return nil
}
4 changes: 4 additions & 0 deletions br/pkg/stream/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ func FormatDate(ts time.Time) string {
return ts.Format(DATE_FORMAT)
}

func ParseDate(date string) (time.Time, error) {
return time.Parse(DATE_FORMAT, date)
}

func IsMetaDBKey(key []byte) bool {
return strings.HasPrefix(string(key), "mDB")
}
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/stream/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ func TestDateFormat(t *testing.T) {
434605479096221697,
"2022-07-15 20:32:12.734 +0800",
},
{
434605478903808000,
"2022-07-15 20:32:12 +0800",
},
}

timeZone, _ := time.LoadLocation("Asia/Shanghai")
for _, ca := range cases {
date := FormatDate(oracle.GetTimeFromTS(ca.ts).In(timeZone))
ts := oracle.GetTimeFromTS(ca.ts).In(timeZone)
date := FormatDate(ts)
require.Equal(t, ca.target, date)
ts2, err := ParseDate(date)
require.NoError(t, err)
require.Equal(t, ts, ts2.In(timeZone))
}
}

Expand Down
2 changes: 1 addition & 1 deletion br/tests/br_full/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ test_log="${TEST_DIR}/${DB}_test.log"
error_str="not read from or written to within the timeout period"
unset BR_LOG_TO_TERM

export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")""
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/backup/backup-storage-error=1*return(\"connection refused\")->1*return(\"InternalError\");github.com/pingcap/tidb/br/pkg/backup/backup-timeout-error=1*return(\"<Code>RequestTimeout</Code>\")->1*return(\"not read from or written to within the timeout period\")->1*return(\"<Code>InvalidPart</Code>\")"
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB-lz4" --concurrency 4 --compression lz4 --log-file $test_log
export GO_FAILPOINTS=""
size_lz4=$(du -d 0 $TEST_DIR/$DB-lz4 | awk '{print $1}')
Expand Down
25 changes: 25 additions & 0 deletions br/tests/br_pitr/incremental_data/delete_range.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- 1. Drop Schema
drop database db_to_be_dropped;
-- 2. Drop/Truncate Table
drop table table_to_be_dropped_or_truncated.t0_dropped;
drop table table_to_be_dropped_or_truncated.t1_dropped;
truncate table table_to_be_dropped_or_truncated.t0_truncated;
truncate table table_to_be_dropped_or_truncated.t1_truncated;
-- 3. Drop/Truncate Table Partition
alter table partition_to_be_dropped_or_truncated.t1_dropped drop partition p0;
alter table partition_to_be_dropped_or_truncated.t1_truncated truncate partition p0;
-- 4. Drop Table Index/PrimaryKey
alter table index_or_primarykey_to_be_dropped.t0 drop index k1;
alter table index_or_primarykey_to_be_dropped.t1 drop index k1;
alter table index_or_primarykey_to_be_dropped.t0 drop primary key;
alter table index_or_primarykey_to_be_dropped.t1 drop primary key;
-- 5. Drop Table Indexes
alter table indexes_to_be_dropped.t0 drop index k1, drop index k2;
alter table indexes_to_be_dropped.t1 drop index k1, drop index k2;
-- 6. Drop Table Column/Columns
alter table column_s_to_be_dropped.t0_column drop column name;
alter table column_s_to_be_dropped.t1_column drop column name;
alter table column_s_to_be_dropped.t0_columns drop column name, drop column c;
alter table column_s_to_be_dropped.t1_columns drop column name, drop column c;
-- 7. Modify Table Column
alter table column_to_be_modified.t0 modify column name varchar(25);
Loading