Skip to content

Commit

Permalink
planner, executor: support inline projection for TopN (#58500)
Browse files Browse the repository at this point in the history
ref #54245
  • Loading branch information
EricZequan authored Jan 6, 2025
1 parent e79d22f commit 911d5a1
Show file tree
Hide file tree
Showing 27 changed files with 1,007 additions and 448 deletions.
134 changes: 134 additions & 0 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,140 @@ func BenchmarkLimitExec(b *testing.B) {
}
}

// topNTestCase holds the parameters of a single TopN benchmark run.
type topNTestCase struct {
	// rows is the number of rows the mock data source is asked to produce;
	// offset and count are the LIMIT offset and row count given to TopN.
	rows, offset, count int
	// orderByIdx lists the indexes (into columns()) of the sort key columns.
	orderByIdx []int
	// usingInlineProjection selects TopN's inline projection instead of a
	// separate ProjectionExec placed on top of TopN.
	usingInlineProjection bool
	// columnIdxsUsedByChild has one flag per column of columns(); a true
	// entry means that column is part of the projected output.
	columnIdxsUsedByChild []bool
	// ctx is the session context shared by the data source and the executors.
	ctx sessionctx.Context
}

// columns returns a freshly allocated schema of three BIGINT columns whose
// Index fields are 0, 1 and 2. Every call builds new Column values.
func (tc topNTestCase) columns() []*expression.Column {
	const numCols = 3
	cols := make([]*expression.Column, 0, numCols)
	for idx := 0; idx < numCols; idx++ {
		cols = append(cols, &expression.Column{
			Index:   idx,
			RetType: types.NewFieldType(mysql.TypeLonglong),
		})
	}
	return cols
}

// String renders the benchmark parameters in a compact, human-readable form.
func (tc topNTestCase) String() string {
	const layout = "(rows:%v, offset:%v, count:%v, orderByIdx:%v, inline_projection:%v)"
	return fmt.Sprintf(layout, tc.rows, tc.offset, tc.count, tc.orderByIdx, tc.usingInlineProjection)
}

// defaultTopNTestCase returns the baseline TopN benchmark configuration:
// 100000 input rows, "ORDER BY col0 LIMIT 10", only column 1 projected,
// and inline projection turned off. The case owns a fresh mock session
// context configured with the default chunk sizes and a new memory tracker.
func defaultTopNTestCase() *topNTestCase {
	sctx := mock.NewContext()
	sctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
	sctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
	sctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker(-1, -1)

	tc := new(topNTestCase)
	tc.ctx = sctx
	tc.rows = 100000
	tc.offset = 0
	tc.count = 10
	tc.orderByIdx = []int{0}
	tc.usingInlineProjection = false
	tc.columnIdxsUsedByChild = []bool{false, true, false}
	return tc
}

