Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Support flashback cluster with ddl history #40209

Merged
merged 12 commits into from
Dec 29, 2022
60 changes: 54 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -147,6 +148,17 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
return val, nil
}

// isFlashbackSupportedDDLAction reports whether the given DDL action type is
// acceptable in the schema history covered by a flashback.
//
// It returns false for the TiFlash replica actions and the placement
// policy / placement setting actions listed below, and true for every other
// action type.
func isFlashbackSupportedDDLAction(action model.ActionType) bool {
	// Action types that make a flashback impossible.
	unsupportedActions := []model.ActionType{
		model.ActionSetTiFlashReplica,
		model.ActionUpdateTiFlashReplicaStatus,
		model.ActionAlterPlacementPolicy,
		model.ActionAlterTablePlacement,
		model.ActionAlterTablePartitionPlacement,
		model.ActionCreatePlacementPolicy,
		model.ActionDropPlacementPolicy,
		model.ActionModifySchemaDefaultPlacement,
	}
	for _, unsupported := range unsupportedActions {
		if action == unsupported {
			return false
		}
	}
	return true
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
Expand All @@ -170,19 +182,47 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
flashbackSnapshotMeta := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
flashbackTSString := oracle.GetTimeFromTS(flashbackTS).String()

// Check whether there was an upgrade during [flashbackTS, now).
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
xhebox marked this conversation as resolved.
Show resolved Hide resolved
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.Errorf("Detected TiDB upgrade during [%s, now), can't do flashback", flashbackTSString)
}

// Check whether there is a DDL task at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = newSession(sess).execute(d.ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
if rows[0].GetInt64(0) != 0 {
return errors.Errorf("Detected another DDL job at %s, can't do flashback", flashbackTSString)
}

// If flashbackSchemaVersion is not the same as nowSchemaVersion, we should check all schema diffs during [flashbackTS, now).
for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ {
diff, err := t.GetSchemaDiff(i)
if err != nil {
return errors.Trace(err)
}
if diff != nil && diff.Type != model.ActionFlashbackCluster {
return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS))
if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) {
return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString)
}
}

Expand Down Expand Up @@ -211,7 +251,7 @@ type flashbackID struct {

func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID {
var excluded bool
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") {
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
bb7133 marked this conversation as resolved.
Show resolved Hide resolved
excluded = true
}
flashbackIDs = append(flashbackIDs, flashbackID{
Expand Down Expand Up @@ -270,6 +310,14 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
})
}

// The meta data key ranges.
metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0))
metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
})

return keyRanges, nil
}

Expand Down Expand Up @@ -633,7 +681,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
return updateSchemaVersion(d, t, job)
}
return ver, nil
}
Expand Down
14 changes: 8 additions & 6 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ func TestGetFlashbackKeyRanges(t *testing.T) {

kvRanges, err := ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
// The results are 6 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets)
// The results are 8 key ranges
// 0: (stats_meta,stats_histograms,stats_buckets, gc_delete_range)
// 1: (stats_feedback)
// 2: (stats_top_n)
// 3: (stats_extended)
// 4: (stats_fm_sketch)
// 5: (stats_history, stats_meta_history)
// 6: (stats_table_locked)
require.Len(t, kvRanges, 7)
// 7: meta Ranges
require.Len(t, kvRanges, 8)

tk.MustExec("use test")
tk.MustExec("CREATE TABLE employees (" +
Expand All @@ -64,7 +65,7 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
");")
tk.MustExec("truncate table mysql.analyze_jobs")

// truncate all `stats_` tables, make table ID consecutive.
// truncate all `stats_` and `gc_delete_range` tables, make table ID consecutive.
tk.MustExec("truncate table mysql.stats_meta")
tk.MustExec("truncate table mysql.stats_histograms")
tk.MustExec("truncate table mysql.stats_buckets")
Expand All @@ -75,14 +76,15 @@ func TestGetFlashbackKeyRanges(t *testing.T) {
tk.MustExec("truncate table mysql.stats_history")
tk.MustExec("truncate table mysql.stats_meta_history")
tk.MustExec("truncate table mysql.stats_table_locked")
tk.MustExec("truncate table mysql.gc_delete_range")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 2)
require.Len(t, kvRanges, 3)

tk.MustExec("truncate table test.employees")
kvRanges, err = ddl.GetFlashbackKeyRanges(se)
require.NoError(t, err)
require.Len(t, kvRanges, 1)
require.Len(t, kvRanges, 2)
}

