Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: enable inline projection for merge join #15463

Merged
merged 15 commits into from
Apr 9, 2020
101 changes: 58 additions & 43 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ func buildMockDataSourceWithIndex(opt mockDataSourceParameters, index []int) *mo
return buildMockDataSource(opt)
}

// aggTestCase has a fixed schema (aggCol Double, groupBy LongLong).
type aggTestCase struct {
// The test table's schema is fixed (aggCol Double, groupBy LongLong).
execType string // "hash" or "stream"
aggFunc string // sum, avg, count ....
groupByNDV int // the number of distinct group-by keys
Expand Down Expand Up @@ -505,8 +505,8 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
return exec
}

// windowTestCase has a fixed schema (col Double, partitionBy LongLong, rawData VarString(16), col LongLong).
type windowTestCase struct {
// The test table's schema is fixed (col Double, partitionBy LongLong, rawData VarString(16), col LongLong).
windowFunc string
numFunc int // The number of windowFuncs. Default: 1.
frame *core.WindowFrame
Expand Down Expand Up @@ -1341,12 +1341,28 @@ func BenchmarkIndexJoinExec(b *testing.B) {

// mergeJoinTestCase embeds indexJoinTestCase and adds an optional per-child
// column-usage mask, used to benchmark merge join with inline projection.
type mergeJoinTestCase struct {
indexJoinTestCase
// childrenUsedSchema[0] flags the outer-side columns and
// childrenUsedSchema[1] flags the inner-side columns that
// prepare4MergeJoin appends to the join's output schema; the same
// value is handed to newJoiner. When it is nil, every column of
// both children is kept.
childrenUsedSchema [][]bool
}

func prepare4MergeJoin(tc *mergeJoinTestCase, leftExec, rightExec *mockDataSource) *MergeJoinExec {
outerCols, innerCols := tc.columns(), tc.columns()
joinSchema := expression.NewSchema(outerCols...)
joinSchema.Append(innerCols...)

joinSchema := expression.NewSchema()
if tc.childrenUsedSchema != nil {
for i, used := range tc.childrenUsedSchema[0] {
if used {
joinSchema.Append(outerCols[i])
}
}
for i, used := range tc.childrenUsedSchema[1] {
if used {
joinSchema.Append(innerCols[i])
}
}
} else {
joinSchema.Append(outerCols...)
joinSchema.Append(innerCols...)
}

outerJoinKeys := make([]*expression.Column, 0, len(tc.outerJoinKeyIdx))
innerJoinKeys := make([]*expression.Column, 0, len(tc.innerJoinKeyIdx))
Expand All @@ -1370,19 +1386,20 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, leftExec, rightExec *mockDataSourc
stmtCtx: tc.ctx.GetSessionVars().StmtCtx,
baseExecutor: newBaseExecutor(tc.ctx, joinSchema, stringutil.StringerStr("MergeJoin"), leftExec, rightExec),
compareFuncs: compareFuncs,
joiner: newJoiner(
tc.ctx,
0,
false,
defaultValues,
nil,
retTypes(leftExec),
retTypes(rightExec),
nil,
),
isOuterJoin: false,
isOuterJoin: false,
}

e.joiner = newJoiner(
tc.ctx,
0,
false,
defaultValues,
nil,
retTypes(leftExec),
retTypes(rightExec),
tc.childrenUsedSchema,
)

e.innerTable = &mergeJoinTable{
isInner: true,
childIndex: 1,
Expand All @@ -1399,7 +1416,7 @@ func prepare4MergeJoin(tc *mergeJoinTestCase, leftExec, rightExec *mockDataSourc
}

func defaultMergeJoinTestCase() *mergeJoinTestCase {
return &mergeJoinTestCase{*defaultIndexJoinTestCase()}
return &mergeJoinTestCase{*defaultIndexJoinTestCase(), nil}
}

func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc *mergeJoinTestCase, innerDS, outerDS *mockDataSource) {
Expand All @@ -1421,7 +1438,7 @@ func newMergeJoinBenchmark(numOuterRows, numInnerDup, numInnerRedundant int) (tc
innerIdx: []int{0, 1},
rawData: wideString,
}
tc = &mergeJoinTestCase{*itc}
tc = &mergeJoinTestCase{*itc, nil}
outerOpt := mockDataSourceParameters{
schema: expression.NewSchema(tc.columns()...),
rows: numOuterRows,
Expand Down Expand Up @@ -1514,37 +1531,35 @@ func BenchmarkMergeJoinExec(b *testing.B) {

totalRows := 300000

{
numInnerDup := 1
tc, innerDS, outerDS := newMergeJoinBenchmark(totalRows/numInnerDup, numInnerDup, 0)
b.Run(fmt.Sprintf("merge join %v", tc), func(b *testing.B) {
benchmarkMergeJoinExecWithCase(b, tc, outerDS, innerDS, innerMergeJoin)
})
innerDupAndRedundant := [][]int{
{1, 0},
{100, 0},
{10000, 0},
{1, 30000},
}

{
numInnerDup := 100
tc, innerDS, outerDS := newMergeJoinBenchmark(totalRows/numInnerDup, numInnerDup, 0)
b.Run(fmt.Sprintf("merge join %v", tc), func(b *testing.B) {
benchmarkMergeJoinExecWithCase(b, tc, outerDS, innerDS, innerMergeJoin)
})
childrenUsedSchemas := [][][]bool{
nil,
{
{true, false, false},
{false, true, false},
},
}

{
numInnerDup := 10000
tc, innerDS, outerDS := newMergeJoinBenchmark(totalRows/numInnerDup, numInnerDup, 0)
b.Run(fmt.Sprintf("merge join %v", tc), func(b *testing.B) {
benchmarkMergeJoinExecWithCase(b, tc, outerDS, innerDS, innerMergeJoin)
})
}
for _, params := range innerDupAndRedundant {
numInnerDup, numInnerRedundant := params[0], params[1]
for _, childrenUsedSchema := range childrenUsedSchemas {
tc, innerDS, outerDS := newMergeJoinBenchmark(totalRows/numInnerDup, numInnerDup, numInnerRedundant)
inlineProj := false
if childrenUsedSchema != nil {
inlineProj = true
tc.childrenUsedSchema = childrenUsedSchema
}

{
numInnerDup := 1
numInnerRedundant := 30000
tc, innerDS, outerDS := newMergeJoinBenchmark(totalRows/numInnerDup, numInnerDup, numInnerRedundant)
b.Run(fmt.Sprintf("merge join %v", tc), func(b *testing.B) {
benchmarkMergeJoinExecWithCase(b, tc, outerDS, innerDS, innerMergeJoin)
})
b.Run(fmt.Sprintf("merge join %v InlineProj:%v", tc, inlineProj), func(b *testing.B) {
benchmarkMergeJoinExecWithCase(b, tc, outerDS, innerDS, innerMergeJoin)
})
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
v.OtherConditions,
retTypes(leftExec),
retTypes(rightExec),
nil,
markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()),
),
isOuterJoin: v.JoinType.IsOuterJoin(),
desc: v.Desc,
Expand Down
4 changes: 4 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,10 @@ func (mp *mockPlan) GetExecutor() Executor {
return mp.exec
}

func (mp *mockPlan) Schema() *expression.Schema {
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
return mp.exec.Schema()
}

func (s *testExecSuite) TestVecGroupCheckerDATARACE(c *C) {
ctx := mock.NewContext()

Expand Down
30 changes: 14 additions & 16 deletions executor/merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,11 @@ func (s *testSuite2) TestMergeJoin(c *C) {
tk.MustExec("create table s(a int, b int)")
tk.MustExec("insert into s values(1,1)")
tk.MustQuery("explain select /*+ TIDB_SMJ(t, s) */ a in (select a from s where s.b >= t.b) from t").Check(testkit.Rows(
"Projection_7 10000.00 root Column#7",
"└─MergeJoin_8 10000.00 root left outer semi join, other cond:eq(test.t.a, test.s.a), ge(test.s.b, test.t.b)",
" ├─TableReader_12(Build) 10000.00 root data:TableFullScan_11",
" │ └─TableFullScan_11 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
" └─TableReader_10(Probe) 10000.00 root data:TableFullScan_9",
" └─TableFullScan_9 10000.00 cop[tikv] table:t keep order:false, stats:pseudo",
"MergeJoin_8 10000.00 root left outer semi join, other cond:eq(test.t.a, test.s.a), ge(test.s.b, test.t.b)",
"├─TableReader_12(Build) 10000.00 root data:TableFullScan_11",
"│ └─TableFullScan_11 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
"└─TableReader_10(Probe) 10000.00 root data:TableFullScan_9",
" └─TableFullScan_9 10000.00 cop[tikv] table:t keep order:false, stats:pseudo",
))
tk.MustQuery("select /*+ TIDB_SMJ(t, s) */ a in (select a from s where s.b >= t.b) from t").Check(testkit.Rows(
"1",
Expand Down Expand Up @@ -420,16 +419,15 @@ func (s *testSuite2) TestMergeJoin(c *C) {
tk.MustExec("create table s (a int)")
tk.MustExec("insert into s values (4), (1), (3), (2)")
tk.MustQuery("explain select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"Projection_27 12487.50 root test.s.a",
"└─MergeJoin_28 12487.50 root inner join, left key:test.s.a, right key:test.s.a",
" ├─Sort_31(Build) 9990.00 root test.s.a:desc",
" │ └─TableReader_26 9990.00 root data:Selection_25",
" │ └─Selection_25 9990.00 cop[tikv] not(isnull(test.s.a))",
" │ └─TableFullScan_24 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
" └─Sort_29(Probe) 9990.00 root test.s.a:desc",
" └─TableReader_21 9990.00 root data:Selection_20",
" └─Selection_20 9990.00 cop[tikv] not(isnull(test.s.a))",
" └─TableFullScan_19 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
"MergeJoin_28 12487.50 root inner join, left key:test.s.a, right key:test.s.a",
"├─Sort_31(Build) 9990.00 root test.s.a:desc",
"│ └─TableReader_26 9990.00 root data:Selection_25",
"│ └─Selection_25 9990.00 cop[tikv] not(isnull(test.s.a))",
"│ └─TableFullScan_24 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
"└─Sort_29(Probe) 9990.00 root test.s.a:desc",
" └─TableReader_21 9990.00 root data:Selection_20",
" └─Selection_20 9990.00 cop[tikv] not(isnull(test.s.a))",
" └─TableFullScan_19 10000.00 cop[tikv] table:s keep order:false, stats:pseudo",
))
tk.MustQuery("select s1.a1 from (select a as a1 from s order by s.a desc) as s1 join (select a as a2 from s order by s.a desc) as s2 on s1.a1 = s2.a2 order by s1.a1 desc").Check(testkit.Rows(
"4", "3", "2", "1"))
Expand Down
3 changes: 2 additions & 1 deletion planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@
"select 1 from (select /*+ HASH_JOIN(t1) */ t1.a not in (select t2.a from t2) from t1) x;", // TODO: should use hash join
"select /*+ INL_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ INL_HASH_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ INL_MERGE_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;"
"select /*+ INL_MERGE_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ MERGE_JOIN(t1, t2) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;"
]
}
]
13 changes: 13 additions & 0 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,19 @@
"Result": [
"1 1"
]
},
{
"SQL": "select /*+ MERGE_JOIN(t1, t2) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"Plan": [
"HashJoin_9 12500.00 root inner join, equal:[eq(test.t1.a, test.t2.a)]",
"├─TableReader_12(Build) 10000.00 root data:TableFullScan_13",
"│ └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
"└─TableReader_14(Probe) 10000.00 root data:TableFullScan_15",
" └─TableFullScan_15 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo"
],
"Result": [
"1 1"
]
}
]
}
Expand Down
5 changes: 2 additions & 3 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type copTask struct {
indexPlanFinished bool
// keepOrder indicates if the plan scans data by order.
keepOrder bool
// In double read case, it may output one more column for handle(row id).
// We need to prune it, so we add a project do this.
// doubleReadNeedProj means an extra prune is needed because,
// in the double-read case, it may output one more column for the handle (row id).
doubleReadNeedProj bool

extraHandleCol *expression.Column
Expand Down Expand Up @@ -574,7 +574,6 @@ func (p *PhysicalMergeJoin) attach2Task(tasks ...task) task {
lTask := finishCopTask(p.ctx, tasks[0].copy())
rTask := finishCopTask(p.ctx, tasks[1].copy())
p.SetChildren(lTask.plan(), rTask.plan())
p.schema = BuildPhysicalJoinSchema(p.JoinType, p)
SunRunAway marked this conversation as resolved.
Show resolved Hide resolved
return &rootTask{
p: p,
cst: lTask.cost() + rTask.cost() + p.GetCost(lTask.count(), rTask.count()),
Expand Down
3 changes: 2 additions & 1 deletion planner/core/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@
"select 1 from (select /*+ HASH_JOIN(t1) */ t1.a not in (select t2.a from t2) from t1) x;",
"select /*+ INL_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ INL_HASH_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ INL_MERGE_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;"
"select /*+ INL_MERGE_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;",
"select /*+ MERGE_JOIN(t1) */ t1.b, t2.b from t1 inner join t2 on t1.a = t2.a;"
]
},
{
Expand Down
Loading