Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: let ignore columns be compatible with tidb backend (#27850) #30063

Open
wants to merge 5 commits into
base: release-5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type tidbEncoder struct {
// the index of table columns for each data field.
// index == len(table.columns) means this field is `_tidb_rowid`
columnIdx []int
// the max index used in this chunk. Due to the ignore-columns config, we can't
// directly check the total column count, so we fall back to only checking that
// there are enough columns.
columnCnt int
}

Expand Down Expand Up @@ -265,22 +268,27 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
cols := enc.tbl.Cols()

if len(enc.columnIdx) == 0 {
columnCount := 0
columnMaxIdx := -1
columnIdx := make([]int, len(columnPermutation))
for i := 0; i < len(columnPermutation); i++ {
columnIdx[i] = -1
}
for i, idx := range columnPermutation {
if idx >= 0 {
columnIdx[idx] = i
columnCount++
if idx > columnMaxIdx {
columnMaxIdx = idx
}
}
}
enc.columnIdx = columnIdx
enc.columnCnt = columnCount
enc.columnCnt = columnMaxIdx + 1
}

// TODO: since the column count doesn't exactly reflect the real column names, we only check the upper bound currently.
// See: tests/generated_columns/data/gencol.various_types.0.sql this sql has no columns, so encodeLoop will fill the
// column permutation with default, thus enc.columnCnt > len(row).
if len(row) > enc.columnCnt {
if len(row) < enc.columnCnt {
logger.Error("column count mismatch", zap.Ints("column_permutation", columnPermutation),
zap.Array("data", kv.RowArrayMarshaler(row)))
return nil, errors.Errorf("column count mismatch, expected %d, got %d", enc.columnCnt, len(row))
Expand All @@ -289,8 +297,12 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
cnt := 0
for i, field := range row {
if i != 0 {
if enc.columnIdx[i] < 0 {
continue
}
if cnt > 0 {
encoded.WriteByte(',')
}
datum := field
Expand All @@ -302,6 +314,7 @@ func (enc *tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, co
)
return nil, err
}
cnt++
}
encoded.WriteByte(')')
return tidbRow(encoded.String()), nil
Expand Down Expand Up @@ -456,7 +469,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
serverInfo := version.ParseServerInfo(versionStr)

rows, e := tx.Query(`
SELECT table_name, column_name, column_type, extra
SELECT table_name, column_name, column_type, generation_expression, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -472,8 +485,8 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
var tableName, columnName, columnType, generationExpr, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &generationExpr, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand Down Expand Up @@ -502,6 +515,7 @@ func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName st
FieldType: types.FieldType{
Flag: flag,
},
GeneratedExprString: generationExpr,
})
curColOffset++
}
Expand Down
153 changes: 140 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *mysqlSuite) TearDownTest(c *C) {

func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
s.mockDB.
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`a`,`b`,`c`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(18446744073709551615,-9223372036854775808,0,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
ExpectExec("\\QREPLACE INTO `foo`.`bar`(`b`,`d`,`e`,`f`,`g`,`h`,`i`,`j`,`k`,`l`,`m`,`n`,`o`) VALUES(-9223372036854775808,NULL,7.5,5e-324,1.7976931348623157e+308,0,'甲乙丙\\r\\n\\0\\Z''\"\\\\`',x'000000abcdef',2557891634,'12.5',51)\\E").
WillReturnResult(sqlmock.NewResult(1, 1))

ctx := context.Background()
Expand All @@ -98,6 +98,9 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
perms = append(perms, i)
}
perms = append(perms, -1)
// skip column a,c due to ignore-columns
perms[0] = -1
perms[2] = -1
encoder, err := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890})
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
Expand All @@ -121,9 +124,15 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
<<<<<<< HEAD
c.Assert(err, IsNil)
err = writer.WriteRows(ctx, []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
c.Assert(err, IsNil)
=======
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"b", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o"}, dataRows)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
st, err := writer.Close(ctx)
c.Assert(err, IsNil)
c.Assert(st, IsNil)
Expand All @@ -150,8 +159,13 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "1.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
Expand Down Expand Up @@ -194,8 +208,13 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
c.Assert(err, IsNil)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
<<<<<<< HEAD
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1}, 0)
c.Assert(err, IsNil)
=======
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "3.csv", 0)
require.NoError(t, err)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down Expand Up @@ -250,10 +269,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_3_x(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v3.0.18"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "int(10)", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "int(10)", "", "auto_increment"))
s.mockDB.ExpectCommit()

bk := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
Expand Down Expand Up @@ -282,10 +301,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_0(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.0"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "auto_increment"))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20) unsigned", "", "auto_increment"))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID"}).
AddRow("test", "t", "id", int64(1)))
Expand Down Expand Up @@ -317,10 +336,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_increment(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
Expand Down Expand Up @@ -352,10 +371,10 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
s.mockDB.ExpectBegin()
s.mockDB.ExpectQuery("SELECT version()").
WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.7"))
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
s.mockDB.ExpectQuery("\\QSELECT table_name, column_name, column_type, generation_expression, extra FROM information_schema.columns WHERE table_schema = ? ORDER BY table_name, ordinal_position;\\E").
WithArgs("test").
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "extra"}).
AddRow("t", "id", "bigint(20)", ""))
WillReturnRows(sqlmock.NewRows([]string{"table_name", "column_name", "column_type", "generation_expression", "extra"}).
AddRow("t", "id", "bigint(20)", "1 + 2", ""))
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
Expand All @@ -378,8 +397,116 @@ func (s *mysqlSuite) TestFetchRemoteTableModels_4_x_auto_random(c *C) {
FieldType: types.FieldType{
Flag: mysql.PriKeyFlag,
},
GeneratedExprString: "1 + 2",
},
},
},
<<<<<<< HEAD
})
=======
}, tableInfos)
}

func TestWriteRowsErrorDowngrading(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)
// First, batch insert, fail and rollback.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)
// Then, insert row-by-row due to the non-retryable error.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the fourth row will exceed the error threshold, so this error won't be recorded
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)

ctx := context.Background()
logger := log.L()

ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
App: config.Lightning{
TaskInfoSchemaName: "tidb_lightning_errors",
MaxError: config.MaxError{
Type: *atomic.NewInt64(3),
},
},
}),
)
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)

dataRows := ignoreBackend.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := ignoreBackend.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)

encoder, err := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "7.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(2),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "8.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(3),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "9.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(4),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "10.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

row, err = encoder.Encode(logger, []types.Datum{
types.NewIntDatum(5),
}, 1, []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, "11.csv", 0)
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
>>>>>>> c68791566... lightning: let ignore columns be compatible with tidb backend (#27850)
}
8 changes: 8 additions & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ type IgnoreColumns struct {
Columns []string `toml:"columns" json:"columns"`
}

// ColumnsMap builds a set view of ic.Columns: every configured column name
// becomes a key of the returned map, and duplicate names collapse into a
// single key. The returned map is freshly allocated on each call and is
// non-nil even when no columns are configured.
func (ic *IgnoreColumns) ColumnsMap() map[string]struct{} {
	names := ic.Columns
	set := make(map[string]struct{}, len(names))
	for i := range names {
		set[names[i]] = struct{}{}
	}
	return set
}

// GetIgnoreColumns gets Ignore config by schema name/regex and table name/regex.
func (igCols AllIgnoreColumns) GetIgnoreColumns(db string, table string, caseSensitive bool) (*IgnoreColumns, error) {
if !caseSensitive {
Expand Down
Loading