Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning, ddl: set TS to engineMeta after ResetEngineSkipAllocTS #57998

Merged
merged 11 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im
newTS, err = mgr.refreshTSAndUpdateCP()
if err == nil {
for _, ei := range bc.engines {
ei.openedEngine.SetTS(newTS)
err = bc.backend.SetTSAfterResetEngine(ei.uuid, newTS)
if err != nil {
return false, false, err
}
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ type EngineConfig struct {
// when opening the engine, instead of removing it.
KeepSortDir bool
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
// will be set lazily.
// will be set lazily. This is used by local backend. This field will be written
// to engineMeta.TS and take effect in below cases:
// - engineManager.openEngine
// - engineManager.closeEngine only for an external engine
TS uint64
}

Expand Down Expand Up @@ -319,13 +322,6 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
}

// SetTS sets the TS of the engine. In most cases if the caller wants to specify
// TS it should use the TS field in EngineConfig. This method is only used after
// a ResetEngine.
func (engine *OpenedEngine) SetTS(ts uint64) {
engine.config.TS = ts
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
Expand Down
6 changes: 6 additions & 0 deletions pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
Expand Down Expand Up @@ -68,6 +70,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// simulate import
f.lock(importMutexStateImport)
Expand Down Expand Up @@ -106,6 +109,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
Expand Down Expand Up @@ -142,6 +146,7 @@ func TestGetFirstAndLastKey(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -184,6 +189,7 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down
15 changes: 14 additions & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,12 +1602,25 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
}

// ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is
// invalid. Caller must use OpenedEngine.SetTS to set a valid TS before import
// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import
// the engine.
func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.resetEngine(ctx, engineUUID, true)
}

// SetTSAfterResetEngine allocates a new TS for the engine after it's reset.
// This is typically called after persisting the chosen TS of the engine to make
// sure TS is not changed after task failover.
func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error {
e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose)
if e == nil {
return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String())
}
defer e.unlock()
e.engineMeta.TS = ts
return e.saveEngineMeta()
}

// CleanupEngine cleanup the engine and reclaim the space.
func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.cleanupEngine(ctx, engineUUID)
Expand Down
7 changes: 6 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
Expand Down Expand Up @@ -353,6 +354,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
f.wg.Add(1)
Expand Down Expand Up @@ -587,6 +589,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)

createSSTWriter := func() (*sstWriter, error) {
Expand Down Expand Up @@ -1176,7 +1179,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
}

func (m mockIngestData) GetTS() uint64 { return 0 }
func (m mockIngestData) GetTS() uint64 { return oracle.GoTimeToTS(time.Now()) }

func (m mockIngestData) IncRef() {}

Expand Down Expand Up @@ -1565,6 +1568,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err = db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1708,6 +1712,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
regionSplitKeysCache: [][]byte{{1}, {11}},
regionSplitSize: 1 << 30,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// keys starts with 0 is meta keys, so we start with 1.
for i := byte(1); i <= 10; i++ {
Expand Down
15 changes: 15 additions & 0 deletions pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -436,6 +437,20 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
intest.AssertFunc(func() bool {
timeOfTS := oracle.GetTimeFromTS(dataCommitTS)
now := time.Now()
if timeOfTS.After(now) {
return false
}
if now.Sub(timeOfTS) > 24*time.Hour {
return false
}
return true
}, "TS used in import should in [now-1d, now], but got %d", dataCommitTS)
if dataCommitTS == 0 {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("data commitTS is 0")
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: dataCommitTS,
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ go_test(
"@com_github_phayes_freeport//:freeport",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
],
)
12 changes: 9 additions & 3 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/phayes/freeport"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func init() {
Expand Down Expand Up @@ -320,7 +322,11 @@ func TestIngestUseGivenTS(t *testing.T) {
t.Cleanup(func() {
tk.MustExec("set @@global.tidb_cloud_storage_uri = '';")
})
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", `return(123456789)`)

presetTS := oracle.GoTimeToTS(time.Now())
failpointTerm := fmt.Sprintf(`return(%d)`, presetTS)

err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", failpointTerm)
require.NoError(t, err)

tk.MustExec("create table t (a int);")
Expand All @@ -336,10 +342,10 @@ func TestIngestUseGivenTS(t *testing.T) {
require.NoError(t, err)
tikvStore := dom.Store().(helper.Storage)
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 123456789)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, presetTS)
require.NoError(t, err)
require.NotNil(t, mvccResp)
require.NotNil(t, mvccResp.Info)
require.Greater(t, len(mvccResp.Info.Writes), 0)
require.Equal(t, uint64(123456789), mvccResp.Info.Writes[0].CommitTs)
require.Equal(t, presetTS, mvccResp.Info.Writes[0].CommitTs)
}
21 changes: 21 additions & 0 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,27 @@ func TestAddIndexMockFlushError(t *testing.T) {
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexDiskQuotaTS(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know add-index with DXF shouldn't have such issue, but can you add a case to cover it too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I can add one test for DXF enabled. But we should leave more test improvements in future in a separate PR. Like iterate all variables combinations like {DXF = on/off, disk quota = on/off, ...} and provide a helper function to prepare environment, and run the testing logic will all combinations.

store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;")
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")

tk.MustExec("create table t(id int primary key, b int, k int);")
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))
tk.MustExec("insert into t values(1, 1, 1);")
tk.MustExec("insert into t values(100000, 1, 1);")

ingest.ForceSyncFlagForTest = true
tk.MustExec("alter table t add index idx_test(b);")
ingest.ForceSyncFlagForTest = false
tk.MustExec("update t set b = b + 1;")
}

func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
Expand Down