Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Aug 26, 2021
1 parent a1587c8 commit 70a20dc
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
16 changes: 8 additions & 8 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,27 +558,27 @@ func (rc *Controller) readColumnsAndCount(ctx context.Context, dataFileMeta mydu
}

// SchemaIsValid checks the import file and cluster schema is match.
func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, int, error) {
func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTableMeta) ([]string, error) {
msgs := make([]string, 0)
info, ok := rc.dbInfos[tableInfo.DB].Tables[tableInfo.Name]
if !ok {
msgs = append(msgs, fmt.Sprintf("TiDB schema `%s`.`%s` doesn't exists,"+
"please give a schema file in source dir or create table manually", tableInfo.DB, tableInfo.Name))
return msgs, 0, nil
return msgs, nil
}

igCols := make(map[string]struct{})
igCol, err := rc.cfg.Mydumper.IgnoreColumns.GetIgnoreColumns(tableInfo.DB, tableInfo.Name, rc.cfg.Mydumper.CaseSensitive)
if err != nil {
return nil, 0, errors.Trace(err)
return nil, errors.Trace(err)
}
for _, col := range igCol.Columns {
igCols[col] = struct{}{}
}

if len(tableInfo.DataFiles) == 0 {
log.L().Info("no data files detected", zap.String("db", tableInfo.DB), zap.String("table", tableInfo.Name))
return nil, 0, nil
return nil, nil
}

colCountFromTiDB := len(info.Core.Columns)
Expand Down Expand Up @@ -606,11 +606,11 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab

if tp := dataFileMeta.Type; tp != mydump.SourceTypeCSV && tp != mydump.SourceTypeSQL && tp != mydump.SourceTypeParquet {
msgs = append(msgs, fmt.Sprintf("file '%s' with unknown source type '%s'", dataFileMeta.Path, dataFileMeta.Type.String()))
return msgs, len(dataFiles), nil
return msgs, nil
}
colsFromDataFile, colCountFromDataFile, err := rc.readColumnsAndCount(ctx, dataFileMeta)
if err != nil {
return nil, len(dataFiles), errors.Trace(err)
return nil, errors.Trace(err)
}
if colsFromDataFile == nil && colCountFromDataFile == 0 {
log.L().Info("file contains no data, skip checking against schema validity", zap.String("path", dataFileMeta.Path))
Expand Down Expand Up @@ -676,10 +676,10 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
}
}
if len(msgs) > 0 {
return msgs, len(dataFiles), nil
return msgs, nil
}
}
return msgs, len(dataFiles), nil
return msgs, nil
}

func (rc *Controller) SampleDataFromTable(ctx context.Context, dbName string, tableMeta *mydump.MDTableMeta, tableInfo *model.TableInfo) error {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
}

if rc.cfg.App.CheckRequirements && noCheckpoint && rc.cfg.TikvImporter.Backend != config.BackendTiDB {
if msgs, _, err = rc.SchemaIsValid(ctx, tableInfo); err != nil {
if msgs, err = rc.SchemaIsValid(ctx, tableInfo); err != nil {
return errors.Trace(err)
}
if len(msgs) != 0 {
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2213,7 +2213,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
0,
},
// Case 4:
// table4 has two datafiles for table. we only check one random file.
// table4 has two datafiles for table. we only check the first file.
// we expect the check success.
{
[]*config.IgnoreColumns{
Expand Down Expand Up @@ -2262,7 +2262,10 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
FileMeta: mydump.SourceFileMeta{
FileSize: 1 * units.TiB,
Path: case2File,
Type: mydump.SourceTypeCSV,
// This type will make the check failed.
// but it's the second file for table.
// so it's unreachable so this case will success.
Type: mydump.SourceTypeIgnore,
},
},
},
Expand Down Expand Up @@ -2295,8 +2298,7 @@ func (s *tableRestoreSuite) TestSchemaIsValid(c *C) {
dbInfos: ca.dbInfos,
ioWorkers: worker.NewPool(context.Background(), 1, "io"),
}
msgs, checkedFilesCount, err := rc.SchemaIsValid(ctx, ca.tableMeta)
c.Assert(checkedFilesCount, Equals, ca.checkFilesCount)
msgs, err := rc.SchemaIsValid(ctx, ca.tableMeta)
c.Assert(err, IsNil)
c.Assert(msgs, HasLen, ca.MsgNum)
if len(msgs) > 0 {
Expand Down

0 comments on commit 70a20dc

Please sign in to comment.