diff --git a/ddl/cluster.go b/ddl/cluster.go index fbcfa9cd8a49f..227963b3951d5 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -19,6 +19,7 @@ import ( "context" "encoding/hex" "fmt" + "math" "strings" "time" @@ -147,6 +148,17 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) { return val, nil } +func isFlashbackSupportedDDLAction(action model.ActionType) bool { + switch action { + case model.ActionSetTiFlashReplica, model.ActionUpdateTiFlashReplicaStatus, model.ActionAlterPlacementPolicy, + model.ActionAlterTablePlacement, model.ActionAlterTablePartitionPlacement, model.ActionCreatePlacementPolicy, + model.ActionDropPlacementPolicy, model.ActionModifySchemaDefaultPlacement: + return false + default: + return true + } +} + func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) { if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil { return err @@ -170,19 +182,47 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta return errors.Trace(err) } - flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion() + flashbackSnapshotMeta := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))) + flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion() if err != nil { return errors.Trace(err) } - // If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now). + flashbackTSString := oracle.GetTimeFromTS(flashbackTS).String() + + // Check if there is an upgrade during [flashbackTS, now) + sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString) + rows, err := newSession(sess).execute(d.ctx, sql, "check_tidb_server_version") + if err != nil || len(rows) == 0 { + return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback") + } + sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0)) + rows, err = newSession(sess).execute(d.ctx, sql, "check_tidb_server_version") + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return errors.Errorf("Detected TiDB upgrade during [%s, now), can't do flashback", flashbackTSString) + } + + // Check is there a DDL task at flashbackTS. + sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString) + rows, err = newSession(sess).execute(d.ctx, sql, "check_history_job") + if err != nil || len(rows) == 0 { + return errors.Errorf("Get history ddl jobs failed, can't do flashback") + } + if rows[0].GetInt64(0) != 0 { + return errors.Errorf("Detected another DDL job at %s, can't do flashback", flashbackTSString) + } + + // If flashbackSchemaVersion not same as nowSchemaVersion, we should check all schema diffs during [flashbackTs, now). for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ { diff, err := t.GetSchemaDiff(i) if err != nil { return errors.Trace(err) } - if diff != nil && diff.Type != model.ActionFlashbackCluster { - return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS)) + if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) { + return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString) } } @@ -211,7 +251,7 @@ type flashbackID struct { func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID { var excluded bool - if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") { + if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" { excluded = true } flashbackIDs = append(flashbackIDs, flashbackID{ @@ -270,6 +310,14 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) { }) } + // The meta data key ranges. + metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0)) + metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64)) + keyRanges = append(keyRanges, kv.KeyRange{ + StartKey: metaStartKey, + EndKey: metaEndKey, + }) + return keyRanges, nil } @@ -633,7 +681,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster}) job.State = model.JobStateDone job.SchemaState = model.StatePublic - return ver, nil + return updateSchemaVersion(d, t, job) } return ver, nil } diff --git a/ddl/cluster_test.go b/ddl/cluster_test.go index 4c1ec291f87f2..12c77c42edafe 100644 --- a/ddl/cluster_test.go +++ b/ddl/cluster_test.go @@ -42,15 +42,16 @@ func TestGetFlashbackKeyRanges(t *testing.T) { kvRanges, err := ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) - // The results are 6 key ranges - // 0: (stats_meta,stats_histograms,stats_buckets) + // The results are 8 key ranges + // 0: (stats_meta,stats_histograms,stats_buckets, gc_delete_range) // 1: (stats_feedback) // 2: (stats_top_n) // 3: (stats_extended) // 4: (stats_fm_sketch) // 5: (stats_history, stats_meta_history) // 6: (stats_table_locked) - require.Len(t, kvRanges, 7) + // 7: meta Ranges + require.Len(t, kvRanges, 8) tk.MustExec("use test") tk.MustExec("CREATE TABLE employees (" + @@ -64,7 +65,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) { ");") tk.MustExec("truncate table mysql.analyze_jobs") - // truncate all `stats_` tables, make table ID consecutive. + // truncate all `stats_` and `gc_delete_range` tables, make table ID consecutive. tk.MustExec("truncate table mysql.stats_meta") tk.MustExec("truncate table mysql.stats_histograms") tk.MustExec("truncate table mysql.stats_buckets") @@ -75,14 +76,15 @@ func TestGetFlashbackKeyRanges(t *testing.T) { tk.MustExec("truncate table mysql.stats_history") tk.MustExec("truncate table mysql.stats_meta_history") tk.MustExec("truncate table mysql.stats_table_locked") + tk.MustExec("truncate table mysql.gc_delete_range") kvRanges, err = ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) - require.Len(t, kvRanges, 2) + require.Len(t, kvRanges, 3) tk.MustExec("truncate table test.employees") kvRanges, err = ddl.GetFlashbackKeyRanges(se) require.NoError(t, err) - require.Len(t, kvRanges, 1) + require.Len(t, kvRanges, 2) } func TestFlashbackCloseAndResetPDSchedule(t *testing.T) { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 89f466b1497c5..a760cb598129b 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -1609,6 +1609,11 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ... OldTableID: recoverTabsInfo[i].TableInfo.ID, } } + case model.ActionFlashbackCluster: + diff.TableID = -1 + if job.SchemaState == model.StatePublic { + diff.RegenerateSchemaMap = true + } default: diff.TableID = job.TableID } diff --git a/ddl/tiflashtest/BUILD.bazel b/ddl/tiflashtest/BUILD.bazel index 8854778def892..2a803cf03c5af 100644 --- a/ddl/tiflashtest/BUILD.bazel +++ b/ddl/tiflashtest/BUILD.bazel @@ -28,6 +28,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//testutils", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go index 8922f7211f8b7..d1d0368138b18 100644 --- a/ddl/tiflashtest/ddl_tiflash_test.go +++ b/ddl/tiflashtest/ddl_tiflash_test.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/testutils" "go.uber.org/zap" ) @@ -438,6 +439,44 @@ func TestTiFlashDropPartition(t *testing.T) { CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") } +func TestTiFlashFlashbackCluster(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values (1), (2), (3)") + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s") + defer func() { + ChangeGCSafePoint(tk, time.Now(), "true", "10m0s") + }() + + errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", + model.ActionSetTiFlashReplica.String(), oracle.GetTimeFromTS(ts).String()) + tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errorMsg) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) +} + func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) { tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) require.NoError(t, err) diff --git a/domain/domain.go b/domain/domain.go index 08f49ed018799..2112ca17fb5e9 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -188,6 +188,7 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i // 1. Not first time bootstrap loading, which needs a full load. // 2. It is newer than the current one, so it will be "the current one" after this function call. // 3. There are less 100 diffs. + // 4. No regenrated schema diff. startTime := time.Now() if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 { is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion) @@ -347,6 +348,9 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64 if err != nil { return nil, nil, err } + if diff.RegenerateSchemaMap { + return nil, nil, errors.Errorf("Meets a schema diff with RegenerateSchemaMap flag") + } if canSkipSchemaCheckerDDL(diff.Type) { continue } diff --git a/domain/schema_validator.go b/domain/schema_validator.go index eb933adbc899c..4d7cf2e9b814a 100644 --- a/domain/schema_validator.go +++ b/domain/schema_validator.go @@ -185,7 +185,7 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64 affected := false for i, tblID := range item.relatedIDs { for _, relatedTblID := range tableIDs { - if tblID == relatedTblID { + if tblID == relatedTblID || relatedTblID == -1 { // if actionType >= 64, the value of left shift equals 0, and it will not impact amend txn changedTblMap[tblID] |= 1 << item.relatedActions[i] affected = true diff --git a/executor/recover_test.go b/executor/recover_test.go index aad1c93d9fb87..a7d26f247c952 100644 --- a/executor/recover_test.go +++ b/executor/recover_test.go @@ -332,10 +332,12 @@ func TestRecoverClusterMeetError(t *testing.T) { newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), errno.ErrPrivilegeCheckFail) tk.MustExec("drop user 'testflashback'@'localhost';") - // Flashback failed because of ddl history. - tk.MustExec("use test;") - tk.MustExec("create table t(a int);") - tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback") + // update tidb_server_version + nowTS, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + tk.MustExec("update mysql.tidb set VARIABLE_VALUE=VARIABLE_VALUE+1 where VARIABLE_NAME='tidb_server_version'") + errorMsg := fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String()) + tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index c278ffb56167d..4c09f06c29152 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -941,6 +941,8 @@ type SchemaDiff struct { OldTableID int64 `json:"old_table_id"` // OldSchemaID is the schema ID before rename table, only used by rename table DDL. OldSchemaID int64 `json:"old_schema_id"` + // RegenerateSchemaMap means whether to rebuild the schema map when applying to the schema diff. + RegenerateSchemaMap bool `json:"regenerate_schema_map"` AffectedOpts []*AffectedOption `json:"affected_options"` } diff --git a/tests/realtikvtest/brietest/BUILD.bazel b/tests/realtikvtest/brietest/BUILD.bazel index 62de71ea3b77d..c3118c4d7a88a 100644 --- a/tests/realtikvtest/brietest/BUILD.bazel +++ b/tests/realtikvtest/brietest/BUILD.bazel @@ -6,26 +6,20 @@ go_test( srcs = [ "backup_restore_test.go", "binlog_test.go", - "flashback_test.go", "main_test.go", ], flaky = True, race = "on", deps = [ "//config", - "//ddl/util", - "//parser/model", "//parser/mysql", "//sessionctx/binloginfo", "//store/mockstore/mockcopr", "//testkit", "//testkit/testsetup", "//tests/realtikvtest", - "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_tipb//go-binlog", "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//util", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], diff --git a/tests/realtikvtest/brietest/flashback_test.go b/tests/realtikvtest/brietest/flashback_test.go deleted file mode 100644 index 470a62fb90d93..0000000000000 --- a/tests/realtikvtest/brietest/flashback_test.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package brietest - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/pingcap/failpoint" - ddlutil "github.com/pingcap/tidb/ddl/util" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/tests/realtikvtest" - "github.com/stretchr/testify/require" - "github.com/tikv/client-go/v2/oracle" - tikvutil "github.com/tikv/client-go/v2/util" -) - -// MockGC is used to make GC work in the test environment. -func MockGC(tk *testkit.TestKit) (string, string, string, func()) { - originGC := ddlutil.IsEmulatorGCEnable() - resetGC := func() { - if originGC { - ddlutil.EmulatorGCEnable() - } else { - ddlutil.EmulatorGCDisable() - } - } - - // disable emulator GC. - // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. - ddlutil.EmulatorGCDisable() - timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(tikvutil.GCTimeFormat) - timeAfterDrop := time.Now().Add(48 * 60 * 60 * time.Second).Format(tikvutil.GCTimeFormat) - safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') - ON DUPLICATE KEY - UPDATE variable_value = '%[1]s'` - // clear GC variables first. - tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") - return timeBeforeDrop, timeAfterDrop, safePointSQL, resetGC -} - -func TestFlashback(t *testing.T) { - if *realtikvtest.WithRealTiKV { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - - timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) - defer resetGC() - - tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int, index i(a))") - tk.MustExec("insert t values (1), (2), (3)") - - time.Sleep(1 * time.Second) - - ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) - require.NoError(t, err) - - injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - - tk.MustExec("insert t values (4), (5), (6)") - tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) - - tk.MustExec("admin check table t") - require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") - require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") - - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) - } -} - -func TestPrepareFlashbackFailed(t *testing.T) { - if *realtikvtest.WithRealTiKV { - store := realtikvtest.CreateMockStoreAndSetup(t) - - tk := testkit.NewTestKit(t, store) - - timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) - defer resetGC() - - tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int, index i(a))") - tk.MustExec("insert t values (1), (2), (3)") - - time.Sleep(1 * time.Second) - - ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) - require.NoError(t, err) - - injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) - require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", - fmt.Sprintf("return(%v)", injectSafeTS))) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) - - tk.MustExec("insert t values (4), (5), (6)") - tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) - - tk.MustExec("admin check table t") - require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") - require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") - - jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string) - job := model.Job{} - require.NoError(t, job.Decode([]byte(jobMeta))) - require.Equal(t, job.ErrorCount, int64(0)) - - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) - require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) - } -} diff --git a/tests/realtikvtest/flashbacktest/BUILD.bazel b/tests/realtikvtest/flashbacktest/BUILD.bazel new file mode 100644 index 0000000000000..6e2410abfc1ea --- /dev/null +++ b/tests/realtikvtest/flashbacktest/BUILD.bazel @@ -0,0 +1,22 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "flashbacktest_test", + srcs = [ + "flashback_test.go", + "main_test.go", + ], + deps = [ + "//ddl/util", + "//errno", + "//parser/model", + "//testkit", + "//testkit/testsetup", + "//tests/realtikvtest", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//util", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/tests/realtikvtest/flashbacktest/flashback_test.go b/tests/realtikvtest/flashbacktest/flashback_test.go new file mode 100644 index 0000000000000..50cf274490a5a --- /dev/null +++ b/tests/realtikvtest/flashbacktest/flashback_test.go @@ -0,0 +1,288 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flashbacktest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/tests/realtikvtest" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + tikvutil "github.com/tikv/client-go/v2/util" +) + +// MockGC is used to make GC work in the test environment. +func MockGC(tk *testkit.TestKit) (string, string, string, func()) { + originGC := ddlutil.IsEmulatorGCEnable() + resetGC := func() { + if originGC { + ddlutil.EmulatorGCEnable() + } else { + ddlutil.EmulatorGCDisable() + } + } + + // disable emulator GC. + // Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl. + ddlutil.EmulatorGCDisable() + timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(tikvutil.GCTimeFormat) + timeAfterDrop := time.Now().Add(48 * 60 * 60 * time.Second).Format(tikvutil.GCTimeFormat) + safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + // clear GC variables first. + tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )") + return timeBeforeDrop, timeAfterDrop, safePointSQL, resetGC +} + +func TestFlashback(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +} + +func TestPrepareFlashbackFailed(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch", `return(true)`)) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + + jobMeta := tk.MustQuery("select job_meta from mysql.tidb_ddl_history order by job_id desc limit 1").Rows()[0][0].(string) + job := model.Job{} + require.NoError(t, job.Decode([]byte(jobMeta))) + require.Equal(t, job.ErrorCount, int64(0)) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockPrepareMeetsEpochNotMatch")) + } +} + +func TestFlashbackAddDropIndex(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + prevGCCount := tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0] + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t add index k(a)") + require.Equal(t, tk.MustQuery("select max(a) from t use index(k)").Rows()[0][0], "3") + tk.MustExec("alter table t drop index i") + tk.MustGetErrCode("select max(a) from t use index(i)", errno.ErrKeyDoesNotExist) + require.Greater(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4), (5), (6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t use index(i)").Rows()[0][0], "3") + tk.MustGetErrCode("select max(a) from t use index(k)", errno.ErrKeyDoesNotExist) + require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +} + +func TestFlashbackAddDropModifyColumn(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, index i(a))") + tk.MustExec("insert t values (1, 1), (2, 2), (3, 3)") + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t add column c int") + tk.MustExec("alter table t drop column b") + tk.MustExec("alter table t modify column a tinyint") + require.Equal(t, tk.MustQuery("show create table t").Rows()[0][1], "CREATE TABLE `t` (\n"+ + " `a` tinyint(4) DEFAULT NULL,\n"+ + " `c` int(11) DEFAULT NULL,\n"+ + " KEY `i` (`a`)\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + tk.MustExec("insert t values (4, 4), (5, 5), (6, 6)") + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("show create table t").Rows()[0][1], "CREATE TABLE `t` (\n"+ + " `a` int(11) DEFAULT NULL,\n"+ + " `b` int(11) DEFAULT NULL,\n"+ + " KEY `i` (`a`)\n"+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") + require.Equal(t, tk.MustQuery("select max(b) from t").Rows()[0][0], "3") + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +} + +func TestFlashbackRenameDropCreateTable(t *testing.T) { + if *realtikvtest.WithRealTiKV { + store := realtikvtest.CreateMockStoreAndSetup(t) + + tk := testkit.NewTestKit(t, store) + + timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk) + defer resetGC() + + tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop)) + tk.MustExec("use test") + tk.MustExec("drop table if exists t, t1, t2, t3") + tk.MustExec("create table t(a int, index i(a))") + tk.MustExec("insert t values (1), (2), (3)") + tk.MustExec("create table t1(a int, index i(a))") + tk.MustExec("insert t1 values (4), (5), (6)") + prevGCCount := tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0] + + time.Sleep(1 * time.Second) + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("rename table t to t3") + tk.MustExec("drop table t1") + tk.MustExec("create table t2(a int, index i(a))") + tk.MustExec("insert t2 values (7), (8), (9)") + + require.Equal(t, tk.MustQuery("select max(a) from t3").Rows()[0][0], "3") + require.Equal(t, tk.MustQuery("select max(a) from t2").Rows()[0][0], "9") + + require.Greater(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(100 * time.Second)) + + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts))) + + tk.MustExec("admin check table t") + require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3") + tk.MustExec("admin check table t1") + require.Equal(t, tk.MustQuery("select max(a) from t1").Rows()[0][0], "6") + require.Equal(t, tk.MustQuery("select count(*) from mysql.gc_delete_range").Rows()[0][0], prevGCCount) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) + } +} diff --git a/tests/realtikvtest/flashbacktest/main_test.go b/tests/realtikvtest/flashbacktest/main_test.go new file mode 100644 index 0000000000000..d24310861a836 --- /dev/null +++ b/tests/realtikvtest/flashbacktest/main_test.go @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flashbacktest + +import ( + "testing" + + "github.com/pingcap/tidb/testkit/testsetup" + "github.com/pingcap/tidb/tests/realtikvtest" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), + goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), + goleak.IgnoreTopFunction("google.golang.org/grpc.(*ccBalancerWrapper).watcher"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*http2Client).keepalive"), + goleak.IgnoreTopFunction("google.golang.org/grpc/internal/transport.(*controlBuffer).get"), + goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), + goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), + } + testsetup.SetupForCommonTest() + goleak.VerifyTestMain(m, opts...) + realtikvtest.RunTestMain(m) +}