Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: save TS into metadata for external engine, and prepare to use it as startTS #51283

Merged
merged 17 commits into from
Feb 26, 2024
Merged
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5858,13 +5858,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "bd7e32754f3923cd66a74d7b10f06aa46af512b81fca73f5c1a22286b4f563a3",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240206021635-05a3758a1d24",
sha256 = "4670b3192a5c733e5c8fe68c28509da8294bd7a3e95cd1da151d45b66900f584",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240223081609-df7de9808067",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
],
)
go_repository(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type ExternalEngineConfig struct {
// TotalKVCount can be an estimated value.
TotalKVCount int64
CheckHotspot bool
TSOfClose uint64
}

// CheckCtx contains all parameters used in CheckRequirements
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ func (m *MemoryIngestData) NewIter(
}
}

// GetTS implements IngestData.GetTS.
func (m *MemoryIngestData) GetTS() uint64 {
// GetStartTS implements IngestData.GetStartTS.
func (m *MemoryIngestData) GetStartTS() uint64 {
return m.ts
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestMemoryIngestData(t *testing.T) {
ts: 123,
}

require.EqualValues(t, 123, data.GetTS())
require.EqualValues(t, 123, data.GetStartTS())
testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestMemoryIngestData(t *testing.T) {
data.keys = encodedKeys
data.values = encodedValues

require.EqualValues(t, 234, data.GetTS())
require.EqualValues(t, 234, data.GetStartTS())
testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
// engineMeta contains the fields that are necessary to continue the engine restore/import process.
// These fields should be written to disk when we update the chunk checkpoint.
type engineMeta struct {
TS uint64 `json:"ts"`
StartTS uint64 `json:"ts"`
// Length is the number of KV pairs stored by the engine.
Length atomic.Int64 `json:"length"`
// TotalSize is the total pre-compressed KV byte size stored by engine.
Expand Down Expand Up @@ -1050,9 +1050,9 @@ func (e *Engine) NewIter(
)
}

// GetTS implements IngestData interface.
func (e *Engine) GetTS() uint64 {
return e.TS
// GetStartTS implements IngestData interface.
func (e *Engine) GetStartTS() uint64 {
return e.StartTS
}

// IncRef implements IngestData interface.
Expand Down
30 changes: 9 additions & 21 deletions br/pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand All @@ -38,7 +37,6 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/status"
)

var (
Expand Down Expand Up @@ -290,11 +288,6 @@ func (em *engineManager) closeEngine(
store.Close()
}
}()
physical, logical, err := em.GetTS(ctx)
if err != nil {
return err
}
ts := oracle.ComposeTS(physical, logical)
externalEngine := external.NewExternalEngine(
store,
externalCfg.DataFiles,
Expand All @@ -308,7 +301,7 @@ func (em *engineManager) closeEngine(
em.duplicateDB,
em.DuplicateDetectOpt,
em.WorkerConcurrency,
ts,
externalCfg.TSOfClose,
externalCfg.TotalFileSize,
externalCfg.TotalKVCount,
externalCfg.CheckHotspot,
Expand Down Expand Up @@ -339,7 +332,7 @@ func (em *engineManager) closeEngine(
engine.db.Store(db)
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return err
return errors.Trace(err)
}
em.engines.Store(engineUUID, engine)
return nil
Expand Down Expand Up @@ -404,6 +397,7 @@ func (em *engineManager) resetEngine(ctx context.Context, engineUUID uuid.UUID)
return nil
}
defer localEngine.unlock()
oldStartTS := localEngine.StartTS
if err := localEngine.Close(); err != nil {
return err
}
Expand All @@ -413,37 +407,31 @@ func (em *engineManager) resetEngine(ctx context.Context, engineUUID uuid.UUID)
db, err := em.openEngineDB(engineUUID, false)
if err == nil {
localEngine.db.Store(db)
localEngine.engineMeta = engineMeta{}
localEngine.engineMeta = engineMeta{StartTS: oldStartTS}
if err = localEngine.saveEngineMeta(); err != nil {
return errors.Trace(err)
}
if !common.IsDirExists(localEngine.sstDir) {
if err := os.Mkdir(localEngine.sstDir, 0o750); err != nil {
return errors.Trace(err)
}
}
if err = em.allocateTSIfNotExists(ctx, localEngine); err != nil {
return errors.Trace(err)
}
failpoint.Inject("mockAllocateTSErr", func() {
// mock generate timestamp error when reset engine.
localEngine.TS = 0
mockGRPCErr, _ := status.FromError(errors.Errorf("mock generate timestamp error"))
failpoint.Return(errors.Trace(mockGRPCErr.Err()))
})
}
localEngine.pendingFileSize.Store(0)

return err
}

func (em *engineManager) allocateTSIfNotExists(ctx context.Context, engine *Engine) error {
if engine.TS > 0 {
if engine.StartTS > 0 {
return nil
}
physical, logical, err := em.GetTS(ctx)
if err != nil {
return err
}
ts := oracle.ComposeTS(physical, logical)
engine.TS = ts
engine.StartTS = ts
return engine.saveEngineMeta()
}

Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/backend/local/engine_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestEngineManager(t *testing.T) {

// reset non-existent engine should work
require.NoError(t, em.resetEngine(ctx, uuid.New()))
storeHelper.EXPECT().GetTS(gomock.Any()).Return(int64(0), int64(0), nil)
require.NoError(t, em.resetEngine(ctx, engine1ID))
require.Equal(t, 1, syncMapLen(&em.engines))
_, ok = em.engines.Load(engine1ID)
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,11 @@ func NewBackend(
pdAddrs = pdSvcDiscovery.GetServiceURLs()
// TODO(lance6716): if PD client can support creating a client with external
// service discovery, we can directly pass pdSvcDiscovery.
} else {
}
if len(pdAddrs) == 0 {
pdAddrs = strings.Split(config.PDAddr, ",")
}

pdCli, err := pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
}

func (m mockIngestData) GetTS() uint64 { return 0 }
func (m mockIngestData) GetStartTS() uint64 { return 0 }

func (m mockIngestData) IncRef() {}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
End: lastKey,
},
ApiVersion: apiVersion,
StartTs: j.ingestData.GetStartTS(),
}

failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
Expand Down Expand Up @@ -318,7 +319,8 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
clients = append(clients, wstream)
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
dataCommitTS := j.ingestData.GetStartTS()
// TODO(lance6716): use local.pdCli.GetTS() to get a new TS as commit TS after deduplication is ready.
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: dataCommitTS,
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/lightning/common/ingest_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ type IngestData interface {
// these memories will be allocated from given bufPool and be released when the
// iterator is closed or ForwardIter.ReleaseBuf is called.
NewIter(ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool) ForwardIter
// GetTS will be used as the start/commit TS of the data.
GetTS() uint64
// GetStartTS returns the start TS used when ingesting the data. This TS must
// stay unchanged across retries of the ingest. For a local engine, it is set
// when the engine is opened, to avoid changing too much code. For an external
// engine, it is set when the engine is closed.
GetStartTS() uint64
// IncRef should be called every time when IngestData is referred by regionJob.
// Multiple regionJob can share one IngestData. Same amount of DecRef should be
// called to release the IngestData.
Expand Down
12 changes: 11 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {

// Disable GC because TiDB enables GC already.
urlsWithScheme := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(urlsWithScheme) == 0 {
urlsWithScheme = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
// remove URL scheme
urlsWithoutScheme := make([]string, 0, len(urlsWithScheme))
for _, u := range urlsWithScheme {
Expand Down Expand Up @@ -1840,7 +1843,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetServiceDiscovery().GetServiceURLs())
pdAddr := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(pdAddr) == 0 {
pdAddr = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, pdAddr)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2142,6 +2149,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
}
if isLocalBackend(rc.cfg) {
pdAddrs := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(pdAddrs) == 0 {
pdAddrs = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
pdController, err := pdutil.NewPdController(
ctx, pdAddrs, rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption(),
)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ replace (
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
// remove it after https://github.com/pingcap/kvproto/pull/1224 is merged
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will merge the kvproto PR after this PR is approved.

github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 => github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067
github.com/pingcap/tidb/pkg/parser => ./pkg/parser

// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067 h1:K0YStutk7w3eMuYljP+rpLOCI/XHhLzRF4P+8CJiBZQ=
github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type BackfillSubTaskMeta struct {
RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"`
DataFiles []string `json:"data-files,omitempty"`
StatFiles []string `json:"stat-files,omitempty"`
TSOfClose uint64 `json:"ts_of_close,omitempty"`
// Each group of MetaGroups represents a different index kvs meta.
MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"`
// Only used for adding one single index.
Expand Down
13 changes: 13 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -419,6 +420,15 @@ func splitSubtaskMetaForOneKVMetaGroup(
// Skip global sort for empty table.
return nil, nil
}
pdCli := store.GetPDClient()
physical, logical, err := pdCli.GetTS(ctx)
if err != nil {
return nil, err
}
failpoint.Inject("mockPhysicalTSForGlobalSort", func(val failpoint.Value) {
i := val.(int)
physical = int64(i)
})
splitter, err := getRangeSplitter(
ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger)
if err != nil {
Expand Down Expand Up @@ -451,6 +461,8 @@ func splitSubtaskMetaForOneKVMetaGroup(
return nil, errors.Errorf("invalid range, startKey: %s, endKey: %s",
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
ts := oracle.ComposeTS(physical, logical)
logical++
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
m := &BackfillSubTaskMeta{
MetaGroups: []*external.SortedKVMeta{{
StartKey: startKey,
Expand All @@ -460,6 +472,7 @@ func splitSubtaskMetaForOneKVMetaGroup(
DataFiles: dataFiles,
StatFiles: statFiles,
RangeSplitKeys: rangeSplitKeys,
TSOfClose: ts,
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
TotalFileSize: int64(all.TotalKVSize),
TotalKVCount: 0,
CheckHotspot: true,
TSOfClose: sm.TSOfClose,
},
}, engineUUID)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/storage",
"//br/pkg/utils",
"//pkg/config",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/planner",
"//pkg/disttask/framework/proto",
Expand Down Expand Up @@ -66,7 +67,9 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
Loading