Skip to content

Commit

Permalink
lightning: local backend support keyspace (#40628)
Browse files Browse the repository at this point in the history
ref #40531
  • Loading branch information
iosmanthus authored Feb 1, 2023
1 parent 6a18630 commit 83d50c0
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 57 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ go_test(
"//br/pkg/restore/split",
"//br/pkg/utils",
"//ddl",
"//keyspace",
"//kv",
"//parser",
"//parser/ast",
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type DuplicateManager struct {
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
Expand All @@ -401,6 +402,7 @@ func NewDuplicateManager(
tableName string,
splitCli split.SplitClient,
tikvCli *tikv.KVStore,
tikvCodec tikv.Codec,
errMgr *errormanager.ErrorManager,
sessOpts *kv.SessionOptions,
concurrency int,
Expand All @@ -417,6 +419,7 @@ func NewDuplicateManager(
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
Expand All @@ -439,6 +442,10 @@ func (m *DuplicateManager) RecordDataConflictError(ctx context.Context, stream D
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromRowKey(key)
Expand Down Expand Up @@ -504,6 +511,10 @@ func (m *DuplicateManager) RecordIndexConflictError(ctx context.Context, stream
if err != nil {
return errors.Trace(err)
}
key, err = m.tikvCodec.DecodeKey(key)
if err != nil {
return errors.Trace(err)
}
m.hasDupe.Store(true)

h, err := m.decoder.DecodeHandleFromIndex(indexInfo, key, val)
Expand Down Expand Up @@ -581,6 +592,11 @@ func (m *DuplicateManager) buildDupTasks() ([]dupTask, error) {
putToTaskFunc(ranges, indexInfo)
})
}

// Encode all the tasks
for i := range tasks {
tasks[i].StartKey, tasks[i].EndKey = m.tikvCodec.EncodeRange(tasks[i].StartKey, tasks[i].EndKey)
}
return tasks, nil
}

Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/keyspace"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestBuildDupTask(t *testing.T) {
{&lkv.SessionOptions{IndexID: info.Indices[1].ID}, false},
}
for _, tc := range testCases {
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, nil,
dupMgr, err := local.NewDuplicateManager(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, atomic.NewBool(false), log.FromContext(context.Background()))
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/hack"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -1045,6 +1046,8 @@ type Writer struct {
batchSize int64

lastMetaSeq int32

tikvCodec tikv.Codec
}

func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
Expand Down Expand Up @@ -1127,6 +1130,10 @@ func (w *Writer) AppendRows(ctx context.Context, tableName string, columnNames [
return errorEngineClosed
}

for i := range kvs {
kvs[i].Key = w.tikvCodec.EncodeKey(kvs[i].Key)
}

w.Lock()
defer w.Unlock()

Expand Down
54 changes: 35 additions & 19 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che
type local struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine

pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
pdCtl *pdutil.PdController
splitCli split.SplitClient
tikvCli *tikvclient.KVStore
tls *common.TLS
pdAddr string
g glue.Glue
tikvCodec tikvclient.Codec

localStoreDir string

Expand Down Expand Up @@ -419,6 +420,7 @@ func NewLocalBackend(
g glue.Glue,
maxOpenFiles int,
errorMgr *errormanager.ErrorManager,
keyspaceName string,
) (backend.Backend, error) {
localFile := cfg.TikvImporter.SortedKVDir
rangeConcurrency := cfg.TikvImporter.RangeConcurrency
Expand Down Expand Up @@ -460,8 +462,19 @@ func NewLocalBackend(
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
}
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()))
pdCliForTiKV := tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())

var pdCliForTiKV *tikvclient.CodecPDClient
if keyspaceName == "" {
pdCliForTiKV = tikvclient.NewCodecPDClient(tikvclient.ModeTxn, pdCtl.GetPDClient())
} else {
pdCliForTiKV, err = tikvclient.NewCodecPDClientWithKeyspace(tikvclient.ModeTxn, pdCtl.GetPDClient(), keyspaceName)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreatePDClient.Wrap(err).GenWithStackByArgs()
}
}

tikvCodec := pdCliForTiKV.GetCodec()
rpcCli := tikvclient.NewRPCClient(tikvclient.WithSecurity(tls.ToTiKVSecurityConfig()), tikvclient.WithCodec(tikvCodec))
tikvCli, err := tikvclient.NewKVStore("lightning-local-backend", pdCliForTiKV, spkv, rpcCli)
if err != nil {
return backend.MakeBackend(nil), common.ErrCreateKVClient.Wrap(err).GenWithStackByArgs()
Expand All @@ -484,13 +497,14 @@ func NewLocalBackend(
LastAlloc = alloc
}
local := &local{
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
engines: sync.Map{},
pdCtl: pdCtl,
splitCli: splitCli,
tikvCli: tikvCli,
tls: tls,
pdAddr: cfg.TiDB.PdAddr,
g: g,
tikvCodec: tikvCodec,

localStoreDir: localFile,
rangeConcurrency: worker.NewPool(ctx, rangeConcurrency, "range"),
Expand Down Expand Up @@ -975,6 +989,7 @@ func (local *local) WriteToTiKV(
Start: firstKey,
End: lastKey,
},
ApiVersion: local.tikvCodec.GetAPIVersion(),
}

leaderID := region.Leader.GetId()
Expand Down Expand Up @@ -1676,7 +1691,7 @@ func (local *local) CollectLocalDuplicateRows(ctx context.Context, tbl table.Tab
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand All @@ -1694,7 +1709,7 @@ func (local *local) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Ta
}()

atomicHasDupe := atomic.NewBool(false)
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli,
duplicateManager, err := NewDuplicateManager(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, atomicHasDupe, log.FromContext(ctx))
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -1908,16 +1923,17 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
return openLocalWriter(cfg, engine, local.tikvCodec, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, tikvCodec tikvclient.Codec, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
tikvCodec: tikvCodec,
}
// pre-allocate a long enough buffer to avoid a lot of runtime.growslice
// this can help save about 3% of CPU.
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -332,7 +333,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, keyspace.CodecV1, 1024, kvBuffer)
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -1290,6 +1291,7 @@ func TestCheckPeersBusy(t *testing.T) {
bufferPool: membuf.NewPool(),
supportMultiIngest: true,
shouldCheckWriteStall: true,
tikvCodec: keyspace.CodecV1,
}

db, tmpPath := makePebbleDB(t, nil)
Expand Down
40 changes: 40 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,38 @@ var (
taskCfgRecorderKey = "taskCfgRecorderKey"
)

func getKeyspaceName(g glue.Glue) (string, error) {
db, err := g.GetDB()
if err != nil {
return "", err
}
if db == nil {
return "", nil
}

rows, err := db.Query("show config where Type = 'tidb' and name = 'keyspace-name'")
if err != nil {
return "", err
}
//nolint: errcheck
defer rows.Close()

var (
_type string
_instance string
_name string
value string
)
if rows.Next() {
err = rows.Scan(&_type, &_instance, &_name, &value)
if err != nil {
return "", err
}
}

return value, rows.Err()
}

func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *options) (err error) {
build.LogInfo(build.Lightning)
o.logger.Info("cfg", zap.Stringer("cfg", taskCfg))
Expand Down Expand Up @@ -541,6 +573,13 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

keyspaceName, err := getKeyspaceName(g)
if err != nil {
o.logger.Error("fail to get keyspace name", zap.Error(err))
return errors.Trace(err)
}
o.logger.Info("acquired keyspace name", zap.String("keyspaceName", keyspaceName))

param := &restore.ControllerParam{
DBMetas: dbMetas,
Status: &l.status,
Expand All @@ -550,6 +589,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
CheckpointStorage: o.checkpointStorage,
CheckpointName: o.checkpointName,
DupIndicator: o.dupIndicator,
KeyspaceName: keyspaceName,
}

var procedure *restore.Controller
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
"//br/pkg/version/build",
"//ddl",
"//errno",
"//keyspace",
"//kv",
"//meta/autoid",
"//parser",
Expand Down
Loading

0 comments on commit 83d50c0

Please sign in to comment.