func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,11 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...
OldTableID: recoverTabsInfo[i].TableInfo.ID,
}
}
case model.ActionFlashbackCluster:
diff.TableID = -1
if job.SchemaState == model.StatePublic {
diff.RegenerateSchemaMap = true
}
default:
diff.TableID = job.TableID
}
Expand Down
1 change: 1 addition & 0 deletions ddl/tiflashtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//testutils",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
Expand Down
39 changes: 39 additions & 0 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -438,6 +439,44 @@ func TestTiFlashDropPartition(t *testing.T) {
CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash")
}

// TestTiFlashFlashbackCluster checks that `FLASHBACK CLUSTER` is rejected with
// an "unsupported DDL job type" error when a `SET TIFLASH REPLICA` DDL was
// executed after the flashback target timestamp.
func TestTiFlashFlashbackCluster(t *testing.T) {
	s, teardown := createTiFlashContext(t)
	defer teardown()
	tk := testkit.NewTestKit(t, s.store)

	tk.MustExec("use test")
	tk.MustExec("drop table if exists t")
	tk.MustExec("create table t(a int)")
	tk.MustExec("insert into t values (1), (2), (3)")

	// Take the flashback target timestamp before the TiFlash DDL below, so
	// that the DDL falls inside [ts, now).
	ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
	require.NoError(t, err)
	flashbackTime := oracle.GetTimeFromTS(ts)

	tk.MustExec("alter table t set tiflash replica 1")
	time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
	CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t")

	injectSafeTS := oracle.GoTimeToTS(flashbackTime.Add(10 * time.Second))

	// Each failpoint is disabled in a defer registered right after it is
	// enabled, so a failing assertion later in the test cannot leave it
	// enabled for subsequent tests.
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest"))
	}()
	require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS",
		fmt.Sprintf("return(%v)", injectSafeTS)))
	defer func() {
		require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
	}()
	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
		fmt.Sprintf("return(%v)", injectSafeTS)))
	defer func() {
		require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
	}()

	ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s")
	defer func() {
		ChangeGCSafePoint(tk, time.Now(), "true", "10m0s")
	}()

	// The replica DDL executed after ts must make the flashback fail.
	errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback",
		model.ActionSetTiFlashReplica.String(), flashbackTime.String())
	tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTime), errorMsg)
}

