Merge branch 'fix_regression_having_ppd' of github.com:wjhuang2016/tidb into fix_regression_having_ppd
wjhuang2016 committed Mar 17, 2022
2 parents 4581b6a + af1b7a1 commit 269797f
Showing 66 changed files with 1,599 additions and 1,091 deletions.
4 changes: 2 additions & 2 deletions br/pkg/backup/client.go
@@ -367,7 +367,7 @@ func BuildBackupRangeAndSchema(
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
tableInfo.PlacementPolicyRef = nil
tableInfo.ClearPlacement()
}

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
@@ -478,7 +478,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.PlacementPolicyRef = nil
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
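The change above swaps a direct `tableInfo.PlacementPolicyRef = nil` for a `ClearPlacement()` call. The point of the helper is that placement settings do not live only on the table: partitioned tables can carry placement references on each partition definition as well. A rough sketch of what such a helper has to cover (field names beyond `PlacementPolicyRef` are an assumption, not taken from this commit):

```go
// Illustrative only: approximates a ClearPlacement-style helper on
// model.TableInfo so that no placement settings survive the backup.
func clearPlacement(tblInfo *model.TableInfo) {
	tblInfo.PlacementPolicyRef = nil // table-level policy reference
	if tblInfo.Partition == nil {
		return
	}
	for i := range tblInfo.Partition.Definitions {
		// Partition-level references need clearing too; otherwise a restore
		// onto a cluster that lacks the policy can fail.
		tblInfo.Partition.Definitions[i].PlacementPolicyRef = nil
	}
}
```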
16 changes: 10 additions & 6 deletions br/pkg/lightning/backend/local/local.go
@@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/resolver"
split "github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
@@ -128,22 +127,23 @@ type importClientFactoryImpl struct {
conns *common.GRPCConns
splitCli split.SplitClient
tls *common.TLS
resolveBuilder *resolver.Builder
tcpConcurrency int
}

func newImportClientFactoryImpl(splitCli split.SplitClient, tls *common.TLS, tcpConcurrency int) *importClientFactoryImpl {
resolveBuilder := resolver.NewBuilder(splitCli)
return &importClientFactoryImpl{
conns: common.NewGRPCConns(),
splitCli: splitCli,
tls: tls,
resolveBuilder: resolveBuilder,
tcpConcurrency: tcpConcurrency,
}
}

func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
store, err := f.splitCli.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
opt := grpc.WithInsecure()
if f.tls.TLSConfig() != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(f.tls.TLSConfig()))
@@ -152,17 +152,21 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)

bfConf := backoff.DefaultConfig
bfConf.MaxDelay = gRPCBackOffMaxDelay
// we should use peer address for tiflash. for tikv, peer address is empty
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
conn, err := grpc.DialContext(
ctx,
f.resolveBuilder.Target(storeID),
addr,
opt,
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
Timeout: gRPCKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithResolvers(f.resolveBuilder),
)
cancel()
if err != nil {
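With the custom `resolver` package deleted, `makeConn` now looks up the store itself and dials a concrete address, preferring `GetPeerAddress()` (populated for TiFlash) and falling back to `GetAddress()` for TiKV. Pulled out on its own, the selection step looks roughly like this (assuming a `*metapb.Store` from github.com/pingcap/kvproto):

```go
// Sketch of the address-selection step used when dialing a store.
// TiFlash stores advertise a proxy via PeerAddress; TiKV leaves it empty.
func storeDialAddress(store *metapb.Store) string {
	if addr := store.GetPeerAddress(); addr != "" {
		return addr // TiFlash: dial the peer (proxy) address
	}
	return store.GetAddress() // TiKV: dial the regular store address
}
```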
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/local/localhelper_test.go
@@ -287,8 +287,6 @@ func (c *testClient) SetStoresLabel(ctx context.Context, stores []uint64, labelK
return nil
}

func (c *testClient) InvalidateStoreCache(storeID uint64) {}

func cloneRegion(region *restore.RegionInfo) *restore.RegionInfo {
r := &metapb.Region{}
if region.Region != nil {
2 changes: 1 addition & 1 deletion br/pkg/lightning/lightning.go
@@ -306,7 +306,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
if walkErr == nil {
return common.ErrEmptySourceDir.GenWithStackByArgs(taskCfg.Mydumper.SourceDir)
}
return common.NormalizeError(err)
return common.NormalizeOrWrapErr(common.ErrStorageUnknown, walkErr)
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
24 changes: 19 additions & 5 deletions br/pkg/lightning/restore/check_info.go
@@ -1097,6 +1097,7 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)

for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
@@ -1125,17 +1126,23 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}

}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
@@ -1147,13 +1154,20 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
failpoint.Inject("CheckTableEmptyFailed", func() {
failpoint.Return(false, errors.New("mock error"))
})
query := "select 1 from " + tableName + " limit 1"
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case err == sql.ErrNoRows:
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
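The `checkTableEmpty` changes above address a hang: table names are now sent to the worker goroutines through a `select` that also watches the errgroup context, so if a worker fails early (for example via the `CheckTableEmptyFailed` failpoint) the producer stops instead of blocking forever on a full channel, and the per-table query is wrapped in `common.SQLWithRetry` so transient errors such as PD timeouts are retried. A self-contained sketch of the producer/worker pattern (names are illustrative, not from the TiDB code):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// checkAll mirrors the pattern above: stop feeding work as soon as any
// worker fails, instead of blocking forever on a full channel.
func checkAll(ctx context.Context, items []string, workers int) error {
	ch := make(chan string, workers)
	eg, gCtx := errgroup.WithContext(ctx)
	for i := 0; i < workers; i++ {
		eg.Go(func() error {
			for it := range ch {
				if it == "bad" { // stand-in for a failed table check
					return fmt.Errorf("check %s failed", it)
				}
			}
			return nil
		})
	}
loop:
	for _, it := range items {
		select {
		case ch <- it:
		case <-gCtx.Done(): // a worker already failed; stop producing
			break loop
		}
	}
	close(ch)
	return eg.Wait()
}

func main() {
	err := checkAll(context.Background(), []string{"a", "bad", "c"}, 1)
	fmt.Println(err) // typically "check bad failed"
}
```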
16 changes: 16 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
@@ -475,6 +476,9 @@ func TestCheckTableEmpty(t *testing.T) {
require.NoError(t, err)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnError(mysql.NewErr(errno.ErrPDServerTimeout))
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
@@ -543,6 +547,18 @@ func TestCheckTableEmpty(t *testing.T) {
err = rc.checkTableEmpty(ctx)
require.NoError(t, err)
require.NoError(t, mock.ExpectationsWereMet())

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed", `return`)
require.NoError(t, err)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed")
}()

// restrict the concurrency to ensure there are more tables than workers
rc.cfg.App.RegionConcurrency = 1
// test check tables not stuck but return the right error
err = rc.checkTableEmpty(ctx)
require.Regexp(t, ".*check table contains data failed: mock error.*", err.Error())
}

func TestLocalResource(t *testing.T) {
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/meta_manager.go
@@ -209,7 +209,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

if status == metaStatusChecksuming {
return common.ErrAllocTableRowIDs.GenWithStack("target table is calculating checksum, please wait unit the checksum is finished and try again.")
return common.ErrAllocTableRowIDs.GenWithStack("Target table is calculating checksum. Please wait until the checksum is finished and try again.")
}

if metaTaskID == m.taskID {
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/restore.go
@@ -367,7 +367,7 @@ func NewRestoreControllerWithPauser(

backend, err = local.NewLocalBackend(ctx, tls, cfg, g, maxOpenFiles, errorMgr)
if err != nil {
return nil, errors.Annotate(err, "build local backend failed")
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
}
err = verifyLocalFile(ctx, cpdb, cfg.TikvImporter.SortedKVDir)
if err != nil {
@@ -1666,12 +1666,10 @@ func (rc *Controller) doCompact(ctx context.Context, level int32) error {
}

func (rc *Controller) switchToImportMode(ctx context.Context) {
log.L().Info("switch to import mode")
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}

func (rc *Controller) switchToNormalMode(ctx context.Context) {
log.L().Info("switch to normal mode")
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
}

@@ -1681,6 +1679,8 @@ func (rc *Controller) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode)
return
}

log.L().Info("switch import mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
// since we're running it periodically, so we exclude disconnected stores.
// But it is essential all stores be switched back to Normal mode to allow
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore.go
@@ -697,8 +697,8 @@ func (tr *TableRestore) postProcess(
}

// tidb backend don't need checksum & analyze
if !rc.backend.ShouldPostProcess() {
tr.logger.Debug("skip checksum & analyze, not supported by this backend")
if rc.cfg.PostRestore.Checksum == config.OpLevelOff && rc.cfg.PostRestore.Analyze == config.OpLevelOff {
tr.logger.Debug("skip checksum & analyze, either because not supported by this backend or manually disabled")
err := rc.saveStatusCheckpoint(ctx, tr.tableName, checkpoints.WholeTableEngineID, nil, checkpoints.CheckpointStatusAnalyzeSkipped)
return false, errors.Trace(err)
}
118 changes: 0 additions & 118 deletions br/pkg/resolver/resolver.go

This file was deleted.

