backup: use history iterator to scan ddl jobs #54100

Merged · 7 commits · Jun 24, 2024
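
In short: instead of loading the entire DDL history into memory with ddl.GetAllHistoryDDLJobs, WriteBackupDDLJobs now walks the history newest-first through ddl.GetLastHistoryDDLJobsIterator in batches of ddl.DefNumHistoryJobs, and stops as soon as it reaches a job whose schema version is at or below the last backup's schema version.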
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
@@ -67,7 +67,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb/mock",
94 changes: 66 additions & 28 deletions br/pkg/backup/client.go
@@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/json"
"reflect"
"sort"
"strings"
"time"

@@ -1000,8 +1001,37 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
if err != nil {
return errors.Trace(err)
}

// determine whether the jobs need to be appended to `allJobs`
appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
appendJobs := make([]*model.Job, 0, len(jobs))
for _, job := range jobs {
if skipUnsupportedDDLJob(job) {
continue
}
if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
// exit early to avoid an unnecessary scan
return appendJobs, true
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
appendJobs = append(appendJobs, job)
}
}
return appendJobs, false
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs := make([]*model.Job, 0)
var allJobs []*model.Job
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx())
if err != nil {
@@ -1014,41 +1044,49 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
return errors.Trace(err)
}

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
// filter out the unsupported or out-of-range jobs
allJobs, _ = appendJobsFn(allJobs)

historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

count := 0
count := len(allJobs)

cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
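// this buffer is handed back to GetLastJobs on each iteration, presumably so its backing array can be reused across batches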
for {
cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
if err != nil {
return errors.Trace(err)
}
if len(cacheJobs) == 0 {
// no more jobs
break
}
jobs, finished := appendJobsFn(cacheJobs)
count += len(jobs)
allJobs = append(allJobs, jobs...)
if finished {
// no more jobs between [LastTS, ts]
break
}
}
log.Debug("get complete jobs", zap.Int("jobs", count))
// sort by job id in ascending order
sort.Slice(allJobs, func(i, j int) bool {
return allJobs[i].ID < allJobs[j].ID
})
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
}
log.Debug("get completed jobs", zap.Int("jobs", count))
return nil
}

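The core of the change is the scan loop above: history DDL jobs are consumed newest-first in fixed-size batches, and the scan stops at the first job whose schema version is at or below lastSchemaVersion. Below is a minimal, self-contained sketch of that pattern; the job and iterator types are simplified stand-ins for illustration, not the real model.Job / ddl iterator APIs.

```go
// Minimal sketch of the batched newest-first scan with early exit, under
// simplified assumptions: `job` and `jobsIterator` are illustrative
// stand-ins, not the real model.Job / ddl iterator types.
package main

import (
	"fmt"
	"sort"
)

type job struct {
	ID            int64
	SchemaVersion int64
}

// jobsIterator walks a history of jobs from newest to oldest.
type jobsIterator struct {
	jobs []job // stored oldest-first; we iterate backwards
	pos  int
}

// lastJobs returns up to n of the next-newest jobs, newest first.
func (it *jobsIterator) lastJobs(n int) []job {
	batch := make([]job, 0, n)
	for len(batch) < n && it.pos > 0 {
		it.pos--
		batch = append(batch, it.jobs[it.pos])
	}
	return batch
}

func main() {
	history := []job{{1, 10}, {2, 20}, {3, 30}, {4, 40}, {5, 50}}
	it := &jobsIterator{jobs: history, pos: len(history)}

	lastSchemaVersion := int64(20) // versions at or below this were already backed up
	var picked []job
scan:
	for {
		batch := it.lastJobs(2) // the real code pulls ddl.DefNumHistoryJobs per batch
		if len(batch) == 0 {
			break // no more history jobs
		}
		for _, j := range batch {
			if j.SchemaVersion <= lastSchemaVersion {
				// newest-first order: every remaining job is older, so stop scanning
				break scan
			}
			picked = append(picked, j)
		}
	}
	// the scan collects jobs newest-first, so restore ascending job-id order
	sort.Slice(picked, func(i, j int) bool { return picked[i].ID < picked[j].ID })
	fmt.Println(picked) // [{3 30} {4 40} {5 50}]
}
```

The final sort mirrors the sort.Slice call in the diff: because jobs arrive newest-first (and queue jobs are mixed in separately), ascending job-id order has to be restored before writing the backup meta.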
59 changes: 59 additions & 0 deletions br/pkg/backup/client_test.go
@@ -188,6 +188,65 @@ func TestGetTS(t *testing.T) {
require.Equal(t, backupts, ts)
}

func TestGetHistoryDDLJobs(t *testing.T) {
s := createBackupSuite(t)

tk := testkit.NewTestKit(t, s.cluster.Storage)
lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("RENAME TABLE test_table to test_table2;")
tk.MustExec("RENAME TABLE test_table2 to test_table;")
lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("TRUNCATE TABLE test_table;")
ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)

checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
err = metaWriter.FlushBackupMeta(ctx)
require.NoError(t, err, "Finally flush backup meta failed", err)

metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
require.NoError(t, err)
mockMeta := &backuppb.BackupMeta{}
err = proto.Unmarshal(metaBytes, mockMeta)
require.NoError(t, err)
// check the schema version
metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
require.NoError(t, err)
var allDDLJobs []*model.Job
err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
require.NoError(t, err)
require.Len(t, allDDLJobs, jobsCount)
}

checkFn(lastTS1, ts, 11)
checkFn(lastTS2, ts, 9)
checkFn(lastTS1, lastTS2, 2)
checkFn(lastTS3, ts, 1)
}
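
The expected counts follow from the statements above: 11 DDL jobs run between lastTS1 and ts, 9 between lastTS2 and ts, 2 between lastTS1 and lastTS2 (the initial CREATE DATABASE and CREATE TABLE), and 1 between lastTS3 and ts (the TRUNCATE); USE is not a DDL statement and produces no job.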

func TestSkipUnsupportedDDLJob(t *testing.T) {
s := createBackupSuite(t)
