Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: optimize mpp probe #39932

Merged
merged 33 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d6eda84
add mpp probe realize
hackersean Dec 14, 2022
8aab258
add mpp probe realize
hackersean Dec 14, 2022
e9ce294
optmize the mpp probe in query
hackersean Dec 14, 2022
9859e9b
Merge branch 'master' into sean/optimize-mpp-probe
hackersean Dec 14, 2022
aedd8a4
bugfix and optmize log
hackersean Dec 15, 2022
b86ec48
add metrics and fix unit test.
hackersean Dec 15, 2022
d6c576f
clean unse code for mppStoreLastFailTime
hackersean Dec 16, 2022
aafaba4
add unit test and fix some bugs
hackersean Dec 16, 2022
5840aaf
Merge remote-tracking branch 'origin/master' into sean/optimize-mpp-p…
hackersean Dec 19, 2022
0c9ff8e
use store manage background goroutine
hackersean Dec 19, 2022
7d2705d
fix DATA RACE
hackersean Dec 20, 2022
3864a7e
stop background goroutine when store close
hackersean Dec 20, 2022
4275876
add log for store
hackersean Dec 20, 2022
1232ccb
fix goleak in unittest
hackersean Dec 20, 2022
40a14d3
Merge remote-tracking branch 'origin/master' into sean/optimize-mpp-p…
hackersean Dec 20, 2022
ac92481
Merge remote-tracking branch 'origin/master' into sean/optimize-mpp-p…
hackersean Dec 20, 2022
417a18b
Merge remote-tracking branch 'origin/master' into sean/optimize-mpp-p…
hackersean Dec 20, 2022
1fe1b6f
move wg.Done to defer
hackersean Dec 20, 2022
8a74a87
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
80554ef
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
099bbd7
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
529a5ae
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
5926d8f
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
93ae0f3
add logs
hackersean Dec 20, 2022
b4347c6
add logs
hackersean Dec 20, 2022
d060004
Merge branch 'sean/optimize-mpp-probe' of github.com:hackersean/tidb …
hackersean Dec 20, 2022
454028b
fix unitest goleak
hackersean Dec 20, 2022
d385629
optimize lock for query
hackersean Dec 21, 2022
616973d
move background goroutine to main
hackersean Dec 21, 2022
ec3410b
Merge branch 'master' into sean/optimize-mpp-probe
hawkingrei Dec 21, 2022
59b45b8
optmize for metris and some code
hackersean Dec 21, 2022
18226b2
fix some problem
hackersean Dec 22, 2022
54dc11e
Merge branch 'master' into sean/optimize-mpp-probe
hackersean Dec 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kv

import (
"context"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
Expand Down Expand Up @@ -81,7 +80,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
Expand Down
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TokenGauge)
prometheus.MustRegister(ConfigStatus)
prometheus.MustRegister(TiFlashQueryTotalCounter)
prometheus.MustRegister(TiFlashFailedMPPStoreState)
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
Expand Down Expand Up @@ -238,6 +239,7 @@ func ToggleSimplifiedMode(simplified bool) {
InfoCacheCounters,
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
TiFlashFailedMPPStoreState,
CampaignOwnerCounter,
NonTransactionalDMLCount,
MemoryUsage,
Expand Down
8 changes: 8 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ var (
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})

TiFlashFailedMPPStoreState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does detector heartbeat means?

Copy link
Contributor Author

@hackersean hackersean Dec 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

metric in probe's cycle,it is used to indicate that the prober is working

}, []string{LblAddress})

PDAPIExecutionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand Down
2 changes: 1 addition & 1 deletion planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/sessionstates/session_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package sessionstates

