diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index f02ffdbfeb815..89c3795da3048 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -229,7 +229,10 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im newTS, err = mgr.refreshTSAndUpdateCP() if err == nil { for _, ei := range bc.engines { - ei.openedEngine.SetTS(newTS) + err = bc.backend.SetTSAfterResetEngine(ei.uuid, newTS) + if err != nil { + return false, false, err + } } } } diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go index 9dc9231f8e64c..1a87c5081e5cb 100644 --- a/pkg/lightning/backend/backend.go +++ b/pkg/lightning/backend/backend.go @@ -98,7 +98,10 @@ type EngineConfig struct { // when opening the engine, instead of removing it. KeepSortDir bool // TS is the preset timestamp of data in the engine. When it's 0, the used TS - // will be set lazily. + // will be set lazily. This is used by local backend. This field will be written + // to engineMeta.TS and take effect in below cases: + // - engineManager.openEngine + // - engineManager.closeEngine only for an external engine TS uint64 } @@ -319,13 +322,6 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon return engine.backend.LocalWriter(ctx, cfg, engine.uuid) } -// SetTS sets the TS of the engine. In most cases if the caller wants to specify -// TS it should use the TS field in EngineConfig. This method is only used after -// a ResetEngine. -func (engine *OpenedEngine) SetTS(ts uint64) { - engine.config.TS = ts -} - // UnsafeCloseEngine closes the engine without first opening it. // This method is "unsafe" as it does not follow the normal operation sequence // (Open -> Write -> Close -> Import). This method should only be used when one diff --git a/pkg/lightning/backend/local/engine_test.go b/pkg/lightning/backend/local/engine_test.go index e4bfe65a76e20..59f21b3b2bf59 100644 --- a/pkg/lightning/backend/local/engine_test.go +++ b/pkg/lightning/backend/local/engine_test.go @@ -23,6 +23,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/log" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) { @@ -68,6 +70,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) { keyAdapter: common.NoopKeyAdapter{}, logger: log.L(), } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) // simulate import f.lock(importMutexStateImport) @@ -106,6 +109,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) { keyAdapter: common.NoopKeyAdapter{}, logger: log.L(), } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) f.sstIngester = dbSSTIngester{e: f} sstPath := path.Join(tmpPath, uuid.New().String()+".sst") @@ -142,6 +146,7 @@ func TestGetFirstAndLastKey(t *testing.T) { f := &Engine{ sstDir: tmpPath, } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) @@ -184,6 +189,7 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) { f := &Engine{ sstDir: tmpPath, } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) err := db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index eaeda026560d6..4434caf9c13f3 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -1602,12 +1602,25 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err } // ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is -// invalid. Caller must use OpenedEngine.SetTS to set a valid TS before import +// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import // the engine. func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error { return local.engineMgr.resetEngine(ctx, engineUUID, true) } +// SetTSAfterResetEngine allocates a new TS for the engine after it's reset. +// This is typically called after persisting the chosen TS of the engine to make +// sure TS is not changed after task failover. +func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error { + e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose) + if e == nil { + return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String()) + } + defer e.unlock() + e.engineMeta.TS = ts + return e.saveEngineMeta() +} + // CleanupEngine cleanup the engine and reclaim the space. func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error { return local.engineMgr.cleanupEngine(ctx, engineUUID) diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index bac7821f33122..138d71b86bbf8 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -60,6 +60,7 @@ import ( "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/http" @@ -353,6 +354,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) { keyAdapter: common.NoopKeyAdapter{}, logger: log.L(), } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) f.sstIngester = dbSSTIngester{e: f} f.wg.Add(1) @@ -587,6 +589,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) { }, logger: log.L(), } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) createSSTWriter := func() (*sstWriter, error) { @@ -1176,7 +1179,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i} } -func (m mockIngestData) GetTS() uint64 { return 0 } +func (m mockIngestData) GetTS() uint64 { return oracle.GoTimeToTS(time.Now()) } func (m mockIngestData) IncRef() {} @@ -1565,6 +1568,7 @@ func TestPartialWriteIngestBusy(t *testing.T) { keyAdapter: common.NoopKeyAdapter{}, logger: log.L(), } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) err = db.Set([]byte("a"), []byte("a"), nil) require.NoError(t, err) @@ -1708,6 +1712,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) { regionSplitKeysCache: [][]byte{{1}, {11}}, regionSplitSize: 1 << 30, } + f.TS = oracle.GoTimeToTS(time.Now()) f.db.Store(db) // keys starts with 0 is meta keys, so we start with 1. for i := byte(1); i <= 10; i++ { diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 1e538c061af81..f6bb92990d3c4 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -43,6 +43,7 @@ import ( util2 "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" "google.golang.org/grpc" @@ -436,6 +437,20 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error { allPeers = append(allPeers, peer) } dataCommitTS := j.ingestData.GetTS() + intest.AssertFunc(func() bool { + timeOfTS := oracle.GetTimeFromTS(dataCommitTS) + now := time.Now() + if timeOfTS.After(now) { + return false + } + if now.Sub(timeOfTS) > 24*time.Hour { + return false + } + return true + }, "TS used in import should in [now-1d, now], but got %d", dataCommitTS) + if dataCommitTS == 0 { + return errors.New("data commitTS is 0") + } req.Chunk = &sst.WriteRequest_Batch{ Batch: &sst.WriteBatch{ CommitTs: dataCommitTS, diff --git a/tests/realtikvtest/addindextest2/BUILD.bazel b/tests/realtikvtest/addindextest2/BUILD.bazel index fe7e8b99d43b8..4bba0c9c929c9 100644 --- a/tests/realtikvtest/addindextest2/BUILD.bazel +++ b/tests/realtikvtest/addindextest2/BUILD.bazel @@ -25,5 +25,6 @@ go_test( "@com_github_phayes_freeport//:freeport", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go index 4db70c2bfff7b..f7f91e01b82db 100644 --- a/tests/realtikvtest/addindextest2/global_sort_test.go +++ b/tests/realtikvtest/addindextest2/global_sort_test.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/phayes/freeport" @@ -37,6 +38,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func init() { @@ -320,7 +322,11 @@ func TestIngestUseGivenTS(t *testing.T) { t.Cleanup(func() { tk.MustExec("set @@global.tidb_cloud_storage_uri = '';") }) - err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", `return(123456789)`) + + presetTS := oracle.GoTimeToTS(time.Now()) + failpointTerm := fmt.Sprintf(`return(%d)`, presetTS) + + err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", failpointTerm) require.NoError(t, err) tk.MustExec("create table t (a int);") @@ -336,10 +342,10 @@ func TestIngestUseGivenTS(t *testing.T) { require.NoError(t, err) tikvStore := dom.Store().(helper.Storage) newHelper := helper.NewHelper(tikvStore) - mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 123456789) + mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, presetTS) require.NoError(t, err) require.NotNil(t, mvccResp) require.NotNil(t, mvccResp.Info) require.Greater(t, len(mvccResp.Info.Writes), 0) - require.Equal(t, uint64(123456789), mvccResp.Info.Writes[0].CommitTs) + require.Equal(t, presetTS, mvccResp.Info.Writes[0].CommitTs) } diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go index d451a2e2ee213..00a7f70981a71 100644 --- a/tests/realtikvtest/addindextest3/ingest_test.go +++ b/tests/realtikvtest/addindextest3/ingest_test.go @@ -448,6 +448,34 @@ func TestAddIndexMockFlushError(t *testing.T) { require.True(t, strings.Contains(jobTp, "ingest"), jobTp) } +func TestAddIndexDiskQuotaTS(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("set @@global.tidb_enable_dist_task = 0;") + testAddIndexDiskQuotaTS(t, tk) + tk.MustExec("set @@global.tidb_enable_dist_task = 1;") + testAddIndexDiskQuotaTS(t, tk) +} + +func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) { + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;") + + tk.MustExec("create table t(id int primary key, b int, k int);") + tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1")) + tk.MustExec("insert into t values(1, 1, 1);") + tk.MustExec("insert into t values(100000, 1, 1);") + + ingest.ForceSyncFlagForTest = true + tk.MustExec("alter table t add index idx_test(b);") + ingest.ForceSyncFlagForTest = false + tk.MustExec("update t set b = b + 1;") +} + func TestAddIndexRemoteDuplicateCheck(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store)