// benchmarkTopNExec measures one TopN configuration.
//
// It builds a mock data source feeding a TopNExec. Depending on
// cas.usingInlineProjection, the column pruning described by
// cas.columnIdxsUsedByChild is done either by TopN itself (inline projection)
// or by a ProjectionExec stacked on top of TopN. Only Open, the Next loop and
// Close are timed; chunk allocation and data preparation run with the timer
// stopped.
func benchmarkTopNExec(b *testing.B, cas *topNTestCase) {
	src := testutil.BuildMockDataSource(testutil.MockDataSourceParameters{
		DataSchema: expression.NewSchema(cas.columns()...),
		Rows:       cas.rows,
		Ctx:        cas.ctx,
	})

	topN := &sortexec.TopNExec{
		SortExec: sortexec.SortExec{
			BaseExecutor: exec.NewBaseExecutor(cas.ctx, src.Schema(), 4, src),
			ByItems:      make([]*util.ByItems, 0, len(cas.orderByIdx)),
			ExecSchema:   src.Schema(),
		},
		Limit: &core.PhysicalLimit{
			Count:  uint64(cas.count),
			Offset: uint64(cas.offset),
		},
	}
	// One sort key per requested column index.
	for _, keyIdx := range cas.orderByIdx {
		topN.SortExec.ByItems = append(topN.SortExec.ByItems, &util.ByItems{Expr: cas.columns()[keyIdx]})
	}
	topN.ExecSchema = src.Schema().Clone()

	// root is the executor that actually gets driven by the benchmark loop.
	var root exec.Executor
	switch {
	case !cas.usingInlineProjection:
		// Classic plan shape: Projection -> TopN -> data source.
		cols := cas.columns()
		outCols := make([]*expression.Column, 0, len(cols))
		outExprs := make([]expression.Expression, 0, len(cols))
		for pos, keep := range cas.columnIdxsUsedByChild {
			if !keep {
				continue
			}
			outCols = append(outCols, cols[pos])
			outExprs = append(outExprs, cols[pos])
		}
		root = &ProjectionExec{
			BaseExecutorV2: exec.NewBaseExecutorV2(cas.ctx.GetSessionVars(), expression.NewSchema(outCols...), 0, topN),
			numWorkers:     1,
			evaluatorSuit:  expression.NewEvaluatorSuite(outExprs, false),
		}
	default:
		// Inline projection: TopN is told which child columns to emit.
		if n := len(cas.columnIdxsUsedByChild); n > 0 {
			picked := make([]int, 0, n)
			for pos, keep := range cas.columnIdxsUsedByChild {
				if keep {
					picked = append(picked, pos)
				}
			}
			topN.ColumnIdxsUsedByChild = picked
		}
		root = topN
	}

	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		// Setup for this iteration is excluded from the measurement.
		b.StopTimer()
		runCtx := context.Background()
		chk := exec.NewFirstChunk(root)
		src.PrepareChunks()

		b.StartTimer()
		if err := root.Open(runCtx); err != nil {
			b.Fatal(err)
		}
		// Drain the executor; an empty chunk signals the end of the result.
		for {
			err := root.Next(runCtx, chk)
			if err != nil {
				b.Fatal(err)
			}
			if chk.NumRows() == 0 {
				break
			}
		}

		if err := root.Close(); err != nil {
			b.Fatal(err)
		}
		b.StopTimer()
	}
}

// BenchmarkTopNExec compares TopN followed by a separate projection against
// TopN with inline projection, running the default test case once per mode
// as a sub-benchmark.
func BenchmarkTopNExec(b *testing.B) {
	for _, inlineProjection := range []bool{false, true} {
		cas := defaultTopNTestCase()
		cas.usingInlineProjection = inlineProjection
		b.Run(fmt.Sprintf("TopNExec InlineProjection:%v", inlineProjection), func(b *testing.B) {
			// ReportAllocs only affects the *testing.B it is called on, and
			// b.Run hands the closure a new B. Calling it on the parent would
			// therefore not enable allocation reporting for the sub-benchmarks.
			b.ReportAllocs()
			benchmarkTopNExec(b, cas)
		})
	}
}