func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) {
tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table))
require.NoError(t, err)
Expand Down
39 changes: 23 additions & 16 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,21 +188,24 @@ func (do *Domain) loadInfoSchema(startTS uint64) (infoschema.InfoSchema, bool, i
// 1. Not first time bootstrap loading, which needs a full load.
// 2. It is newer than the current one, so it will be "the current one" after this function call.
// 3. There are fewer than 100 diffs.
// 4. No regenerated schema diff.
startTime := time.Now()
if currentSchemaVersion != 0 && neededSchemaVersion > currentSchemaVersion && neededSchemaVersion-currentSchemaVersion < 100 {
is, relatedChanges, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
if err == nil {
do.infoCache.Insert(is, startTS)
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS),
zap.Uint64s("actionTypes", relatedChanges.ActionTypes))
return is, false, currentSchemaVersion, relatedChanges, nil
is, relatedChanges, regenerateSchemaMap, err := do.tryLoadSchemaDiffs(m, currentSchemaVersion, neededSchemaVersion)
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
if !regenerateSchemaMap {
if err == nil {
do.infoCache.Insert(is, startTS)
logutil.BgLogger().Info("diff load InfoSchema success",
zap.Int64("currentSchemaVersion", currentSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("phyTblIDs", relatedChanges.PhyTblIDS),
zap.Uint64s("actionTypes", relatedChanges.ActionTypes))
return is, false, currentSchemaVersion, relatedChanges, nil
}
// We can fall back to full load, don't need to return the error.
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}
// We can fall back to full load, don't need to return the error.
logutil.BgLogger().Error("failed to load schema diff", zap.Error(err))
}

schemas, err := do.fetchAllSchemasWithTables(m)
Expand Down Expand Up @@ -323,13 +326,14 @@ func (do *Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta,
// Return true if the schema is loaded successfully.
// Return false if the schema can not be loaded by schema diff, then we need to do full load.
// The second returned value is the delta updated table and partition IDs.
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, error) {
// The third returned value means whether to reload schema info from TiKV or not.
func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64) (infoschema.InfoSchema, *transaction.RelatedSchemaChange, bool, error) {
var diffs []*model.SchemaDiff
for usedVersion < newVersion {
usedVersion++
diff, err := m.GetSchemaDiff(usedVersion)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}
if diff == nil {
// Empty diff means the txn of generating schema version is committed, but the txn of `runDDLJob` is not or fail.
Expand All @@ -345,7 +349,10 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
for _, diff := range diffs {
IDs, err := builder.ApplyDiff(m, diff)
if err != nil {
return nil, nil, err
return nil, nil, false, err
}
if diff.RegenerateSchemaMap {
return nil, nil, true, nil
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
Expand All @@ -360,7 +367,7 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
relatedChange := transaction.RelatedSchemaChange{}
relatedChange.PhyTblIDS = phyTblIDs
relatedChange.ActionTypes = actions
return is, &relatedChange, nil
return is, &relatedChange, false, nil
}

func canSkipSchemaCheckerDDL(tp model.ActionType) bool {
Expand Down
2 changes: 1 addition & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64
affected := false
for i, tblID := range item.relatedIDs {
for _, relatedTblID := range tableIDs {
if tblID == relatedTblID {
if tblID == relatedTblID || relatedTblID == -1 {
// if actionType >= 64, the value of left shift equals 0, and it will not impact amend txn
changedTblMap[tblID] |= 1 << item.relatedActions[i]
affected = true
Expand Down
10 changes: 6 additions & 4 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,12 @@ func TestRecoverClusterMeetError(t *testing.T) {
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), errno.ErrPrivilegeCheckFail)
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustMatchErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "Detected schema change due to another DDL job during \\[.*, now\\), can't do flashback")
// update tidb_server_version
nowTS, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec("update mysql.tidb set VARIABLE_VALUE=VARIABLE_VALUE+1 where VARIABLE_NAME='tidb_server_version'")
errorMsg := fmt.Sprintf("[ddl:-1]Detected TiDB upgrade during [%s, now), can't do flashback", oracle.GetTimeFromTS(nowTS).String())
tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(nowTS)), errorMsg)

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
Expand Down
2 changes: 2 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ type SchemaDiff struct {
OldTableID int64 `json:"old_table_id"`
// OldSchemaID is the schema ID before rename table, only used by rename table DDL.
OldSchemaID int64 `json:"old_schema_id"`
// RegenerateSchemaMap means whether to rebuild the schema map when applying the schema diff.
RegenerateSchemaMap bool `json:"regenerate_schema_map"`
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved

AffectedOpts []*AffectedOption `json:"affected_options"`
}
Expand Down
6 changes: 0 additions & 6 deletions tests/realtikvtest/brietest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,20 @@ go_test(
srcs = [
"backup_restore_test.go",
"binlog_test.go",
"flashback_test.go",
"main_test.go",
],
flaky = True,
race = "on",
deps = [
"//config",
"//ddl/util",
"//parser/model",
"//parser/mysql",
"//sessionctx/binloginfo",
"//store/mockstore/mockcopr",
"//testkit",
"//testkit/testsetup",
"//tests/realtikvtest",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
],
Expand Down
Loading