Skip to content

Commit

Permalink
backend/tidb: add rebase auto id for tidb backend (pingcap#428)
Browse files Browse the repository at this point in the history
* add rebase autoid for tidb backend

* add fetch auto id and a unit test

* avoiding create checksum manager for tidb backend

* fix unit test

* reset the change auto id code since we can depend the logic in tidb side

* also rebase auto random id

* fix auto random

* fix sql

* fix sql

* don't disable pd schedulers for import backend

* simplify the codes

* fix test

* fix test

* update mock
  • Loading branch information
glorv authored Oct 27, 2020
1 parent aa83de1 commit 3ecec63
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 107 deletions.
9 changes: 4 additions & 5 deletions lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ type AbstractBackend interface {
MaxChunkSize() int

// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum, adjusting
// auto-increment ID, and analyze.
// performed for this backend. Post-processing includes checksum and analyze.
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
Expand Down Expand Up @@ -140,7 +139,7 @@ type AbstractBackend interface {
// * State (must be model.StatePublic)
// * Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(schemaName string) ([]*model.TableInfo, error)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
}

func fetchRemoteTableModelsFromTLS(tls *common.TLS, schema string) ([]*model.TableInfo, error) {
Expand Down Expand Up @@ -209,8 +208,8 @@ func (be Backend) CheckRequirements() error {
return be.abstract.CheckRequirements()
}

func (be Backend) FetchRemoteTableModels(schemaName string) ([]*model.TableInfo, error) {
return be.abstract.FetchRemoteTableModels(schemaName)
func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

// OpenEngine opens an engine with the given table name and engine ID.
Expand Down
43 changes: 0 additions & 43 deletions lightning/backend/checkreq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,49 +30,6 @@ var _ = Suite(&checkReqSuite{})

type checkReqSuite struct{}

func (s *checkReqSuite) TestExtractTiDBVersion(c *C) {
vers, err := extractTiDBVersion("5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("2.1.0-rc.1"))

vers, err = extractTiDBVersion("5.7.10-TiDB-v2.0.4-1-g06a0bf5")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("2.0.4"))

vers, err = extractTiDBVersion("5.7.10-TiDB-v2.0.7")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("2.0.7"))

vers, err = extractTiDBVersion("8.0.12-TiDB-v3.0.5-beta.12")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("3.0.5-beta.12"))

vers, err = extractTiDBVersion("5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("3.0.0-beta"))

vers, err = extractTiDBVersion("8.0.12-TiDB-v3.0.5-dirty")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("3.0.5"))

vers, err = extractTiDBVersion("8.0.12-TiDB-v3.0.5-beta.12-dirty")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("3.0.5-beta.12"))

vers, err = extractTiDBVersion("5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f-dirty")
c.Assert(err, IsNil)
c.Assert(*vers, Equals, *semver.New("2.1.0-rc.1"))

_, err = extractTiDBVersion("")
c.Assert(err, ErrorMatches, "not a valid TiDB version.*")

_, err = extractTiDBVersion("8.0.12")
c.Assert(err, ErrorMatches, "not a valid TiDB version.*")

_, err = extractTiDBVersion("not-a-valid-version")
c.Assert(err, NotNil)
}

func (s *checkReqSuite) TestCheckVersion(c *C) {
err := checkVersion("TiNB", *semver.New("2.1.0"), *semver.New("2.3.5"))
c.Assert(err, IsNil)
Expand Down
28 changes: 2 additions & 26 deletions lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,38 +253,14 @@ func (importer *importer) CheckRequirements() error {
return nil
}

func extractTiDBVersion(version string) (*semver.Version, error) {
// version format: "5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f"
// ^~~~~~~~~^ we only want this part
// version format: "5.7.10-TiDB-v2.0.4-1-g06a0bf5"
// ^~~~^
// version format: "5.7.10-TiDB-v2.0.7"
// ^~~~^
// version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty"
// ^~~~~~~~~^
// The version is generated by `git describe --tags` on the TiDB repository.
versions := strings.Split(strings.TrimSuffix(version, "-dirty"), "-")
end := len(versions)
switch end {
case 3, 4:
case 5, 6:
end -= 2
default:
return nil, errors.Errorf("not a valid TiDB version: %s", version)
}
rawVersion := strings.Join(versions[2:end], "-")
rawVersion = strings.TrimPrefix(rawVersion, "v")
return semver.NewVersion(rawVersion)
}

func checkTiDBVersion(tls *common.TLS, requiredVersion semver.Version) error {
var status struct{ Version string }
err := tls.GetJSON("/status", &status)
if err != nil {
return err
}

version, err := extractTiDBVersion(status.Version)
version, err := common.ExtractTiDBVersion(status.Version)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -328,6 +304,6 @@ func checkVersion(component string, expected, actual semver.Version) error {
)
}

func (importer *importer) FetchRemoteTableModels(schema string) ([]*model.TableInfo, error) {
func (importer *importer) FetchRemoteTableModels(ctx context.Context, schema string) ([]*model.TableInfo, error) {
return fetchRemoteTableModelsFromTLS(importer.tls, schema)
}
2 changes: 1 addition & 1 deletion lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ func (local *local) CheckRequirements() error {
return nil
}

func (local *local) FetchRemoteTableModels(schemaName string) ([]*model.TableInfo, error) {
func (local *local) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return fetchRemoteTableModelsFromTLS(local.tls, schemaName)
}

Expand Down
79 changes: 71 additions & 8 deletions lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"encoding/hex"
"fmt"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -279,9 +280,8 @@ func (be *tidbBackend) CheckRequirements() error {
}

func (be *tidbBackend) NewEncoder(tbl table.Table, options *SessionOptions) Encoder {
var se *session
se := newSession(options)
if options.SQLMode.HasStrictMode() {
se = newSession(options)
se.vars.SkipUTF8Check = false
se.vars.SkipASCIICheck = false
}
Expand Down Expand Up @@ -356,14 +356,24 @@ func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName str
return errors.Trace(err)
}

func (be *tidbBackend) FetchRemoteTableModels(schemaName string) (tables []*model.TableInfo, err error) {
func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) (tables []*model.TableInfo, err error) {
s := common.SQLWithRetry{
DB: be.db,
Logger: log.L(),
}
err = s.Transact(context.Background(), "fetch table columns", func(c context.Context, tx *sql.Tx) error {

err = s.Transact(ctx, "fetch table columns", func(c context.Context, tx *sql.Tx) error {
var versionStr string
if err = tx.QueryRowContext(ctx, "SELECT version()").Scan(&versionStr); err != nil {
return err
}
tidbVersion, err := common.ExtractTiDBVersion(versionStr)
if err != nil {
return err
}

rows, e := tx.Query(`
SELECT table_name, column_name
SELECT table_name, column_name, column_type, extra
FROM information_schema.columns
WHERE table_schema = ?
ORDER BY table_name, ordinal_position;
Expand All @@ -379,8 +389,8 @@ func (be *tidbBackend) FetchRemoteTableModels(schemaName string) (tables []*mode
curTable *model.TableInfo
)
for rows.Next() {
var tableName, columnName string
if e := rows.Scan(&tableName, &columnName); e != nil {
var tableName, columnName, columnType, columnExtra string
if e := rows.Scan(&tableName, &columnName, &columnType, &columnExtra); e != nil {
return e
}
if tableName != curTableName {
Expand All @@ -394,14 +404,67 @@ func (be *tidbBackend) FetchRemoteTableModels(schemaName string) (tables []*mode
curColOffset = 0
}

// see: https://github.com/pingcap/parser/blob/3b2fb4b41d73710bc6c4e1f4e8679d8be6a4863e/types/field_type.go#L185-L191
var flag uint
if strings.HasSuffix(columnType, "unsigned") {
flag |= mysql.UnsignedFlag
}
if strings.Contains(columnExtra, "auto_increment") {
flag |= mysql.AutoIncrementFlag
}
curTable.Columns = append(curTable.Columns, &model.ColumnInfo{
Name: model.NewCIStr(columnName),
Offset: curColOffset,
State: model.StatePublic,
FieldType: types.FieldType{
Flag: flag,
},
})
curColOffset++
}
return rows.Err()
if rows.Err() != nil {
return rows.Err()
}
// for version < v4.0.0 we can use `show table next_row_id` to fetch auto id info, so about should be enough
if tidbVersion.Major < 4 {
return nil
}
// init auto id column for each table
for _, tbl := range tables {
tblName := common.UniqueTable(schemaName, tbl.Name.O)
rows, e = tx.Query(fmt.Sprintf("SHOW TABLE %s next_row_id", tblName))
if e != nil {
return e
}
for rows.Next() {
var (
dbName, tblName, columnName, idType string
nextID int64
)
if e := rows.Scan(&dbName, &tblName, &columnName, &nextID, &idType); e != nil {
_ = rows.Close()
return e
}
for _, col := range tbl.Columns {
if col.Name.O == columnName {
switch idType {
case "AUTO_INCREMENT":
col.Flag |= mysql.AutoIncrementFlag
case "AUTO_RANDOM":
col.Flag |= mysql.PriKeyFlag
tbl.PKIsHandle = true
// set a stub here, since we don't really need the real value
tbl.AutoRandomBits = 1
}
}
}
}
rows.Close()
if rows.Err() != nil {
return rows.Err()
}
}
return nil
})
return
}
13 changes: 7 additions & 6 deletions lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
indexChecksum := verification.MakeKVChecksum(0, 0, 0)

cols := s.tbl.Cols()
perms := make([]int, 0, len(s.tbl.Cols()))
perms := make([]int, 0, len(s.tbl.Cols())+1)
for i := 0; i < len(cols); i++ {
perms = append(perms, i)
}
perms = append(perms, -1)
encoder := s.backend.NewEncoder(s.tbl, &kv.SessionOptions{SQLMode: 0, Timestamp: 1234567890, RowFormatVersion: "1"})
row, err := encoder.Encode(logger, []types.Datum{
types.NewUintDatum(18446744073709551615),
Expand Down Expand Up @@ -133,7 +134,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0})
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
c.Assert(err, IsNil)
row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)

Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
encoder := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
}, 1, []int{0})
}, 1, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -1})
c.Assert(err, IsNil)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
Expand All @@ -187,16 +188,16 @@ func (s *mysqlSuite) TestStrictMode(c *C) {
logger := log.L()
_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("test"),
}, 1, []int{0})
}, 1, []int{0, 1, -1})
c.Assert(err, IsNil)

_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("\xff\xff\xff\xff"),
}, 1, []int{0})
}, 1, []int{0, 1, -1})
c.Assert(err, ErrorMatches, `.*incorrect utf8 value .* for column s0`)

_, err = encoder.Encode(logger, []types.Datum{
types.NewStringDatum("非 ASCII 字符串"),
}, 1, []int{1})
}, 1, []int{1, 0, -1})
c.Assert(err, ErrorMatches, ".*incorrect ascii value .* for column s1")
}
25 changes: 25 additions & 0 deletions lightning/common/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package common

import (
"fmt"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -71,3 +72,27 @@ func FetchPDVersion(tls *TLS, pdAddr string) (*semver.Version, error) {

return semver.NewVersion(rawVersion)
}

func ExtractTiDBVersion(version string) (*semver.Version, error) {
// version format: "5.7.10-TiDB-v2.1.0-rc.1-7-g38c939f"
// ^~~~~~~~~^ we only want this part
// version format: "5.7.10-TiDB-v2.0.4-1-g06a0bf5"
// ^~~~^
// version format: "5.7.10-TiDB-v2.0.7"
// ^~~~^
// version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty"
// ^~~~~~~~~^
// The version is generated by `git describe --tags` on the TiDB repository.
versions := strings.Split(strings.TrimSuffix(version, "-dirty"), "-")
end := len(versions)
switch end {
case 3, 4:
case 5, 6:
end -= 2
default:
return nil, errors.Errorf("not a valid TiDB version: %s", version)
}
rawVersion := strings.Join(versions[2:end], "-")
rawVersion = strings.TrimPrefix(rawVersion, "v")
return semver.NewVersion(rawVersion)
}
Loading

0 comments on commit 3ecec63

Please sign in to comment.