Merge branch 'release-5.2' into release-5.2-ead31402bccd
time-and-fate authored Apr 15, 2022
2 parents 0b4b4cf + 19dd2a6 commit 241380d
Showing 121 changed files with 1,707 additions and 522 deletions. Only the first seven files are rendered below.
2 changes: 0 additions & 2 deletions br/pkg/backup/client_test.go
@@ -241,7 +241,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
@@ -260,7 +259,6 @@ func (r *testBackup) TestSendCreds(c *C) {
c.Assert(err, IsNil)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
SkipCheckPath: true,
}
_, err = storage.New(r.ctx, backend, opts)
c.Assert(err, IsNil)
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -91,7 +91,8 @@ const (
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
regionMaxKeyCount = 1_440_000
// lower the max-key-count to avoid TiKV triggering region auto-split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB

propRangeIndex = "tikv.range_index"
@@ -1499,7 +1500,12 @@ func (local *local) WriteToTiKV(
size := int64(0)
totalCount := int64(0)
firstLoop := true
regionMaxSize := local.regionSplitSize * 4 / 3
// if region-split-size <= 96 MiB, bump the threshold a bit to avoid too many split retries,
// because the range properties are not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
regionMaxSize = regionSplitSize * 4 / 3
}

for iter.First(); iter.Valid(); iter.Next() {
size += int64(len(iter.Key()) + len(iter.Value()))
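The hunk above scales the ingest threshold by 4/3 only when the configured region split size is at or below the 96 MiB default, because the range properties used to estimate sizes are only approximate. A minimal, self-contained sketch of that threshold logic follows; the names mirror the diff, but this is an illustration, not the actual Lightning code.

package main

import "fmt"

// 96 MiB, matching defaultRegionSplitSize in the diff.
const defaultRegionSplitSize int64 = 96 << 20

// effectiveRegionMaxSize bumps default-or-smaller split sizes by 4/3 so
// that slightly inaccurate size estimates do not cause endless retry
// splits, while larger user-tuned sizes are left unscaled.
func effectiveRegionMaxSize(regionSplitSize int64) int64 {
	if regionSplitSize <= defaultRegionSplitSize {
		return regionSplitSize * 4 / 3
	}
	return regionSplitSize
}

func main() {
	fmt.Println(effectiveRegionMaxSize(96 << 20))  // 134217728 (128 MiB)
	fmt.Println(effectiveRegionMaxSize(256 << 20)) // 268435456 (unchanged)
}

Together with the lowered regionMaxKeyCount, this keeps ingested ranges under the point where TiKV would auto-split a region mid-import.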
19 changes: 13 additions & 6 deletions br/pkg/lightning/backend/noop/noop.go
@@ -138,7 +138,7 @@ func (b noopBackend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) erro

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
func (b noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uuid.UUID) (backend.EngineWriter, error) {
return noopWriter{}, nil
return Writer{}, nil
}

func (b noopBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table) error {
@@ -168,16 +168,23 @@ func (r noopRow) Size() uint64 {
func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) {
}

type noopWriter struct{}
// Writer defines a local writer that does nothing.
type Writer struct{}

func (w noopWriter) AppendRows(context.Context, string, []string, kv.Rows) error {
func (w Writer) AppendRows(context.Context, string, []string, kv.Rows) error {
return nil
}

func (w noopWriter) IsSynced() bool {
func (w Writer) IsSynced() bool {
return true
}

func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
return nil, nil
func (w Writer) Close(context.Context) (backend.ChunkFlushStatus, error) {
return trueStatus{}, nil
}

type trueStatus struct{}

func (s trueStatus) Flushed() bool {
return true
}
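The switch from returning nil, nil to trueStatus{}, nil in Close is more than cosmetic: invoking Flushed() on a nil ChunkFlushStatus interface value would panic. A small illustrative program; ChunkFlushStatus here is a simplified stand-in for the interface in the backend package.

package main

import "fmt"

// ChunkFlushStatus is a stand-in for backend.ChunkFlushStatus with only
// the method relevant to this example.
type ChunkFlushStatus interface {
	Flushed() bool
}

type trueStatus struct{}

func (s trueStatus) Flushed() bool { return true }

// closeEngine mimics the fixed Close: it returns a concrete no-op status
// instead of a nil interface, so Flushed() is always safe to call.
func closeEngine() (ChunkFlushStatus, error) {
	return trueStatus{}, nil
}

func main() {
	status, err := closeEngine()
	fmt.Println(status.Flushed(), err) // true <nil>; a nil status here would panic
}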
13 changes: 13 additions & 0 deletions br/pkg/lightning/lightning.go
@@ -277,6 +277,19 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
return errors.Annotate(err, "create storage failed")
}

// getting expectedErr back means the walk met at least one file
expectedErr := errors.New("Stop Iter")
walkErr := s.WalkDir(ctx, &storage.WalkOption{ListCount: 1}, func(string, int64) error {
// return an error at the first regular file to break out of the walk loop
return expectedErr
})
if !errors.ErrorEqual(walkErr, expectedErr) {
if walkErr == nil {
return errors.Errorf("data-source-dir '%s' doesn't exist or contains no files", taskCfg.Mydumper.SourceDir)
}
return errors.Annotatef(walkErr, "visit data-source-dir '%s' failed", taskCfg.Mydumper.SourceDir)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
var mdl *mydump.MDLoader
mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
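The WalkDir call above probes for at least one file by returning a sentinel error from the callback; a nil walk error therefore means the directory is missing or empty. The same pattern, sketched against the standard library's filepath.WalkDir instead of br's storage.ExternalStorage, as an assumption for illustration only:

package main

import (
	"errors"
	"fmt"
	"io/fs"
	"path/filepath"
)

var errStopIter = errors.New("stop iteration")

// hasAnyFile reports whether dir contains at least one regular file.
// The callback aborts the walk with errStopIter on the first hit, so a
// nil error from WalkDir means the walk finished without seeing a file.
func hasAnyFile(dir string) (bool, error) {
	err := filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if !d.IsDir() {
			return errStopIter
		}
		return nil
	})
	if errors.Is(err, errStopIter) {
		return true, nil
	}
	return false, err
}

func main() {
	found, err := hasAnyFile(".")
	fmt.Println(found, err)
}

In the diff, WalkOption{ListCount: 1} already limits the listing; the sentinel error turns "saw a file" into an outcome distinguishable from an exhausted walk.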
50 changes: 41 additions & 9 deletions br/pkg/lightning/mydump/loader.go
@@ -17,10 +17,12 @@ import (
"context"
"path/filepath"
"sort"
"strings"

"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
@@ -29,12 +31,30 @@

type MDDatabaseMeta struct {
Name string
SchemaFile string
SchemaFile FileInfo
Tables []*MDTableMeta
Views []*MDTableMeta
charSet string
}

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error) {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.L().Warn("failed to extract database schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
schema = nil
}
schemaStr := strings.TrimSpace(string(schema))
// fall back to a default CREATE DATABASE statement if the schema SQL is empty
if len(schemaStr) == 0 {
schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
}

return schemaStr, nil
}

type MDTableMeta struct {
DB string
Name string
@@ -218,7 +238,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
// setup database schema
if len(s.dbSchemas) != 0 {
for _, fileInfo := range s.dbSchemas {
if _, dbExists := s.insertDB(fileInfo.TableName.Schema, fileInfo.FileMeta.Path); dbExists && s.loader.router == nil {
if _, dbExists := s.insertDB(fileInfo); dbExists && s.loader.router == nil {
return errors.Errorf("invalid database schema file, duplicated item - %s", fileInfo.FileMeta.Path)
}
}
@@ -405,23 +425,29 @@ func (s *mdLoaderSetup) route() error {
return nil
}

func (s *mdLoaderSetup) insertDB(dbName string, path string) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[dbName]
func (s *mdLoaderSetup) insertDB(f FileInfo) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[f.TableName.Schema]
if ok {
return s.loader.dbs[dbIndex], true
}
s.dbIndexMap[dbName] = len(s.loader.dbs)
s.dbIndexMap[f.TableName.Schema] = len(s.loader.dbs)
ptr := &MDDatabaseMeta{
Name: dbName,
SchemaFile: path,
Name: f.TableName.Schema,
SchemaFile: f,
charSet: s.loader.charSet,
}
s.loader.dbs = append(s.loader.dbs, ptr)
return ptr, false
}

func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
tableIndex, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
return dbMeta.Tables[tableIndex], dbExists, true
@@ -441,7 +467,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool
}

func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
_, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
meta := &MDTableMeta{
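GetSchema above reduces to: read the schema file, trim the result, and synthesize a CREATE DATABASE statement when nothing usable came back. A simplified sketch of that fallback, where escapeIdentifier is a hypothetical stand-in for common.EscapeIdentifier and the file read is elided:

package main

import (
	"fmt"
	"strings"
)

// escapeIdentifier stands in for common.EscapeIdentifier: backquote the
// name and double any embedded backquotes.
func escapeIdentifier(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// schemaOrDefault mirrors GetSchema's fallback: use the statement from
// the schema file if present, otherwise build a default CREATE DATABASE.
func schemaOrDefault(raw []byte, dbName string) string {
	s := strings.TrimSpace(string(raw))
	if len(s) == 0 {
		return "CREATE DATABASE IF NOT EXISTS " + escapeIdentifier(dbName)
	}
	return s
}

func main() {
	fmt.Println(schemaOrDefault(nil, "db"))                          // CREATE DATABASE IF NOT EXISTS `db`
	fmt.Println(schemaOrDefault([]byte("CREATE DATABASE x;"), "db")) // CREATE DATABASE x;
}

This is also why insertDB now takes a whole FileInfo rather than a name-and-path pair: databases without a schema file get a synthesized entry with a typed SourceFileMeta, so later consumers can treat real and synthetic schema files uniformly.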
29 changes: 19 additions & 10 deletions br/pkg/lightning/mydump/loader_test.go
@@ -178,6 +178,9 @@ func (s *testMydumpLoaderSuite) TestTableInfoNotFound(c *C) {
loader, err := md.NewMyDumpLoader(ctx, s.cfg)
c.Assert(err, IsNil)
for _, dbMeta := range loader.GetDatabases() {
dbSQL, err := dbMeta.GetSchema(ctx, store)
c.Assert(err, IsNil)
c.Assert(dbSQL, Equals, "CREATE DATABASE IF NOT EXISTS `db`")
for _, tblMeta := range dbMeta.Tables {
sql, err := tblMeta.GetSchema(ctx, store)
c.Assert(sql, Equals, "")
@@ -271,8 +274,14 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) {
mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "",
Name: "db",
SchemaFile: md.FileInfo{
TableName: filter.Table{
Schema: "db",
Name: "",
},
FileMeta: md.SourceFileMeta{Type: md.SourceTypeSchemaSchema},
},
Tables: []*md.MDTableMeta{{
DB: "db",
Name: "tbl",
@@ -301,7 +310,7 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) {
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "db-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: ""}, FileMeta: md.SourceFileMeta{Path: "db-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "db",
@@ -395,7 +404,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "a1",
SchemaFile: "a1-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a1-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "a1",
Expand Down Expand Up @@ -426,11 +435,11 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "d0",
SchemaFile: "d0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d0", Name: ""}, FileMeta: md.SourceFileMeta{Path: "d0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
},
{
Name: "b",
SchemaFile: "a0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "b", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "b",
@@ -448,7 +457,7 @@
},
{
Name: "c",
SchemaFile: "c0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: ""}, FileMeta: md.SourceFileMeta{Path: "c0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "c",
@@ -462,7 +471,7 @@
},
{
Name: "v",
SchemaFile: "e0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: ""}, FileMeta: md.SourceFileMeta{Path: "e0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "v",
@@ -551,7 +560,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "d1",
SchemaFile: filepath.FromSlash("d1/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d1",
@@ -604,7 +613,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
},
{
Name: "d2",
SchemaFile: filepath.FromSlash("d2/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d2", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d2",
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/checksum.go
@@ -278,12 +278,8 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
@@ -326,12 +322,16 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
err := e.manager.addOneJob(ctx, tbl, oracle.ComposeTS(time.Now().Unix()*1000, 0))
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
ts := oracle.ComposeTS(physicalTS, logicalTS)
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
return nil, errors.Trace(err)
}

return e.checksumDB(ctx, tableInfo)
return e.checksumDB(ctx, tableInfo, ts)
}

type tableChecksumTS struct {
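The Checksum change fetches a single TSO from PD and threads it through both addOneJob and checksumDB, so the GC-TTL bookkeeping and the checksum executor observe the same timestamp instead of two independently derived ones. For reference, a TSO packs a physical millisecond timestamp and a logical counter; the sketch below mirrors TiDB's oracle.ComposeTS under the assumption of its usual 18-bit logical suffix.

package main

import "fmt"

// physicalShiftBits is the width of the logical suffix in a TSO.
const physicalShiftBits = 18

// composeTS packs a physical timestamp in milliseconds and a logical
// counter into a single 64-bit TSO, mirroring oracle.ComposeTS.
func composeTS(physical, logical int64) uint64 {
	return uint64((physical << physicalShiftBits) + logical)
}

func main() {
	fmt.Println(composeTS(1_650_000_000_000, 7))
}

The old code built the job timestamp from time.Now() on the Lightning host while checksumDB fetched a fresh TSO from PD; clock skew between the two could let GC advance past the snapshot the checksum actually reads.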