Skip to content

Commit

Permalink
distsql,executor: using paging protocol for all coprocessor requests (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Jun 1, 2022
1 parent c5393d0 commit c8468a7
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 1 deletion.
3 changes: 3 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = replicaReadType
builder.SetResourceGroupTagger(sv.StmtCtx.GetResourceGroupTagger())
if sv.EnablePaging {
builder.SetPaging(true)
}
return builder
}

Expand Down
6 changes: 6 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3216,6 +3216,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
if err != nil {
return nil, err
}
paging := b.ctx.GetSessionVars().EnablePaging
e := &TableReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
Expand All @@ -3227,6 +3228,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
desc: ts.Desc,
columns: ts.Columns,
streaming: streaming,
paging: paging,
corColInFilter: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.TablePlans[0]),
plans: v.TablePlans,
Expand Down Expand Up @@ -3489,6 +3491,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
if err != nil {
return nil, err
}
paging := b.ctx.GetSessionVars().EnablePaging
e := &IndexReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
Expand All @@ -3502,6 +3505,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
desc: is.Desc,
columns: is.Columns,
streaming: streaming,
paging: paging,
corColInFilter: b.corColInDistPlan(v.IndexPlans),
corColInAccess: b.corColInAccess(v.IndexPlans[0]),
idxCols: is.IdxCols,
Expand Down Expand Up @@ -3815,6 +3819,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
return nil, err
}

paging := b.ctx.GetSessionVars().EnablePaging
e := &IndexMergeReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPBs: partialReqs,
Expand All @@ -3830,6 +3835,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
tblPlans: v.TablePlans,
dataReaderBuilder: readerBuilder,
feedbacks: feedbacks,
paging: paging,
handleCols: ts.HandleCols,
isCorColInPartialFilters: isCorColInPartialFilters,
isCorColInTableFilter: isCorColInTableFilter,
Expand Down
1 change: 1 addition & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type IndexReaderExecutor struct {

feedback *statistics.QueryFeedback
streaming bool
paging bool

keepOrder bool
desc bool
Expand Down
2 changes: 2 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type IndexMergeReaderExecutor struct {

// memTracker is used to track the memory usage of this executor.
memTracker *memory.Tracker
paging bool

// checkIndexValue is used to check the consistency of the index data.
*checkIndexValue // nolint:unused
Expand Down Expand Up @@ -310,6 +311,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.memTracker).
SetPaging(e.paging).
SetFromInfoSchema(e.ctx.GetInfoSchema())

for parTblIdx, keyRange := range keyRanges {
Expand Down
6 changes: 5 additions & 1 deletion executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type TableReaderExecutor struct {
keepOrder bool
desc bool
streaming bool
paging bool
storeType kv.StoreType
// corColInFilter tells whether there's correlated column in filter.
corColInFilter bool
Expand Down Expand Up @@ -338,6 +339,7 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetPaging(e.paging).
SetAllowBatchCop(e.batchCop).Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -376,6 +378,7 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetPaging(e.paging).
SetAllowBatchCop(e.batchCop).Build()
if err != nil {
return nil, err
Expand Down Expand Up @@ -407,7 +410,8 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
SetFromInfoSchema(e.ctx.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop)
SetAllowBatchCop(e.batchCop).
SetPaging(e.paging)
return reqBuilder.Build()
}

Expand Down

0 comments on commit c8468a7

Please sign in to comment.