Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: add glue.Glue interface and other function #456

Merged
merged 42 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b4aeb1e
save my work
lance6716 Nov 2, 2020
a5666a6
add notes
lance6716 Nov 4, 2020
5b79eda
save work
lance6716 Nov 6, 2020
51e8c0e
save work
lance6716 Nov 6, 2020
02f3ac5
fix unit test
lance6716 Nov 6, 2020
84d27f3
remove tidbMgr in RestoreController
lance6716 Nov 6, 2020
114d7f1
remove some comments
lance6716 Nov 6, 2020
d13bc83
remove some comments
lance6716 Nov 6, 2020
22d2f4e
change logger in SQLWithRetry
lance6716 Nov 6, 2020
c55f5aa
Merge branch 'master' into glue
lance6716 Nov 6, 2020
d0d01b9
revert replace log.Logger to *zap.Logger
lance6716 Nov 8, 2020
4fc56ec
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 8, 2020
e85c0b2
replace tab to space
lance6716 Nov 9, 2020
5811245
try another port to fix CI
lance6716 Nov 9, 2020
d75e0b9
remove some comment
lance6716 Nov 9, 2020
19d3aa5
*: more glue
lance6716 Nov 9, 2020
c264392
report info to host TiDB
lance6716 Nov 9, 2020
f50ecb0
Merge pull request #1 from lance6716/glue-more
lance6716 Nov 9, 2020
9e595e8
Merge branch 'master' into glue
lance6716 Nov 9, 2020
dbee87d
fix CI
lance6716 Nov 9, 2020
e081a6d
Merge branch 'glue-more' into glue
lance6716 Nov 9, 2020
a4b9fc8
Merge branch 'master' of https://github.com/pingcap/tidb-lightning in…
lance6716 Nov 10, 2020
f184e32
Merge branch 'master' into glue
lance6716 Nov 11, 2020
6e4da0a
address comment
lance6716 Nov 12, 2020
679b62d
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 12, 2020
832f65c
address comment
lance6716 Nov 12, 2020
4140470
Merge branch 'master' into glue
lance6716 Nov 12, 2020
33150d2
rename a method in interface
lance6716 Nov 12, 2020
7b368d8
save work
lance6716 Nov 12, 2020
aa921dd
try fix CI
lance6716 Nov 12, 2020
71581a3
could work
lance6716 Nov 12, 2020
8fb2745
change ctx usage
lance6716 Nov 12, 2020
481dbf4
try fix CI
lance6716 Nov 12, 2020
04a0b7f
try fix CI
lance6716 Nov 13, 2020
8978d8b
refine function interface
lance6716 Nov 13, 2020
70188c4
refine some fucntion interface
lance6716 Nov 13, 2020
15376ff
debug CI
lance6716 Nov 13, 2020
ff5c065
address comment
lance6716 Nov 13, 2020
1b446e5
Merge branch 'master' into glue
lance6716 Nov 13, 2020
14cc270
remove debug log
lance6716 Nov 15, 2020
1da5464
Merge branch 'master' into glue
lance6716 Nov 16, 2020
e880ad1
address comment
lance6716 Nov 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName str
func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) (tables []*model.TableInfo, err error) {
s := common.SQLWithRetry{
DB: be.db,
Logger: log.L(),
Logger: log.L().Logger,
}

err = s.Transact(ctx, "fetch table columns", func(c context.Context, tx *sql.Tx) error {
Expand Down
20 changes: 10 additions & 10 deletions lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, t

sql := common.SQLWithRetry{
DB: db,
Logger: log.With(zap.String("schema", schemaName)),
Logger: log.With(zap.String("schema", schemaName)).Logger,
HideQueryLog: true,
}
err := sql.Exec(ctx, "create checkpoints database", fmt.Sprintf(`
Expand Down Expand Up @@ -512,7 +512,7 @@ func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string, t
func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error {
// We can have at most 65535 placeholders https://stackoverflow.com/q/4922345/
// Since this step is not performance critical, we just insert the rows one-by-one.
s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L().Logger}
err := s.Transact(ctx, "insert checkpoints", func(c context.Context, tx *sql.Tx) error {
taskStmt, err := tx.PrepareContext(c, fmt.Sprintf(`
REPLACE INTO %s.%s (id, task_id, source_dir, backend, importer_addr, tidb_host, tidb_port, pd_addr, sorted_kv_dir, lightning_ver)
Expand Down Expand Up @@ -569,7 +569,7 @@ func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Conf
func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.L(),
Logger: log.L().Logger,
}

taskQuery := fmt.Sprintf(
Expand Down Expand Up @@ -602,7 +602,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab

s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
Logger: log.With(zap.String("table", tableName)).Logger,
}
err := s.Transact(ctx, "read checkpoint", func(c context.Context, tx *sql.Tx) error {
// 1. Populate the engines.
Expand Down Expand Up @@ -698,7 +698,7 @@ func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*Tab
func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
Logger: log.With(zap.String("table", tableName)).Logger,
}
err := s.Transact(ctx, "update engine checkpoints", func(c context.Context, tx *sql.Tx) error {
engineStmt, err := tx.PrepareContext(c, fmt.Sprintf(`
Expand Down Expand Up @@ -773,7 +773,7 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);
`, cpdb.schema, CheckpointTableNameEngine)

s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L().Logger}
err := s.Transact(context.Background(), "update checkpoints", func(c context.Context, tx *sql.Tx) error {
chunkStmt, e := tx.PrepareContext(c, chunkQuery)
if e != nil {
Expand Down Expand Up @@ -1119,7 +1119,7 @@ func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error {
func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error {
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
Logger: log.With(zap.String("table", tableName)).Logger,
}

if tableName == "all" {
Expand Down Expand Up @@ -1151,7 +1151,7 @@ func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int6
newSchema := fmt.Sprintf("`%s.%d.bak`", cpdb.schema[1:len(cpdb.schema)-1], taskID)
s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.Int64("taskID", taskID)),
Logger: log.With(zap.Int64("taskID", taskID)).Logger,
}

createSchemaQuery := "CREATE SCHEMA IF NOT EXISTS " + newSchema
Expand Down Expand Up @@ -1188,7 +1188,7 @@ func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, table

s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
Logger: log.With(zap.String("table", tableName)).Logger,
}
err := s.Transact(ctx, "ignore error checkpoints", func(c context.Context, tx *sql.Tx) error {
if _, e := tx.ExecContext(c, engineQuery, tableName); e != nil {
Expand Down Expand Up @@ -1239,7 +1239,7 @@ func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tabl

s := common.SQLWithRetry{
DB: cpdb.db,
Logger: log.With(zap.String("table", tableName)),
Logger: log.With(zap.String("table", tableName)).Logger,
}
err := s.Transact(ctx, "destroy error checkpoints", func(c context.Context, tx *sql.Tx) error {
// Obtain the list of tables
Expand Down
4 changes: 2 additions & 2 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func IsEmptyDir(name string) bool {
// SQLWithRetry constructs a retryable transaction.
type SQLWithRetry struct {
DB *sql.DB
Logger log.Logger
Logger *zap.Logger
kennytm marked this conversation as resolved.
Show resolved Hide resolved
HideQueryLog bool
}

func (t SQLWithRetry) perform(ctx context.Context, parentLogger log.Logger, purpose string, action func() error) error {
func (t SQLWithRetry) perform(ctx context.Context, parentLogger *zap.Logger, purpose string, action func() error) error {
var err error
outside:
for i := 0; i < defaultMaxRetry; i++ {
Expand Down
2 changes: 1 addition & 1 deletion lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *utilSuite) TestSQLWithRetry(c *C) {

sqlWithRetry := &common.SQLWithRetry{
DB: db,
Logger: log.L(),
Logger: log.L().Logger,
}
aValue := new(int)

Expand Down
91 changes: 91 additions & 0 deletions lightning/glue/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package glue

import (
"context"
"database/sql"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/common"
"go.uber.org/zap"
)

type Glue interface {
GetSQLExecutor() SQLExecutor
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
OwnsSQLExecutor() bool
}

type SQLExecutor interface {
ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error
ObtainStringLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error)
Close()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass a logger through the interface?

What about passing a logger when we creating an SQLExecutor instance?

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode, logger *zap.Logger) *ExternalTiDBGlue { ... }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some functions need a logger from Task, and that Task is often built from logger with different fields. It seems that pinning a logger at New is not very flexible


type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode) *ExternalTiDBGlue {
p := parser.New()
p.SetSQLMode(sqlMode)

return &ExternalTiDBGlue{db: db, parser: p}
}

func (e ExternalTiDBGlue) GetSQLExecutor() SQLExecutor {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return e
}

func (e ExternalTiDBGlue) ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error {
sql := common.SQLWithRetry{
DB: e.db,
Logger: logger,
}
return sql.Exec(ctx, purpose, query)
}

func (e ExternalTiDBGlue) ObtainStringLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error) {
var s string
err := common.SQLWithRetry{
DB: e.db,
Logger: logger,
}.QueryRow(ctx, purpose, query, &s)
return s, err
}

func (e ExternalTiDBGlue) GetParser() *parser.Parser {
return e.parser
}

func (e ExternalTiDBGlue) GetDB() *sql.DB {
return e.db
}

func (e ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo, error) {
return nil, nil
}

func (e ExternalTiDBGlue) OwnsSQLExecutor() bool {
return true
}

func (e ExternalTiDBGlue) Close() {
e.db.Close()
}
11 changes: 8 additions & 3 deletions lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-lightning/lightning/glue"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
Expand Down Expand Up @@ -78,7 +79,11 @@ func newChecksumManager(rc *RestoreController) (ChecksumManager, error) {

manager = newTiKVChecksumManager(store.(tikv.Storage).GetClient(), pdCli)
} else {
manager = newTiDBChecksumExecutor(rc.tidbMgr.db)
e, ok := rc.tidbGlue.GetSQLExecutor().(glue.ExternalTiDBGlue)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, errors.New("can't use lightning via SQL with PD version less than v4.0.0")
}
manager = newTiDBChecksumExecutor(e.GetDB())
}

return manager, nil
Expand Down Expand Up @@ -119,7 +124,7 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTabl
// +---------+------------+---------------------+-----------+-------------+

cs := RemoteChecksum{}
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger}.QueryRow(ctx, "compute remote checksum",
err = common.SQLWithRetry{DB: e.db, Logger: task.Logger.Logger}.QueryRow(ctx, "compute remote checksum",
"ADMIN CHECKSUM TABLE "+tableName, &cs.Schema, &cs.Table, &cs.Checksum, &cs.TotalKVs, &cs.TotalBytes,
)
dur := task.End(zap.ErrorLevel, err)
Expand All @@ -132,7 +137,7 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTabl

// DoChecksum do checksum for tables.
// table should be in <db>.<table>, format. e.g. foo.bar
func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteChecksum, error) {
func DoChecksum(ctx context.Context, table *TidbTableInfo) (*RemoteChecksum, error) {
var err error
manager, ok := ctx.Value(&checksumManagerKey).(ChecksumManager)
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *checksumSuite) TestDoChecksum(c *C) {
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Expand Down Expand Up @@ -91,7 +91,7 @@ func (s *checksumSuite) TestDoChecksumParallel(c *C) {
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *checksumSuite) TestIncreaseGCLifeTimeFail(c *C) {
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled")
wg.Done()
}()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")

c.Assert(db.Close(), IsNil)
Expand Down
Loading