Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: pick the first file to check schema #27607

Merged
merged 12 commits into from
Aug 27, 2021
29 changes: 20 additions & 9 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"context"
"fmt"
"io"
"math/rand"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -559,27 +561,27 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydu
}

// SchemaIsValid checks the import file and cluster schema is match.
func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) {
func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, int, error) {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
msgs := make([]string, 0)
info, ok := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name]
if !ok {
msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't exists,"+
"please give a schema file in source dir or create table manually", tableInfo.DB, tableInfo.Name))
return msgs, nil
return msgs, 0, nil
}

igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}

if len(tableInfo.DataFiles) == 0 {
log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
return nil, nil
return nil, 0, nil
}

colCountFromTiDB := len(info.Core.Columns)
Expand All @@ -594,17 +596,26 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
// tidb_rowid have a default value.
defaultCols[model.ExtraHandleName.String()] = struct{}{}

for _, dataFile := range tableInfo.DataFiles {
// only check the one random file of this table.
dataFiles := make([]mydump.FileInfo, 0, 1)
if len(tableInfo.DataFiles) > 0 {
rand.Seed(time.Now().Unix())
index := rand.Intn(len(tableInfo.DataFiles))
3pointer marked this conversation as resolved.
Show resolved Hide resolved
dataFiles = append(dataFiles, tableInfo.DataFiles[index])
log.L().Info("pick random files to check", zap.String("db", tableInfo.DB),
zap.String("table", tableInfo.Name), zap.String("path", tableInfo.DataFiles[index].FileMeta.Path))
}
for _, dataFile := range dataFiles {
// get columns name from data file.
dataFileMeta := dataFile.FileMeta

if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet {
msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
return msgs, nil
return msgs, len(dataFiles), nil
}
colsFromDataFile, colCountFromDataFile, err := rc.readColumnsAndCount(ctx, dataFileMeta)
if err != nil {
return nil, errors.Trace(err)
return nil, len(dataFiles), errors.Trace(err)
}
if colsFromDataFile == nil && colCountFromDataFile == 0 {
log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
Expand Down Expand Up @@ -670,10 +681,10 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
}
}
if len(msgs) > 0 {
return msgs, nil
return msgs, len(dataFiles), nil
}
}
return msgs, nil
return msgs, len(dataFiles), nil
}

func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
}

if rc.cfg.App.CheckRequirements && noCheckpoint && rc.cfg.TikvImporter.Backend != config.BackendTiDB {
if msgs, err = rc.SchemaIsValid(ctx, tableInfo); err != nil {
if msgs, _, err = rc.SchemaIsValid(ctx, tableInfo); err != nil {
return errors.Trace(err)
}
if len(msgs) != 0 {
Expand Down
75 changes: 70 additions & 5 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,10 +1900,11 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
ignoreColumns []*config.IgnoreColumns
expectMsg string
// MsgNum == 0 means the check passed.
MsgNum int
hasHeader bool
dbInfos map[string]*checkpoints.TidbDBInfo
tableMeta *mydump.MDTableMeta
MsgNum int
hasHeader bool
dbInfos map[string]*checkpoints.TidbDBInfo
tableMeta *mydump.MDTableMeta
checkFilesCount int
}{
// Case 1:
// csv has one column without header.
Expand Down Expand Up @@ -1956,6 +1957,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
1,
},
// Case 2.1:
// csv has two columns(colA, colB) with the header.
Expand Down Expand Up @@ -2000,6 +2002,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
1,
},
// Case 2.2:
// csv has two columns(colA, colB) with the header.
Expand Down Expand Up @@ -2051,6 +2054,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
1,
},
// Case 2.3:
// csv has two columns(colA, colB) with the header.
Expand Down Expand Up @@ -2110,6 +2114,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
1,
},
// Case 2.4:
// csv has two columns(colA, colB) with the header.
Expand Down Expand Up @@ -2168,6 +2173,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
1,
},
// Case 3:
// table3's schema file not found.
Expand Down Expand Up @@ -2204,6 +2210,64 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
},
},
},
0,
},
// Case 4:
// table4 has two datafiles for table. we only check one random file.
// we expect the check success.
{
[]*config.IgnoreColumns{
{
DB: "db1",
Table: "table2",
Columns: []string{"cola"},
},
},
"",
0,
true,
map[string]*checkpoints.TidbDBInfo{
"db1": {
Name: "db1",
Tables: map[string]*checkpoints.TidbTableInfo{
"table2": {
ID: 1,
DB: "db1",
Name: "table2",
Core: &model.TableInfo{
Columns: []*model.ColumnInfo{
{
// colB has the default value
Name: model.NewCIStr("colB"),
DefaultIsExpr: true,
},
},
},
},
},
},
},
&mydump.MDTableMeta{
DB: "db1",
Name: "table2",
DataFiles: []mydump.FileInfo{
{
FileMeta: mydump.SourceFileMeta{
FileSize: 1 * units.TiB,
Path: case2File,
Type: mydump.SourceTypeCSV,
},
},
{
FileMeta: mydump.SourceFileMeta{
FileSize: 1 * units.TiB,
Path: case2File,
Type: mydump.SourceTypeCSV,
},
},
},
},
1,
},
}

Expand Down Expand Up @@ -2231,7 +2295,8 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
dbInfos: ca.dbInfos,
ioWorkers: worker.NewPool(context.Background(), 1, "io"),
}
msgs, err := rc.SchemaIsValid(ctx, ca.tableMeta)
msgs, checkedFilesCount, err := rc.SchemaIsValid(ctx, ca.tableMeta)
c.Assert(checkedFilesCount, Equals, ca.checkFilesCount)
c.Assert(err, IsNil)
c.Assert(msgs, HasLen, ca.MsgNum)
if len(msgs) > 0 {
Expand Down