func BenchmarkReadLastLinesOfHugeLine(b *testing.B) {
// step 1. initial a huge line log file
hugeLine := make([]byte, 1024*1024*10)
Expand Down
44 changes: 43 additions & 1 deletion pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2491,11 +2491,27 @@ func (b *executorBuilder) buildTopN(v *plannercore.PhysicalTopN) exec.Executor {
ExecSchema: v.Schema(),
}
executor_metrics.ExecutorCounterTopNExec.Inc()
return &sortexec.TopNExec{
t := &sortexec.TopNExec{
SortExec: sortExec,
Limit: &plannercore.PhysicalLimit{Count: v.Count, Offset: v.Offset},
Concurrency: b.ctx.GetSessionVars().Concurrency.ExecutorConcurrency,
}
columnIdxsUsedByChild, columnMissing := retrieveColumnIdxsUsedByChild(v.Schema(), v.Children()[0].Schema())
if columnIdxsUsedByChild != nil && columnMissing {
// In the expected cases colMissing will never happen.
// However, suppose that childSchema contains generatedCol and is cloned by selfSchema.
// Then childSchema.generatedCol.UniqueID will not be equal to selfSchema.generatedCol.UniqueID.
// In this case, colMissing occurs, but it is not wrong.
// So here we cancel the inline projection, take all of columns from child.
// If the inline projection directly generates some error causes colMissing,
// notice that the error feedback given would be inaccurate.
columnIdxsUsedByChild = nil
// TODO: If there is valid verification logic, please uncomment the following code
// b.err = errors.Annotate(ErrBuildExecutor, "Inline projection occurs when `buildTopN` executor, columns should not be missing in the child schema")
// return nil
}
t.ColumnIdxsUsedByChild = columnIdxsUsedByChild
return t
}

func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor {
Expand Down Expand Up @@ -3181,6 +3197,32 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
return e
}

// retrieveColumnIdxsUsedByChild maps every column of selfSchema to its
// position in childSchema.
//
// The first result holds, for each column of selfSchema in order, the index of
// that column in childSchema, or -1 when the column is not found there.
// E.g. [2, 3, 1] means child[col2, col3, col1] -> parent[col0, col1, col2].
// The first result is nil when both schemas have the same length and every
// column sits at the same position in both, i.e. no projection is needed.
//
// The second result (`columnMissing`) reports whether at least one column of
// selfSchema could not be found in childSchema.
func retrieveColumnIdxsUsedByChild(selfSchema *expression.Schema, childSchema *expression.Schema) ([]int, bool) {
	var (
		missing  bool
		identity = selfSchema.Len() == childSchema.Len()
		mapping  = make([]int, 0, selfSchema.Len())
	)
	for pos, col := range selfSchema.Columns {
		childPos := childSchema.ColumnIndex(col)
		missing = missing || childPos == -1
		identity = identity && pos == childPos
		mapping = append(mapping, childPos)
	}
	if identity {
		return nil, missing
	}
	return mapping, missing
}

// markChildrenUsedCols compares each child with the output schema, and mark
// each column of the child is used by output or not.
func markChildrenUsedCols(outputCols []*expression.Column, childSchemas ...*expression.Schema) (childrenUsed [][]int) {
Expand Down
10 changes: 9 additions & 1 deletion pkg/executor/sortexec/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type TopNExec struct {
isSpillTriggeredInStage2ForTest bool

Concurrency int

// ColumnIdxsUsedByChild keeps the column indexes of the child executor that are used for inline projection.
ColumnIdxsUsedByChild []int
}

// Open implements the Executor Open interface.
Expand Down Expand Up @@ -240,7 +243,12 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.Chunk) error {
if !ok || row.err != nil {
return row.err
}
req.AppendRow(row.row)
// Be careful when inline projection is in effect:
// TopN's schema may not match the child executor's output columns,
// so we extract only the required columns from the child executor's rows here.
// Do not do this in `loadChunksUntilTotalLimit` or `processChildChk`,
// because it may break the correctness of the executor's `keyColumns`.
req.AppendRowsByColIdxs([]chunk.Row{row.row}, e.ColumnIdxsUsedByChild)
}
}
return nil
Expand Down
14 changes: 12 additions & 2 deletions pkg/executor/sortexec/topn_chunk_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ type topNChunkHeap struct {
func (h *topNChunkHeap) init(topnExec *TopNExec, memTracker *memory.Tracker, totalLimit uint64, idx int, greaterRow func(chunk.Row, chunk.Row) bool, fieldTypes []*types.FieldType) {
h.memTracker = memTracker

h.rowChunks = chunk.NewList(exec.RetTypes(topnExec), topnExec.InitCap(), topnExec.MaxChunkSize())
// Without inline projection, TopN's schema is the same as its child's. With inline projection, TopN has its own schema,
// so TopN itself cannot be used to construct these chunks; the child's information must be used instead.
// Otherwise the rows of the new chunk list may not be wide enough to hold the result set from the child executor.
// To avoid this problem, we always use the child executor's schema to build the new chunk list.
ch := topnExec.Children(0)
h.rowChunks = chunk.NewList(exec.RetTypes(ch), ch.InitCap(), ch.MaxChunkSize())
h.rowChunks.GetMemTracker().AttachTo(h.memTracker)
h.rowChunks.GetMemTracker().SetLabel(memory.LabelForRowChunks)

Expand Down Expand Up @@ -112,7 +117,12 @@ func (h *topNChunkHeap) processChk(chk *chunk.Chunk) {
// but we want descending top N, then we will keep all data in memory.
// But if data is distributed randomly, this function will be called log(n) times.
func (h *topNChunkHeap) doCompaction(topnExec *TopNExec) error {
newRowChunks := chunk.NewList(exec.RetTypes(topnExec), topnExec.InitCap(), topnExec.MaxChunkSize())
// Without inline projection, TopN's schema is the same as its child's. With inline projection, TopN has its own schema,
// so TopN itself cannot be used to construct these chunks; the child's information must be used instead.
// Otherwise the rows of the new chunk list may not be wide enough to hold the result set from the child executor.
// To avoid this problem, we always use the child executor's schema to build the new chunk list.
ch := topnExec.Children(0)
newRowChunks := chunk.NewList(exec.RetTypes(ch), ch.InitCap(), ch.MaxChunkSize())
newRowPtrs := make([]chunk.RowPtr, 0, h.rowChunks.Len())
for _, rowPtr := range h.rowPtrs {
newRowPtr := newRowChunks.AppendRow(h.rowChunks.GetRow(rowPtr))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,11 @@
"explain format = 'brief' SELECT a FROM t WHERE b = 2 and c > 0 ORDER BY a limit 1"
],
"Plan": [
"Projection 1.00 root test.t.a",
"└─TopN 1.00 root test.t.a, offset:0, count:1",
" └─IndexReader 1.00 root index:TopN",
" └─TopN 1.00 cop[tikv] test.t.a, offset:0, count:1",
" └─Selection 6.00 cop[tikv] gt(test.t.c, 0)",
" └─IndexRangeScan 6.00 cop[tikv] table:t, index:idx(b, d, a, c) range:[2,2], keep order:false"
"TopN 1.00 root test.t.a, offset:0, count:1",
"└─IndexReader 1.00 root index:TopN",
" └─TopN 1.00 cop[tikv] test.t.a, offset:0, count:1",
" └─Selection 6.00 cop[tikv] gt(test.t.c, 0)",
" └─IndexRangeScan 6.00 cop[tikv] table:t, index:idx(b, d, a, c) range:[2,2], keep order:false"
]
}
]
Expand Down
22 changes: 11 additions & 11 deletions pkg/planner/core/casetest/dag/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
},
{
"SQL": "select c from t where t.c = 1 and t.e = 1 order by t.d limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit)->Limit->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([eq(test.t.e, 1)])->Limit)->Limit"
},
{
"SQL": "select c from t order by t.a limit 1",
"Best": "TableReader(Table(t)->Limit)->Limit->Projection"
"Best": "TableReader(Table(t)->Limit)->Limit"
},
{
"SQL": "select c from t order by t.a + t.b limit 1",
"Best": "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->Projection->TopN([Column#14],0,1)->Projection->Projection"
"Best": "TableReader(Table(t)->TopN([plus(test.t.a, test.t.b)],0,1))->Projection->TopN([Column#14],0,1)->Projection"
},
{
"SQL": "select c from t limit 1",
Expand Down Expand Up @@ -92,11 +92,11 @@
},
{
"SQL": "select c from t where t.c = 1 and t.a > 1 order by t.d limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit)->Limit->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1,1]]->Sel([gt(test.t.a, 1)])->Limit)->Limit"
},
{
"SQL": "select c from t where t.c = 1 and t.d = 1 order by t.a limit 1",
"Best": "IndexReader(Index(t.c_d_e)[[1 1,1 1]]->TopN([test.t.a],0,1))->TopN([test.t.a],0,1)->Projection"
"Best": "IndexReader(Index(t.c_d_e)[[1 1,1 1]]->TopN([test.t.a],0,1))->TopN([test.t.a],0,1)"
},
{
"SQL": "select * from t where t.c = 1 and t.a > 1 order by t.d limit 1",
Expand Down Expand Up @@ -451,7 +451,7 @@
},
{
"SQL": "delete from t where b < 1 order by d limit 1",
"Best": "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Projection->Delete",
"Best": "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Delete",
"Hints": "use_index(@`del_1` `test`.`t` ), no_order_index(@`del_1` `test`.`t` `primary`), limit_to_cop(@`del_1`)"
},
{
Expand Down Expand Up @@ -626,7 +626,7 @@
},
{
"SQL": "select count(*) from t group by g order by g limit 10",
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit->Projection"
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit"
},
{
"SQL": "select count(*) from t group by g limit 10",
Expand All @@ -638,11 +638,11 @@
},
{
"SQL": "select count(*) from t group by g order by g desc limit 1",
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit->Projection"
"Best": "IndexReader(Index(t.g)[[NULL,+inf]]->StreamAgg)->StreamAgg->Limit"
},
{
"SQL": "select count(*) from t group by b order by b limit 10",
"Best": "TableReader(Table(t)->HashAgg)->HashAgg->TopN([test.t.b],0,10)->Projection"
"Best": "TableReader(Table(t)->HashAgg)->HashAgg->TopN([test.t.b],0,10)"
},
{
"SQL": "select count(*) from t group by b order by b",
Expand All @@ -658,11 +658,11 @@
},
{
"SQL": "select /*+ tidb_inlj(a,b) */ sum(a.g), sum(b.g) from t a join t b on a.g = b.g and a.g > 60 group by a.g order by a.g limit 1",
"Best": "IndexJoin{IndexReader(Index(t.g)[(60,+inf]])->IndexReader(Index(t.g)[[NULL,NULL]]->Sel([gt(test.t.g, 60)]))}(test.t.g,test.t.g)->Projection->StreamAgg->Limit->Projection"
"Best": "IndexJoin{IndexReader(Index(t.g)[(60,+inf]])->IndexReader(Index(t.g)[[NULL,NULL]]->Sel([gt(test.t.g, 60)]))}(test.t.g,test.t.g)->Projection->StreamAgg->Limit"
},
{
"SQL": "select sum(a.g), sum(b.g) from t a join t b on a.g = b.g and a.a>5 group by a.g order by a.g limit 1",
"Best": "MergeInnerJoin{IndexReader(Index(t.g)[[NULL,+inf]]->Sel([gt(test.t.a, 5)]))->IndexReader(Index(t.g)[[NULL,+inf]])}(test.t.g,test.t.g)->Projection->StreamAgg->Limit->Projection"
"Best": "MergeInnerJoin{IndexReader(Index(t.g)[[NULL,+inf]]->Sel([gt(test.t.a, 5)]))->IndexReader(Index(t.g)[[NULL,+inf]])}(test.t.g,test.t.g)->Projection->StreamAgg->Limit"
},
{
"SQL": "select sum(d) from t",
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/vectorsearch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
data = glob(["testdata/**"]),
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
"//pkg/config",
"//pkg/domain",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,33 @@
"explain select id, vec_cosine_distance(vec, '[1,1,1]') as d, a, b from t1 order by d limit 10",
"explain select id, a, b, vec_cosine_distance(vec, '[1,1,1]') as d from t1 order by d limit 10"
]
},
{
"name": "TestVectorSearchHeavyFunction",
"cases": [
"explain select id from t1 order by vec_cosine_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_l1_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_l2_distance(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_negative_inner_product(vec, '[1,1,1]') limit 10",
"explain select id from t1 order by vec_dims(vec) limit 10",
"explain select id from t1 order by vec_l2_norm(vec) limit 10",
"explain select id from t1 order by MOD(a, 3) limit 10",

"explain select id, vec_cosine_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_l1_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_l2_distance(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_negative_inner_product(vec, '[1,1,1]') as d from t1 order by d limit 10",
"explain select id, vec_dims(vec) as d from t1 order by d limit 10",
"explain select id, vec_l2_norm(vec) as d from t1 order by d limit 10",
"explain select id, MOD(a, 3) as d from t1 order by d limit 10",

"explain select * from t1 order by vec_cosine_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_l1_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_l2_distance(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_negative_inner_product(vec, '[1,1,1]') limit 10",
"explain select * from t1 order by vec_dims(vec) limit 10",
"explain select * from t1 order by vec_l2_norm(vec) limit 10",
"explain select * from t1 order by MOD(a, 3) limit 10"
]
}
]
Loading

0 comments on commit 911d5a1

Please sign in to comment.