import (
"time"

"github.com/pingcap/tidb/errno"
ptypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -79,7 +77,6 @@ type SessionStates struct {
FoundInPlanCache bool `json:"in-plan-cache,omitempty"`
FoundInBinding bool `json:"in-binding,omitempty"`
SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"`
MPPStoreLastFailTime map[string]time.Time `json:"store-fail-time,omitempty"`
LastAffectedRows int64 `json:"affected-rows,omitempty"`
LastInsertID uint64 `json:"last-insert-id,omitempty"`
Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"`
Expand Down
19 changes: 0 additions & 19 deletions sessionctx/sessionstates/session_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -378,23 +376,6 @@ func TestSessionCtx(t *testing.T) {
tk.MustQuery("select nextval(test.s)").Check(testkit.Rows("2"))
},
},
{
// check MPPStoreLastFailTime
setFunc: func(tk *testkit.TestKit) any {
m := sync.Map{}
m.Store("store1", time.Now())
tk.Session().GetSessionVars().MPPStoreLastFailTime = &m
return tk.Session().GetSessionVars().MPPStoreLastFailTime
},
checkFunc: func(tk *testkit.TestKit, param any) {
failTime := tk.Session().GetSessionVars().MPPStoreLastFailTime
tm, ok := failTime.Load("store1")
require.True(t, ok)
v, ok := (param.(*sync.Map)).Load("store1")
require.True(t, ok)
require.True(t, tm.(time.Time).Equal(v.(time.Time)))
},
},
{
// check FoundInPlanCache
setFunc: func(tk *testkit.TestKit) any {
Expand Down
13 changes: 0 additions & 13 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,9 +1171,6 @@ type SessionVars struct {
// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData TemporaryTableData

// MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. It maps store address(string) to fail time(time.Time).
MPPStoreLastFailTime *sync.Map

// MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash.
MPPStoreFailTTL string

Expand Down Expand Up @@ -1713,7 +1710,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreLastFailTime: new(sync.Map),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
Expand Down Expand Up @@ -2336,12 +2332,6 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se
}
sessionStates.LastFoundRows = s.LastFoundRows
sessionStates.SequenceLatestValues = s.SequenceState.GetAllStates()
sessionStates.MPPStoreLastFailTime = make(map[string]time.Time, 0)
s.MPPStoreLastFailTime.Range(
func(key, value interface{}) bool {
sessionStates.MPPStoreLastFailTime[key.(string)] = value.(time.Time)
return true
})
sessionStates.FoundInPlanCache = s.PrevFoundInPlanCache
sessionStates.FoundInBinding = s.PrevFoundInBinding

Expand Down Expand Up @@ -2377,9 +2367,6 @@ func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *se
}
s.LastFoundRows = sessionStates.LastFoundRows
s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues)
for k, v := range sessionStates.MPPStoreLastFailTime {
s.MPPStoreLastFailTime.Store(k, v)
}
s.FoundInPlanCache = sessionStates.FoundInPlanCache
s.FoundInBinding = sessionStates.FoundInBinding

Expand Down
5 changes: 5 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"coprocessor_cache.go",
"key_ranges.go",
"mpp.go",
"mpp_probe.go",
"region_cache.go",
"store.go",
],
Expand Down Expand Up @@ -66,6 +67,7 @@ go_test(
"coprocessor_test.go",
"key_ranges_test.go",
"main_test.go",
"mpp_probe_test.go",
],
embed = [":copr"],
flaky = True,
Expand All @@ -75,12 +77,15 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_uber_go_goleak//:goleak",
],
)
71 changes: 22 additions & 49 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -295,12 +294,11 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enable.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
Expand Down Expand Up @@ -331,45 +329,21 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]

var lastAny any
var ok bool
mu.Lock()
if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond {
// The interval time is so short that may happen in a same query, so we needn't to check again.
mu.Unlock()
return
} else if !ok {
lastAny = time.Time{}
}
mu.Unlock()

resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)

if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
errMsg := "store not ready to serve"
if err != nil {
errMsg = err.Error()
}
logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg))
mu.Lock()
mppStoreLastFailTime.Store(s.GetAddr(), time.Now())
mu.Unlock()
// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

if cur.Sub(lastAny.(time.Time)) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time)))
tikvClient := kvStore.GetTiKVClient()
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

Expand Down Expand Up @@ -534,29 +508,29 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
store *kvStore,
ranges *KeyRanges,
storeType kv.StoreType,
mppStoreLastFailTime *sync.Map,
isMPP bool,
ttl time.Duration,
balanceWithContinuity bool,
balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}

func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
store *kvStore,
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
mppStoreLastFailTime *sync.Map,
isMPP bool,
ttl time.Duration,
balanceWithContinuity bool,
balanceContinuousRegionCount int64,
partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
} else {
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
if err != nil {
return nil, err
Expand All @@ -566,8 +540,8 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
return batchTasks, nil
}

func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -614,7 +588,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, ran
// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`.
// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.
func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
Expand Down Expand Up @@ -644,7 +618,6 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach

storeTaskMap := make(map[string]*batchCopTask)
needRetry := false
isMPP := mppStoreLastFailTime != nil
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP)
if err != nil {
Expand Down Expand Up @@ -696,7 +669,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
Expand Down Expand Up @@ -762,11 +735,11 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges))
partitionIDs = append(partitionIDs, pi.ID)
}
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs)
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs)
} else {
// TODO: merge the if branch.
ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange())
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0)
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0)
}

if err != nil {
Expand Down Expand Up @@ -913,7 +886,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
ranges = append(ranges, *ran)
})
}
ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0)
ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0)
return ret, err
}
// Retry Partition Table Scan
Expand All @@ -932,7 +905,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
}
keyRanges = append(keyRanges, NewKeyRanges(ranges))
}
ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid)
ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid)
return ret, err
}

Expand Down
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second, false, 0)
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second, false, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
Expand Down
Loading