Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add back table empty check and add a switch config (#30887) #33881

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateDetection bool `toml:"duplicate-detection" json:"duplicate-detection"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
106 changes: 103 additions & 3 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
pdconfig "github.com/tikv/pd/server/config"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand All @@ -36,11 +43,10 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/table/tables"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -94,10 +100,13 @@ func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) er
}
clusterSource := localSource
if rc.taskMgr != nil {
clusterSource, err = rc.taskMgr.CheckClusterSource(ctx)
remoteSource, err := rc.taskMgr.CheckClusterSource(ctx)
if err != nil {
return errors.Trace(err)
}
if remoteSource > 0 {
clusterSource = remoteSource
}
}

replicaCount, err := rc.getReplicaCount(ctx)
Expand Down Expand Up @@ -651,3 +660,94 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}

}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
// sort the failed names
sort.Strings(tableNames)
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db common.DBExecutor, tableName string) (bool, error) {
query := "select 1 from " + tableName + " limit 1"
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
var dump int
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
55 changes: 55 additions & 0 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,3 +930,58 @@ func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, check
func (m noopTableMetaMgr) FinishTable(ctx context.Context) error {
return nil
}

type singleMgrBuilder struct{}

func (b singleMgrBuilder) Init(context.Context) error {
return nil
}

func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr {
return &singleTaskMetaMgr{
pd: pd,
}
}

func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr {
return noopTableMetaMgr{}
}

type singleTaskMetaMgr struct {
pd *pdutil.PdController
}

func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
return nil
}

func (m *singleTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) {
return 0, nil
}

func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
return m.pd.RemoveSchedulers(ctx)
}

func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
return true, nil
}

func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) {
return true, true, nil
}

func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {
return nil
}

func (m *singleTaskMetaMgr) Close() {
}
12 changes: 10 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,9 @@ func NewRestoreControllerWithPauser(
}

var metaBuilder metaMgrBuilder
switch cfg.TikvImporter.Backend {
case config.BackendLocal, config.BackendImporter:
isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter
switch {
case isSSTImport && cfg.TikvImporter.IncrementalImport:
// TODO: support Lightning via SQL
db, err := g.GetDB()
if err != nil {
Expand All @@ -349,6 +350,8 @@ func NewRestoreControllerWithPauser(
schema: cfg.App.MetaSchemaName,
needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff,
}
case isSSTImport:
metaBuilder = singleMgrBuilder{}
default:
metaBuilder = noopMetaMgrBuilder{}
}
Expand Down Expand Up @@ -1777,6 +1780,11 @@ func (rc *Controller) DataCheck(ctx context.Context) error {
} else {
rc.checkTemplate.Collect(Critical, true, "table schemas are valid")
}

if err = rc.checkTableEmpty(ctx); err != nil {
return errors.Trace(err)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions br/tests/lightning_duplicate_detection/config1.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[tikv-importer]
backend = "local"
duplicate-detection = true
incremental-import = true

[checkpoint]
enable = true
Expand Down
1 change: 1 addition & 0 deletions br/tests/lightning_duplicate_detection/config2.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[tikv-importer]
backend = "local"
duplicate-detection = true
incremental-import = true

[checkpoint]
enable = true
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_incremental/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
incremental-import = true
17 changes: 14 additions & 3 deletions br/tests/lightning_local_backend/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@ check_cluster_version 4 0 0 'local backend' || exit 0

ENGINE_COUNT=6

# First, verify that inject with not leader error is fine.
rm -f "$TEST_DIR/lightning-local.log"
# Test check table contains data
rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb"
rm -rf $TEST_DIR/lightning.log
run_sql 'DROP DATABASE IF EXISTS cpeng;'
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
run_sql 'CREATE DATABASE cpeng;'
run_sql 'CREATE TABLE cpeng.a (c int);'
run_sql 'CREATE TABLE cpeng.b (c int);'
run_sql "INSERT INTO cpeng.a values (1), (2);"
run_sql "INSERT INTO cpeng.b values (3);"
! run_lightning --backend local --enable-checkpoint=0
grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning.log


# First, verify that inject with not leader error is fine.
export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")'
rm -f "$TEST_DIR/lightning-local.log"
run_sql 'DROP DATABASE IF EXISTS cpeng;'
run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml"

# Check that everything is correctly imported
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create table pre_rebase (pk varchar(6) primary key) auto_increment=70000;
create table pre_rebase (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */) auto_increment=70000;
9 changes: 7 additions & 2 deletions br/tests/lightning_tidb_rowid/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ for BACKEND in local importer tidb; do

run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase'
check_contains 'count(*): 1'
check_contains 'min(_tidb_rowid): 70000'
check_contains 'max(_tidb_rowid): 70000'
if [ "$BACKEND" == 'tidb' ]; then
check_contains 'min(_tidb_rowid): 70000'
check_contains 'max(_tidb_rowid): 70000'
else
check_contains 'min(_tidb_rowid): 1'
check_contains 'max(_tidb_rowid): 1'
fi
run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")'
run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"'
check_contains '_tidb_rowid > 70000: 1'
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ require (
google.golang.org/grpc v1.29.1
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0
modernc.org/mathutil v1.2.2
modernc.org/mathutil v1.4.1
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1159,8 +1159,8 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE=
honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
Expand Down