Merge remote-tracking branch 'upstream/master' into move-tests

D3Hunter committed Apr 1, 2024
2 parents 6465f4d + 3cfea6a commit 2d73df1
Showing 63 changed files with 879 additions and 506 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/client.go
@@ -624,7 +624,7 @@ func BuildBackupSchemas(
 			// Treat cached table as normal table.
 			tableInfo.TableCacheStatusType = model.TableCacheStatusDisable
 
-			if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
+			if tableInfo.ContainsAutoRandomBits() {
 				// this table has auto_random id, we need backup and rebase in restoration
 				var globalAutoRandID int64
 				globalAutoRandID, err = autoIDAccess.RandomID().Get()
2 changes: 1 addition & 1 deletion br/pkg/restore/db.go
@@ -254,7 +254,7 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table,
 			utils.EncloseName(table.DB.Name.O),
 			utils.EncloseName(table.Info.Name.O),
 			table.Info.AutoIncID)
-	} else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() {
+	} else if table.Info.ContainsAutoRandomBits() {
 		restoreMetaSQL = fmt.Sprintf(
 			"alter table %s.%s auto_random_base = %d",
 			utils.EncloseName(table.DB.Name.O),
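Both hunks above drop the PKIsHandle guard: the auto_random base must be backed up and rebased whenever the table has auto-random bits, including tables whose primary key is a composite clustered index (as in the new br_autorandom test below, PRIMARY KEY (a, b)), for which PKIsHandle is false. A minimal, self-contained sketch of the rebase statement restore emits, assuming plain backtick quoting in place of utils.EncloseName:

    package main

    import "fmt"

    // rebaseAutoRandomSQL is a hypothetical helper, not BR's real API.
    func rebaseAutoRandomSQL(db, table string, base int64) string {
    	return fmt.Sprintf("alter table `%s`.`%s` auto_random_base = %d", db, table, base)
    }

    func main() {
    	// Prints: alter table `test`.`common` auto_random_base = 1024
    	fmt.Println(rebaseAutoRandomSQL("test", "common", 1024))
    }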
8 changes: 2 additions & 6 deletions br/pkg/restore/split.go
@@ -174,14 +174,10 @@ func (rs *RegionSplitter) executeSplitByKeys(
 		return err
 	}
 	splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv)
-	regionMap := make(map[uint64]*split.RegionInfo)
-	for _, region := range regions {
-		regionMap[region.Region.GetId()] = region
-	}
 	workerPool := util.NewWorkerPool(uint(splitContext.storeCount)+1, "split keys")
 	eg, ectx := errgroup.WithContext(ctx)
-	for regionID, splitKeys := range splitKeyMap {
-		region := regionMap[regionID]
+	for region, splitKeys := range splitKeyMap {
+		region := region
 		keys := splitKeys
 		sctx := splitContext
 		workerPool.ApplyOnErrorGroup(eg, func() error {
3 changes: 3 additions & 0 deletions br/pkg/restore/split/client.go
@@ -558,6 +558,9 @@ func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool,
 func (c *pdClient) SplitWaitAndScatter(
 	ctx context.Context, region *RegionInfo, keys [][]byte,
 ) (*RegionInfo, []*RegionInfo, error) {
+	failpoint.Inject("failToSplit", func(_ failpoint.Value) {
+		failpoint.Return(nil, nil, errors.New("retryable error"))
+	})
 	if len(keys) == 0 {
 		return region, []*RegionInfo{region}, nil
 	}
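The failpoint added above is what the updated lightning test further down enables through GO_FAILPOINTS (github.com/pingcap/tidb/br/pkg/restore/split/failToSplit=5*return("")). A sketch of driving the same failpoint from Go test code using the github.com/pingcap/failpoint Enable/Disable API; withFailToSplit is a hypothetical helper, and failpoints only fire in binaries built with failpoint-ctl rewriting enabled:

    package main

    import (
    	"fmt"

    	"github.com/pingcap/failpoint"
    )

    const fpPath = "github.com/pingcap/tidb/br/pkg/restore/split/failToSplit"

    // withFailToSplit arms the failpoint for its first n evaluations, runs f,
    // then disarms it again.
    func withFailToSplit(n int, f func()) error {
    	// `n*return("")` makes the failpoint fire on the first n evaluations only.
    	if err := failpoint.Enable(fpPath, fmt.Sprintf(`%d*return("")`, n)); err != nil {
    		return err
    	}
    	defer func() {
    		_ = failpoint.Disable(fpPath)
    	}()
    	f()
    	return nil
    }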
46 changes: 35 additions & 11 deletions br/pkg/restore/split/split.go
@@ -288,34 +288,58 @@ func (b *BackoffMayNotCountBackoffer) Attempt() int {
 }
 
 // GetSplitKeysOfRegions checks every input key is necessary to split region on
-// it. Returns a map from original region ID to split keys belongs to each
-// region.
+// it. Returns a map from region to split keys belongs to it.
 //
+// The key will be skipped if it's the region boundary.
+//
 // prerequisite:
 // - sortedKeys are sorted in ascending order.
 // - sortedRegions are continuous and sorted in ascending order by start key.
 // - sortedRegions can cover all keys in sortedKeys.
 // PaginateScanRegion should satisfy the above prerequisites.
-func GetSplitKeysOfRegions(sortedKeys [][]byte, sortedRegions []*RegionInfo, isRawKV bool) map[uint64][][]byte {
-	splitKeyMap := make(map[uint64][][]byte, len(sortedRegions))
+func GetSplitKeysOfRegions(
+	sortedKeys [][]byte,
+	sortedRegions []*RegionInfo,
+	isRawKV bool,
+) map[*RegionInfo][][]byte {
+	splitKeyMap := make(map[*RegionInfo][][]byte, len(sortedRegions))
 	curKeyIndex := 0
+	splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
+
 	for _, region := range sortedRegions {
-		for ; curKeyIndex < len(sortedKeys); curKeyIndex += 1 {
+		for {
 			if len(sortedKeys[curKeyIndex]) == 0 {
-				continue
+				// should not happen?
+				goto nextKey
 			}
-			splitKey := codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
 			// If splitKey is the boundary of the region, don't need to split on it.
 			if bytes.Equal(splitKey, region.Region.GetStartKey()) {
-				continue
+				goto nextKey
 			}
-			// If splitKey is not in a region, we should move to the next region.
+			// If splitKey is not in this region, we should move to the next region.
 			if !region.ContainsInterior(splitKey) {
 				break
 			}
-			regionID := region.Region.GetId()
-			splitKeyMap[regionID] = append(splitKeyMap[regionID], sortedKeys[curKeyIndex])
+
+			splitKeyMap[region] = append(splitKeyMap[region], sortedKeys[curKeyIndex])
+
+		nextKey:
+			curKeyIndex++
+			if curKeyIndex >= len(sortedKeys) {
+				return splitKeyMap
+			}
+			splitKey = codec.EncodeBytesExt(nil, sortedKeys[curKeyIndex], isRawKV)
 		}
 	}
+	lastKey := sortedKeys[len(sortedKeys)-1]
+	endOfLastRegion := sortedRegions[len(sortedRegions)-1].Region.GetEndKey()
+	if !bytes.Equal(lastKey, endOfLastRegion) {
+		log.Error("in getSplitKeysOfRegions, regions don't cover all keys",
+			zap.String("firstKey", hex.EncodeToString(sortedKeys[0])),
+			zap.String("lastKey", hex.EncodeToString(lastKey)),
+			zap.String("firstRegionStartKey", hex.EncodeToString(sortedRegions[0].Region.GetStartKey())),
+			zap.String("lastRegionEndKey", hex.EncodeToString(endOfLastRegion)),
+		)
+	}
 	return splitKeyMap
 }
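To see what the rewritten loop computes, here is a self-contained sketch of the same bucketing logic with simplified types (region and splitKeysOfRegions are illustrative stand-ins, not the real *RegionInfo API, and key encoding is omitted). It reproduces the first test case below: keys sitting exactly on a region's start boundary are skipped, and every other key is assigned to the region whose interior contains it:

    package main

    import (
    	"bytes"
    	"fmt"
    )

    // region stands in for *split.RegionInfo; an empty end means "+infinity".
    type region struct{ start, end []byte }

    func (r region) containsInterior(k []byte) bool {
    	return bytes.Compare(k, r.start) > 0 && (len(r.end) == 0 || bytes.Compare(k, r.end) < 0)
    }

    // splitKeysOfRegions buckets sorted keys into the sorted, contiguous
    // regions that cover them, skipping keys already on a region boundary.
    func splitKeysOfRegions(sortedKeys [][]byte, sortedRegions []region) map[int][][]byte {
    	out := make(map[int][][]byte, len(sortedRegions))
    	i := 0
    	for ri, r := range sortedRegions {
    		for i < len(sortedKeys) {
    			k := sortedKeys[i]
    			if bytes.Equal(k, r.start) {
    				i++ // boundary key: no split needed
    				continue
    			}
    			if !r.containsInterior(k) {
    				break // key belongs to a later region
    			}
    			out[ri] = append(out[ri], k)
    			i++
    		}
    	}
    	return out
    }

    func main() {
    	regions := []region{
    		{[]byte("a"), []byte("e")},
    		{[]byte("e"), []byte("k")},
    		{[]byte("k"), nil},
    	}
    	keys := [][]byte{[]byte("b"), []byte("d"), []byte("g"), []byte("j"), []byte("l")}
    	res := splitKeysOfRegions(keys, regions)
    	for ri := range regions {
    		for _, k := range res[ri] {
    			fmt.Printf("region %d gets split key %s\n", ri, k)
    		}
    	}
    }

As in the real function, the keys b, d / g, j / l land in regions 0, 1 and 2 respectively, matching the expectations in the updated test.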
50 changes: 26 additions & 24 deletions br/pkg/restore/split/split_test.go
@@ -359,19 +359,14 @@ func TestGetSplitKeyPerRegion(t *testing.T) {
 	}
 	result := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
 	require.Equal(t, 3, len(result))
-	require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[1])
-	require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[2])
-	require.Equal(t, [][]byte{[]byte("l")}, result[3])
+	require.Equal(t, [][]byte{[]byte("b"), []byte("d")}, result[sortedRegions[0]])
+	require.Equal(t, [][]byte{[]byte("g"), []byte("j")}, result[sortedRegions[1]])
+	require.Equal(t, [][]byte{[]byte("l")}, result[sortedRegions[2]])
 
 	// test case moved from lightning
-	sortedRegions = sortedRegions[:0]
 	tableID := int64(1)
-	peers := make([]*metapb.Peer, 1)
-	peers[0] = &metapb.Peer{
-		Id:      1,
-		StoreId: 1,
-	}
-	keys := []int64{10, 100, 500, 1000, 999999, -1}
+	keys := []int64{1, 10, 100, 1000, 10000, -1}
+	sortedRegions = make([]*RegionInfo, 0, len(keys))
 	start := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(0))
 	regionStart := codec.EncodeBytes([]byte{}, start)
 	for i, end := range keys {
@@ -382,26 +377,25 @@
 		}
 		region := &RegionInfo{
 			Region: &metapb.Region{
-				Id:          uint64(i),
-				Peers:       peers,
-				StartKey:    regionStart,
-				EndKey:      regionEndKey,
-				RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
+				Id:       uint64(i),
+				StartKey: regionStart,
+				EndKey:   regionEndKey,
 			},
 		}
 		sortedRegions = append(sortedRegions, region)
 		regionStart = regionEndKey
 	}
 
 	checkKeys := map[int64]int{
-		0:         -1,
-		5:         0,
-		99:        1,
-		100:       -1,
-		512:       3,
-		8888:      4,
-		999999:    -1,
-		100000000: 5,
+		0:     -1,
+		5:     1,
+		6:     1,
+		7:     1,
+		50:    2,
+		60:    2,
+		70:    2,
+		100:   -1,
+		50000: 5,
 	}
 	expected := map[uint64][][]byte{}
 	sortedKeys = make([][]byte, 0, len(checkKeys))
@@ -414,9 +408,17 @@
 		}
 		expected[uint64(idx)] = append(expected[uint64(idx)], key)
 	}
 
+	slices.SortFunc(sortedKeys, bytes.Compare)
+	for i := range expected {
+		slices.SortFunc(expected[i], bytes.Compare)
+	}
+
 	got := GetSplitKeysOfRegions(sortedKeys, sortedRegions, false)
-	require.Equal(t, expected, got)
+	require.Equal(t, len(expected), len(got))
+	for region, gotKeys := range got {
+		require.Equal(t, expected[region.Region.GetId()], gotKeys)
+	}
 }
 
 func checkRegionsBoundaries(t *testing.T, regions []*RegionInfo, expected [][]byte) {
13 changes: 6 additions & 7 deletions br/pkg/streamhelper/advancer_test.go
@@ -265,21 +265,20 @@ func TestClearCache(t *testing.T) {
 		return nil
 	}
 	failedStoreID := uint64(0)
-	hasFailed := false
+	hasFailed := atomic.NewBool(false)
 	for _, s := range c.stores {
 		s.clientMu.Lock()
+		sid := s.GetID()
 		s.onGetRegionCheckpoint = func(glftrr *logbackup.GetLastFlushTSOfRegionRequest) error {
-			// mark this store cache cleared
-			failedStoreID = s.GetID()
-			if !hasFailed {
-				hasFailed = true
+			// mark one store failed is enough
+			if hasFailed.CompareAndSwap(false, true) {
+				// mark this store cache cleared
+				failedStoreID = sid
 				return errors.New("failed to get checkpoint")
 			}
 			return nil
 		}
 		s.clientMu.Unlock()
-		// mark one store failed is enough
-		break
 	}
 	env := &testEnv{fakeCluster: c, testCtx: t}
 	adv := streamhelper.NewCheckpointAdvancer(env)
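The old code armed only the first store (note the deleted break) and flipped a plain bool from inside the callback; the new code arms every store and uses an atomic compare-and-swap so that exactly one callback reports a failure even if several run concurrently. A self-contained sketch of that fail-exactly-once pattern using the standard library's sync/atomic (the test itself uses go.uber.org/atomic, whose Bool offers the same CompareAndSwap semantics):

    package main

    import (
    	"errors"
    	"fmt"
    	"sync"
    	"sync/atomic"
    )

    func main() {
    	var hasFailed atomic.Bool
    	var wg sync.WaitGroup
    	errs := make(chan error, 10)

    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			// Only the goroutine that wins the CAS reports the failure;
    			// every later caller sees true and takes the success path.
    			if hasFailed.CompareAndSwap(false, true) {
    				errs <- errors.New("failed to get checkpoint")
    			}
    		}()
    	}
    	wg.Wait()
    	close(errs)
    	fmt.Println("failures:", len(errs)) // always prints: failures: 1
    }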
71 changes: 71 additions & 0 deletions br/tests/br_autorandom/run.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+#
+# Copyright 2024 PingCAP, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -eu
+. run_services
+CUR=$(cd `dirname $0`; pwd)
+
+# const value
+PREFIX="autorandom" # NOTICE: don't start with 'br' because `restart services` would remove file/directory br*.
+res_file="$TEST_DIR/sql_res.$TEST_NAME.txt"
+
+# start a new cluster
+echo "restart services"
+restart_services
+
+# prepare the data
+echo "prepare the data"
+run_sql "CREATE TABLE test.common (a BIGINT UNSIGNED AUTO_RANDOM(1), b VARCHAR(255), uid INT, c VARCHAR(255) DEFAULT 'c', PRIMARY KEY (a, b), UNIQUE INDEX (uid));"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 1, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 2, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 3, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 4, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 5, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 6, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 7, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 8, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 9, 'a');"
+run_sql "INSERT INTO test.common (b, uid, c) values ('a', 10, 'a');"
+
+run_sql "CREATE TABLE test.pk (a BIGINT UNSIGNED AUTO_RANDOM(1), uid INT, c VARCHAR(255) DEFAULT 'c', PRIMARY KEY (a), UNIQUE INDEX (uid));"
+run_sql "INSERT INTO test.pk (uid, c) values (1, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (2, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (3, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (4, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (5, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (6, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (7, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (8, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (9, 'a');"
+run_sql "INSERT INTO test.pk (uid, c) values (10, 'a');"
+
+# backup & restore
+run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$PREFIX/full"
+echo "restart services"
+restart_services
+run_br --pd $PD_ADDR restore full -s "local://$TEST_DIR/$PREFIX/full"
+
+# new workload
+for i in `seq 1 9`; do
+    run_sql "INSERT INTO test.common (b, uid) values ('a', 10) on duplicate key update c = 'b';"
+    run_sql "INSERT INTO test.pk (uid) values (10) on duplicate key update c = 'b';"
+done
+
+# check consistency: if the restored auto_random base were stale, the new inserts above would collide with restored rows (uid < 10) and flip their c to 'b'
+run_sql "SELECT COUNT(*) AS RESCNT FROM test.common WHERE uid < 10 AND c = 'b';"
+check_contains "RESCNT: 0"
+run_sql "SELECT COUNT(*) AS RESCNT FROM test.pk WHERE uid < 10 AND c = 'b';"
+check_contains "RESCNT: 0"
2 changes: 1 addition & 1 deletion br/tests/run_group_br_tests.sh
@@ -28,7 +28,7 @@ groups=(
 	["G05"]='br_range br_rawkv br_replica_read br_restore_TDE_enable br_restore_log_task_enable br_s3 br_shuffle_leader br_shuffle_region br_single_table'
 	["G06"]='br_skip_checksum br_small_batch_size br_split_region_fail br_systables br_table_filter br_txn br_stats'
 	["G07"]='br_clustered_index br_crypter br_table_partition br_tidb_placement_policy br_tiflash br_tikv_outage'
-	["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint'
+	["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom'
 )
 
 # Get other cases not in groups, to avoid missing any case
4 changes: 2 additions & 2 deletions lightning/tests/lightning_local_backend/run.sh
@@ -37,11 +37,11 @@ grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning
 
 
 # First, verify that inject with not leader error is fine.
-export GO_FAILPOINTS='github.com/pingcap/tidb/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/pkg/lightning/backend/local/failToSplit=5*return("")'
+export GO_FAILPOINTS='github.com/pingcap/tidb/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader");github.com/pingcap/tidb/br/pkg/restore/split/failToSplit=5*return("");github.com/pingcap/tidb/pkg/lightning/backend/local/failToSplit=5*return("")'
 rm -f "$TEST_DIR/lightning-local.log"
 run_sql 'DROP DATABASE IF EXISTS cpeng;'
 run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "$CUR/config.toml" -L debug
-grep -Eq "split regions.*retryable error" "$TEST_DIR/lightning-local.log"
+grep -q "retryable error" "$TEST_DIR/lightning-local.log"
 
 # Check that everything is correctly imported
 run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
6 changes: 3 additions & 3 deletions pkg/ddl/index_cop.go
@@ -274,7 +274,7 @@ func buildTableScan(ctx context.Context, c *copr.CopContextBase, startTS uint64,
 		SetStartTS(startTS).
 		SetKeyRanges([]kv.KeyRange{{StartKey: start, EndKey: end}}).
 		SetKeepOrder(true).
-		SetFromSessionVars(c.SessionContext.GetSessionVars()).
+		SetFromSessionVars(c.SessionContext.GetDistSQLCtx()).
 		SetFromInfoSchema(c.SessionContext.GetDomainInfoSchema()).
 		SetConcurrency(1).
 		Build()
@@ -284,7 +284,7 @@
 	if err != nil {
 		return nil, err
 	}
-	return distsql.Select(ctx, c.SessionContext, kvReq, c.FieldTypes)
+	return distsql.Select(ctx, c.SessionContext.GetDistSQLCtx(), kvReq, c.FieldTypes)
 }
 
 func fetchTableScanResult(
@@ -353,7 +353,7 @@ func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*m
 		return nil, err
 	}
 	dagReq.Executors = append(dagReq.Executors, execPB)
-	distsql.SetEncodeType(sCtx, dagReq)
+	distsql.SetEncodeType(sCtx.GetDistSQLCtx(), dagReq)
 	return dagReq, nil
 }
 
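This hunk and the pkg/ddl/reorg.go changes below apply one refactor: call sites stop handing the whole sessionctx.Context to the distsql layer and instead pass the narrower value from GetDistSQLCtx(), so request building depends only on the state it actually reads. A toy sketch of the narrowing pattern with hypothetical types (not TiDB's real definitions):

    package main

    import "fmt"

    // Session stands in for the wide sessionctx.Context.
    type Session struct {
    	EncodeType string
    	// ... many fields distsql never needs ...
    }

    // DistSQLCtx is the narrow view: just what request building reads.
    type DistSQLCtx struct {
    	EncodeType string
    }

    func (s *Session) GetDistSQLCtx() *DistSQLCtx {
    	return &DistSQLCtx{EncodeType: s.EncodeType}
    }

    // setEncodeType takes the narrow context, so it cannot quietly grow a
    // dependency on unrelated session state.
    func setEncodeType(ctx *DistSQLCtx) {
    	fmt.Println("encode with", ctx.EncodeType)
    }

    func main() {
    	s := &Session{EncodeType: "chunk"}
    	setEncodeType(s.GetDistSQLCtx())
    }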
6 changes: 3 additions & 3 deletions pkg/ddl/reorg.go
@@ -500,7 +500,7 @@ func buildDescTableScanDAG(ctx sessionctx.Context, tbl table.PhysicalTable, hand
 	tblScanExec := constructDescTableScanPB(tbl.GetPhysicalID(), tbl.Meta(), handleCols)
 	dagReq.Executors = append(dagReq.Executors, tblScanExec)
 	dagReq.Executors = append(dagReq.Executors, constructLimitPB(limit))
-	distsql.SetEncodeType(ctx, dagReq)
+	distsql.SetEncodeType(ctx.GetDistSQLCtx(), dagReq)
 	return dagReq, nil
 }
 
@@ -528,7 +528,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
 	} else {
 		ranges = ranger.FullIntRange(false)
 	}
-	builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
+	builder = b.SetHandleRanges(sctx.GetDistSQLCtx(), tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges)
 	builder.SetDAGRequest(dagPB).
 		SetStartTS(startTS).
 		SetKeepOrder(true).
@@ -547,7 +547,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx *JobContext, startTS uint64, tbl table.
 		return nil, errors.Trace(err)
 	}
 
-	result, err := distsql.Select(ctx.ddlJobCtx, sctx, kvReq, getColumnsTypes(handleCols))
+	result, err := distsql.Select(ctx.ddlJobCtx, sctx.GetDistSQLCtx(), kvReq, getColumnsTypes(handleCols))
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
(Diffs for the remaining changed files are not shown here.)