Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: save TS into metadata for external engine, and prepare to use it as startTS #51283

Merged
merged 17 commits into from
Feb 26, 2024
Merged
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -5858,13 +5858,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "bd7e32754f3923cd66a74d7b10f06aa46af512b81fca73f5c1a22286b4f563a3",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240206021635-05a3758a1d24",
sha256 = "4670b3192a5c733e5c8fe68c28509da8294bd7a3e95cd1da151d45b66900f584",
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20240223081609-df7de9808067",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240206021635-05a3758a1d24.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240223081609-df7de9808067.zip",
],
)
go_repository(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type ExternalEngineConfig struct {
// TotalKVCount can be an estimated value.
TotalKVCount int64
CheckHotspot bool
TSOfClose uint64
}

// CheckCtx contains all parameters used in CheckRequirements
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ func (m *MemoryIngestData) NewIter(
}
}

// GetTS implements IngestData.GetTS.
func (m *MemoryIngestData) GetTS() uint64 {
// GetStartTS implements IngestData.GetStartTS.
func (m *MemoryIngestData) GetStartTS() uint64 {
return m.ts
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/external/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestMemoryIngestData(t *testing.T) {
ts: 123,
}

require.EqualValues(t, 123, data.GetTS())
require.EqualValues(t, 123, data.GetStartTS())
testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestMemoryIngestData(t *testing.T) {
data.keys = encodedKeys
data.values = encodedValues

require.EqualValues(t, 234, data.GetTS())
require.EqualValues(t, 234, data.GetStartTS())
testGetFirstAndLastKey(t, data, nil, nil, []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key1"), []byte("key6"), []byte("key1"), []byte("key5"))
testGetFirstAndLastKey(t, data, []byte("key2"), []byte("key5"), []byte("key2"), []byte("key4"))
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
// engineMeta contains some fields that are necessary to continue the engine restore/import process.
// These fields should be written to disk when we update the chunk checkpoint.
type engineMeta struct {
TS uint64 `json:"ts"`
StartTS uint64 `json:"ts"`
// Length is the number of KV pairs stored by the engine.
Length atomic.Int64 `json:"length"`
// TotalSize is the total pre-compressed KV byte size stored by engine.
Expand Down Expand Up @@ -1050,9 +1050,9 @@ func (e *Engine) NewIter(
)
}

// GetTS implements IngestData interface.
func (e *Engine) GetTS() uint64 {
return e.TS
// GetStartTS implements IngestData interface.
func (e *Engine) GetStartTS() uint64 {
return e.StartTS
}

// IncRef implements IngestData interface.
Expand Down
15 changes: 5 additions & 10 deletions br/pkg/lightning/backend/local/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,6 @@ func (em *engineManager) closeEngine(
store.Close()
}
}()
physical, logical, err := em.GetTS(ctx)
if err != nil {
return err
}
ts := oracle.ComposeTS(physical, logical)
externalEngine := external.NewExternalEngine(
store,
externalCfg.DataFiles,
Expand All @@ -308,7 +303,7 @@ func (em *engineManager) closeEngine(
em.duplicateDB,
em.DuplicateDetectOpt,
em.WorkerConcurrency,
ts,
externalCfg.TSOfClose,
externalCfg.TotalFileSize,
externalCfg.TotalKVCount,
externalCfg.CheckHotspot,
Expand Down Expand Up @@ -339,7 +334,7 @@ func (em *engineManager) closeEngine(
engine.db.Store(db)
engine.sstIngester = dbSSTIngester{e: engine}
if err = engine.loadEngineMeta(); err != nil {
return err
return errors.Trace(err)
}
em.engines.Store(engineUUID, engine)
return nil
Expand Down Expand Up @@ -424,7 +419,7 @@ func (em *engineManager) resetEngine(ctx context.Context, engineUUID uuid.UUID)
}
failpoint.Inject("mockAllocateTSErr", func() {
// mock generate timestamp error when reset engine.
localEngine.TS = 0
localEngine.StartTS = 0
mockGRPCErr, _ := status.FromError(errors.Errorf("mock generate timestamp error"))
failpoint.Return(errors.Trace(mockGRPCErr.Err()))
})
Expand All @@ -435,15 +430,15 @@ func (em *engineManager) resetEngine(ctx context.Context, engineUUID uuid.UUID)
}

func (em *engineManager) allocateTSIfNotExists(ctx context.Context, engine *Engine) error {
if engine.TS > 0 {
if engine.StartTS > 0 {
return nil
}
physical, logical, err := em.GetTS(ctx)
if err != nil {
return err
}
ts := oracle.ComposeTS(physical, logical)
engine.TS = ts
engine.StartTS = ts
return engine.saveEngineMeta()
}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,11 @@ func NewBackend(
pdAddrs = pdSvcDiscovery.GetServiceURLs()
// TODO(lance6716): if PD client can support creating a client with external
// service discovery, we can directly pass pdSvcDiscovery.
} else {
}
if len(pdAddrs) == 0 {
pdAddrs = strings.Split(config.PDAddr, ",")
}

pdCli, err := pd.NewClientWithContext(
ctx, pdAddrs, tls.ToPDSecurityOption(),
pd.WithGRPCDialOptions(maxCallMsgSize...),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
}

func (m mockIngestData) GetTS() uint64 { return 0 }
func (m mockIngestData) GetStartTS() uint64 { return 0 }

func (m mockIngestData) IncRef() {}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
End: lastKey,
},
ApiVersion: apiVersion,
StartTs: j.ingestData.GetStartTS(),
}

failpoint.Inject("changeEpochVersion", func(val failpoint.Value) {
Expand Down Expand Up @@ -318,7 +319,8 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
clients = append(clients, wstream)
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
dataCommitTS := j.ingestData.GetStartTS()
// TODO(lance6716): use local.pdCli.GetTS() to get a new TS as commit TS after deduplication is ready.
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: dataCommitTS,
Expand Down
7 changes: 5 additions & 2 deletions br/pkg/lightning/common/ingest_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ type IngestData interface {
// these memories will be allocated from given bufPool and be released when the
// iterator is closed or ForwardIter.ReleaseBuf is called.
NewIter(ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool) ForwardIter
// GetTS will be used as the start/commit TS of the data.
GetTS() uint64
// GetStartTS returns the startTS used for ingestion. This TS should remain
// unchanged during retries of the ingest. For the local engine, it's set when the
// engine is opened/reset, to avoid changing too much code. For the external engine,
// it's set when the engine is closed.
GetStartTS() uint64
// IncRef should be called every time when IngestData is referred by regionJob.
// Multiple regionJob can share one IngestData. Same amount of DecRef should be
// called to release the IngestData.
Expand Down
12 changes: 11 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,9 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {

// Disable GC because TiDB enables GC already.
urlsWithScheme := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(urlsWithScheme) == 0 {
urlsWithScheme = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
// remove URL scheme
urlsWithoutScheme := make([]string, 0, len(urlsWithScheme))
for _, u := range urlsWithScheme {
Expand Down Expand Up @@ -1840,7 +1843,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetServiceDiscovery().GetServiceURLs())
pdAddr := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(pdAddr) == 0 {
pdAddr = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, pdAddr)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2142,6 +2149,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
}
if isLocalBackend(rc.cfg) {
pdAddrs := rc.pdCli.GetServiceDiscovery().GetServiceURLs()
if len(pdAddrs) == 0 {
pdAddrs = strings.Split(rc.cfg.TiDB.PdAddr, ",")
}
pdController, err := pdutil.NewPdController(
ctx, pdAddrs, rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption(),
)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ replace (
// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
github.com/go-ldap/ldap/v3 => github.com/YangKeao/ldap/v3 v3.4.5-0.20230421065457-369a3bab1117
// remove it after https://github.com/pingcap/kvproto/pull/1224 is merged
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will merge kvproto PR after this PR is approved

github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 => github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067
github.com/pingcap/tidb/pkg/parser => ./pkg/parser

// TODO: `sourcegraph.com/sourcegraph/appdash` has been archived, and the original host has been removed.
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -713,8 +713,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24 h1:pdA3DvkChrIp91JQO89ICT1x/SemOAm7vC848acr5Ik=
github.com/pingcap/kvproto v0.0.0-20240206021635-05a3758a1d24/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067 h1:K0YStutk7w3eMuYljP+rpLOCI/XHhLzRF4P+8CJiBZQ=
github.com/pingcap/kvproto v0.0.0-20240223081609-df7de9808067/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20230317032135-a0d097d16e22 h1:2SOzvGvE8beiC1Y4g9Onkvu6UmuBBOeWRGQEjJaT/JY=
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type BackfillSubTaskMeta struct {
RangeSplitKeys [][]byte `json:"range_split_keys,omitempty"`
DataFiles []string `json:"data-files,omitempty"`
StatFiles []string `json:"stat-files,omitempty"`
TSOfClose uint64 `json:"ts_of_close,omitempty"`
// Each group of MetaGroups represents a different index kvs meta.
MetaGroups []*external.SortedKVMeta `json:"meta_groups,omitempty"`
// Only used for adding one single index.
Expand Down
12 changes: 12 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -419,6 +420,16 @@ func splitSubtaskMetaForOneKVMetaGroup(
// Skip global sort for empty table.
return nil, nil
}
pdCli := store.GetPDClient()
p, l, err := pdCli.GetTS(ctx)
if err != nil {
return nil, err
}
ts := oracle.ComposeTS(p, l)
failpoint.Inject("mockTSForGlobalSort", func(val failpoint.Value) {
i := val.(int)
ts = uint64(i)
})
splitter, err := getRangeSplitter(
ctx, store, cloudStorageURI, int64(kvMeta.TotalKVSize), instanceCnt, kvMeta.MultipleFilesStats, logger)
if err != nil {
Expand Down Expand Up @@ -460,6 +471,7 @@ func splitSubtaskMetaForOneKVMetaGroup(
DataFiles: dataFiles,
StatFiles: statFiles,
RangeSplitKeys: rangeSplitKeys,
TSOfClose: ts,
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_import_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
TotalFileSize: int64(all.TotalKVSize),
TotalKVCount: 0,
CheckHotspot: true,
TSOfClose: sm.TSOfClose,
},
}, engineUUID)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/storage",
"//br/pkg/utils",
"//pkg/config",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/planner",
"//pkg/disttask/framework/proto",
Expand Down Expand Up @@ -66,7 +67,9 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
26 changes: 26 additions & 0 deletions pkg/disttask/importinto/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"math"
"strconv"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/config"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain/infosync"
Expand All @@ -38,6 +40,8 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -332,6 +336,16 @@ func generateMergeSortSpecs(planCtx planner.PlanCtx) ([]planner.PipelineSpec, er
return result, nil
}

// createPDClientByGlobalCfg creates a PD client from the TiDB global config.
// The comma-separated Path setting supplies the addresses handed to the
// client, and the cluster SSL CA/cert/key settings supply its security option.
// The result of pd.NewClient is returned as-is.
func createPDClientByGlobalCfg() (pd.Client, error) {
	globalCfg := tidb.GetGlobalConfig()
	securityOpt := pd.SecurityOption{
		CAPath:   globalCfg.Security.ClusterSSLCA,
		CertPath: globalCfg.Security.ClusterSSLCert,
		KeyPath:  globalCfg.Security.ClusterSSLKey,
	}
	pdAddrs := strings.Split(globalCfg.Path, ",")
	return pd.NewClient(pdAddrs, securityOpt)
}

func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]planner.PipelineSpec, error) {
ctx := planCtx.Ctx
controller, err2 := buildControllerForPlan(p)
Expand Down Expand Up @@ -362,6 +376,17 @@ func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]planne
},
}, nil)
})
pdCli, err := createPDClientByGlobalCfg()
if err != nil {
return nil, err
}
pTS, lTS, err := pdCli.GetTS(ctx)
if err != nil {
pdCli.Close()
return nil, err
}
ts := oracle.ComposeTS(pTS, lTS)
pdCli.Close()
specs := make([]planner.PipelineSpec, 0, 16)
for kvGroup, kvMeta := range kvMetas {
splitter, err1 := getRangeSplitter(ctx, controller.GlobalSortStore, kvMeta)
Expand Down Expand Up @@ -409,6 +434,7 @@ func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]planne
StatFiles: statFiles,
RangeSplitKeys: rangeSplitKeys,
RangeSplitSize: splitter.GetRangeSplitSize(),
TSOfClose: ts,
}
specs = append(specs, &WriteIngestSpec{m})

Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type WriteIngestStepMeta struct {
StatFiles []string `json:"stat-files"`
RangeSplitKeys [][]byte `json:"range-split-keys"`
RangeSplitSize int64 `json:"range-split-size"`
TSOfClose uint64 `json:"ts-of-close"`

Result Result
}
Expand Down
Loading
Loading