From 25cd3c3ed172934fab9847238358c2a9561a0f73 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 19 Jun 2024 10:54:46 +0800 Subject: [PATCH 1/7] backup: use history iterator to scan ddl jobs --- br/pkg/backup/client.go | 79 ++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 012984cf39057..ce66b817e9801 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1000,6 +1000,38 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S if err != nil { return errors.Trace(err) } + + handleJobsFn := func(jobs []*model.Job) (bool, error) { + for _, job := range jobs { + if skipUnsupportedDDLJob(job) { + continue + } + + if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && + (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) { + if job.BinlogInfo.DBInfo != nil { + // ignore all placement policy info during incremental backup for now. + job.BinlogInfo.DBInfo.PlacementPolicyRef = nil + } + if job.BinlogInfo.TableInfo != nil { + // ignore all placement policy info during incremental backup for now. + job.BinlogInfo.TableInfo.ClearPlacement() + } + jobBytes, err := json.Marshal(job) + if err != nil { + return true, errors.Trace(err) + } + err = metaWriter.Send(jobBytes, metautil.AppendDDL) + if err != nil { + return true, errors.Trace(err) + } + } + // early exits to stop unnecessary scan + return true, nil + } + return true, nil + } + newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) allJobs := make([]*model.Job, 0) err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error { @@ -1014,42 +1046,31 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return errors.Trace(err) } - historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta) + historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta) if err != nil { return errors.Trace(err) } - log.Debug("get history jobs", zap.Int("jobs", len(historyJobs))) - allJobs = append(allJobs, historyJobs...) - count := 0 - for _, job := range allJobs { - if skipUnsupportedDDLJob(job) { - continue - } + _, err = handleJobsFn(allJobs) + if err != nil { + log.Error("failed to handle job", zap.Error(err)) + return errors.Trace(err) + } + count := len(allJobs) - if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && - (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) { - if job.BinlogInfo.DBInfo != nil { - // ignore all placement policy info during incremental backup for now. - job.BinlogInfo.DBInfo.PlacementPolicyRef = nil - } - if job.BinlogInfo.TableInfo != nil { - // ignore all placement policy info during incremental backup for now. - job.BinlogInfo.TableInfo.ClearPlacement() - } - jobBytes, err := json.Marshal(job) - if err != nil { - return errors.Trace(err) - } - err = metaWriter.Send(jobBytes, metautil.AppendDDL) - if err != nil { - return errors.Trace(err) - } - count++ + cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs) + for { + cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs) + if err != nil || len(cacheJobs) == 0 { + return errors.Trace(err) + } + count += len(cacheJobs) + finish, err := handleJobsFn(cacheJobs) + if err != nil || finish { + log.Debug("get complete jobs", zap.Int("jobs", count)) + return errors.Trace(err) } } - log.Debug("get completed jobs", zap.Int("jobs", count)) - return nil } func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error) { From 6dc866677e36448aeeaf228501b75bc6f590e1e7 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 20 Jun 2024 16:04:06 +0800 Subject: [PATCH 2/7] address comment --- br/pkg/backup/client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index ce66b817e9801..b88cd5da93c5f 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1006,6 +1006,10 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S if skipUnsupportedDDLJob(job) { continue } + if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion { + // early exits to stop unnecessary scan + return true, nil + } if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) { @@ -1026,8 +1030,6 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return true, errors.Trace(err) } } - // early exits to stop unnecessary scan - return true, nil } return true, nil } From 44c9d008320a550cb26fea9f9f9c9d5b5404a85a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Jun 2024 16:30:25 +0800 Subject: [PATCH 3/7] add test --- br/pkg/backup/client_test.go | 59 ++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index db6051e84b0c2..55ddcf2e64a74 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -188,6 +188,65 @@ func TestGetTS(t *testing.T) { require.Equal(t, backupts, ts) } +func TestGetHistoryDDLJobs(t *testing.T) { + s := createBackupSuite(t) + + tk := testkit.NewTestKit(t, s.cluster.Storage) + lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + require.NoErrorf(t, err, "Error get last ts: %s", err) + tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;") + tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);") + lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + require.NoErrorf(t, err, "Error get last ts: %s", err) + tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;") + tk.MustExec("DROP TABLE test_db.test_table1;") + tk.MustExec("DROP DATABASE test_db;") + tk.MustExec("CREATE DATABASE test_db;") + tk.MustExec("USE test_db;") + tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));") + tk.MustExec("RENAME TABLE test_table1 to test_table;") + tk.MustExec("RENAME TABLE test_table to test_table2;") + tk.MustExec("RENAME TABLE test_table2 to test_table;") + lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + require.NoErrorf(t, err, "Error get last ts: %s", err) + tk.MustExec("TRUNCATE TABLE test_table;") + ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope}) + require.NoErrorf(t, err, "Error get last ts: %s", err) + + checkFn := func(lastTS uint64, ts uint64, jobsCount int) { + cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT} + metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher) + ctx := context.Background() + metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false) + require.NoErrorf(t, err, "Error get ddl jobs: %s", err) + err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) + require.NoError(t, err, "Flush failed", err) + err = metaWriter.FlushBackupMeta(ctx) + require.NoError(t, err, "Finally flush backup meta failed", err) + + metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile) + require.NoError(t, err) + mockMeta := &backuppb.BackupMeta{} + err = proto.Unmarshal(metaBytes, mockMeta) + require.NoError(t, err) + // check the schema version + metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher) + allDDLJobsBytes, err := metaReader.ReadDDLs(ctx) + require.NoError(t, err) + var allDDLJobs []*model.Job + err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs) + require.NoError(t, err) + require.Len(t, allDDLJobs, jobsCount) + } + + checkFn(lastTS1, ts, 11) + checkFn(lastTS2, ts, 9) + checkFn(lastTS1, lastTS2, 2) + checkFn(lastTS3, ts, 1) +} + func TestSkipUnsupportedDDLJob(t *testing.T) { s := createBackupSuite(t) From 2b5f7e6332ac53dd3092a5c2d01ca9f167a95b47 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Jun 2024 16:55:55 +0800 Subject: [PATCH 4/7] fix the missing order issue --- br/pkg/backup/client.go | 54 +++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index b88cd5da93c5f..522c62dd4d19d 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "encoding/json" "reflect" + "sort" "strings" "time" @@ -1001,14 +1002,16 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return errors.Trace(err) } - handleJobsFn := func(jobs []*model.Job) (bool, error) { + // determine whether the jobs need to be append into `allJobs` + appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) { + appendJobs := make([]*model.Job, 0, len(jobs)) for _, job := range jobs { if skipUnsupportedDDLJob(job) { continue } if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion { // early exits to stop unnecessary scan - return true, nil + return appendJobs, true } if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && @@ -1021,21 +1024,14 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S // ignore all placement policy info during incremental backup for now. job.BinlogInfo.TableInfo.ClearPlacement() } - jobBytes, err := json.Marshal(job) - if err != nil { - return true, errors.Trace(err) - } - err = metaWriter.Send(jobBytes, metautil.AppendDDL) - if err != nil { - return true, errors.Trace(err) - } + appendJobs = append(appendJobs, job) } } - return true, nil + return appendJobs, false } newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) - allJobs := make([]*model.Job, 0) + var allJobs []*model.Job err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error { allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx()) if err != nil { @@ -1053,26 +1049,42 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return errors.Trace(err) } - _, err = handleJobsFn(allJobs) - if err != nil { - log.Error("failed to handle job", zap.Error(err)) - return errors.Trace(err) - } count := len(allJobs) cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs) for { cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs) - if err != nil || len(cacheJobs) == 0 { + if err != nil { return errors.Trace(err) } + if len(cacheJobs) == 0 { + // no more jobs + break + } count += len(cacheJobs) - finish, err := handleJobsFn(cacheJobs) - if err != nil || finish { - log.Debug("get complete jobs", zap.Int("jobs", count)) + jobs, finished := appendJobsFn(cacheJobs) + allJobs = append(allJobs, jobs...) + if finished { + // no more jobs between [LastTS, ts] + break + } + } + log.Debug("get complete jobs", zap.Int("jobs", count)) + // sort by job id with ascend order + sort.Slice(allJobs, func(i, j int) bool { + return allJobs[i].ID < allJobs[j].ID + }) + for _, job := range allJobs { + jobBytes, err := json.Marshal(job) + if err != nil { + return errors.Trace(err) + } + err = metaWriter.Send(jobBytes, metautil.AppendDDL) + if err != nil { return errors.Trace(err) } } + return nil } func (bc *Client) BuildProgressRangeTree(ranges []rtree.Range) (rtree.ProgressRangeTree, error) { From 7044fb9729bb631f7892683b3af3da1d1956e0b9 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Jun 2024 17:00:48 +0800 Subject: [PATCH 5/7] update --- br/pkg/backup/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 522c62dd4d19d..d2fc4eaaf7fde 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1044,6 +1044,9 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S return errors.Trace(err) } + // filter out the jobs + allJobs, _ = appendJobsFn(allJobs) + historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta) if err != nil { return errors.Trace(err) From 4072a0b93f14f54439874976229795a8638ed646 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Jun 2024 17:22:04 +0800 Subject: [PATCH 6/7] fix build --- br/pkg/backup/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index 251b2c3b75385..760e37c44d26b 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -67,7 +67,7 @@ go_test( embed = [":backup"], flaky = True, race = "on", - shard_count = 10, + shard_count = 11, deps = [ "//br/pkg/conn", "//br/pkg/gluetidb/mock", From 034298f606f1afe5f55d21c7b4357c3040cc1415 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Mon, 24 Jun 2024 19:08:53 +0800 Subject: [PATCH 7/7] address comment --- br/pkg/backup/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index d2fc4eaaf7fde..606f6cf4d67be 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1064,8 +1064,8 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S // no more jobs break } - count += len(cacheJobs) jobs, finished := appendJobsFn(cacheJobs) + count += len(jobs) allJobs = append(allJobs, jobs...) if finished { // no more jobs between [LastTS, ts]