Skip to content

Commit

Permalink
executor, plan: fix a bug about index join.
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Jan 11, 2018
1 parent 75a99b9 commit 41df377
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 41 deletions.
14 changes: 10 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,14 +1127,20 @@ func (b *executorBuilder) buildIndexJoin(v *plan.PhysicalIndexJoin) Executor {
if !v.KeepOrder {
batchSize = b.ctx.GetSessionVars().IndexJoinBatchSize
}
outerConditions := v.LeftConditions
innerConditions := v.RightConditions
if v.OuterIndex == 1 {
outerConditions, innerConditions = innerConditions, outerConditions
}
return &IndexLookUpJoin{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[0])),
innerExec: b.build(v.Children()[1]).(DataReader),
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, b.build(v.Children()[v.OuterIndex])),
innerExec: b.build(v.Children()[1-v.OuterIndex]).(DataReader),
leftIsOuter: v.OuterIndex == 0,
outerJoinKeys: v.OuterJoinKeys,
innerJoinKeys: v.InnerJoinKeys,
outer: v.Outer,
leftConditions: v.LeftConditions,
rightConditions: v.RightConditions,
outerConditions: outerConditions,
innerConditions: innerConditions,
otherConditions: v.OtherConditions,
defaultValues: v.DefaultValues,
batchSize: batchSize,
Expand Down
22 changes: 16 additions & 6 deletions executor/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ type IndexLookUpJoin struct {
innerDatums orderedRows // innerDatums are extracted by innerRows and innerJoinKeys
exhausted bool // exhausted means whether all data has been extracted

leftIsOuter bool
outerJoinKeys []*expression.Column
innerJoinKeys []*expression.Column
leftConditions expression.CNFExprs
rightConditions expression.CNFExprs
outerConditions expression.CNFExprs
innerConditions expression.CNFExprs
otherConditions expression.CNFExprs
defaultValues []types.Datum
outer bool
Expand Down Expand Up @@ -111,7 +112,7 @@ func (e *IndexLookUpJoin) Next() (Row, error) {
e.exhausted = true
break
}
match, err := expression.EvalBool(e.leftConditions, outerRow, e.ctx)
match, err := expression.EvalBool(e.outerConditions, outerRow, e.ctx)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -151,7 +152,11 @@ func (e *IndexLookUpJoin) Next() (Row, error) {
}

func (e *IndexLookUpJoin) fillDefaultValues(row Row) Row {
row = append(row, e.defaultValues...)
if e.leftIsOuter {
row = append(row, e.defaultValues...)
} else {
row = append(e.defaultValues, row...)
}
return row
}

Expand Down Expand Up @@ -182,7 +187,7 @@ func (e *IndexLookUpJoin) doJoin() error {
if innerRow == nil {
break
}
match, err1 := expression.EvalBool(e.rightConditions, innerRow, e.ctx)
match, err1 := expression.EvalBool(e.innerConditions, innerRow, e.ctx)
if err1 != nil {
return errors.Trace(err1)
}
Expand Down Expand Up @@ -237,7 +242,12 @@ func (e *IndexLookUpJoin) doMergeJoin() error {
outerRow := e.outerRows[i].row
for j := innerBeginCursor; j < innerEndCursor; j++ {
innerRow := e.innerRows[j].row
joinedRow := makeJoinRow(outerRow, innerRow)
var joinedRow Row
if e.leftIsOuter {
joinedRow = makeJoinRow(outerRow, innerRow)
} else {
joinedRow = makeJoinRow(innerRow, outerRow)
}
match, err := expression.EvalBool(e.otherConditions, joinedRow, e.ctx)
if err != nil {
return errors.Trace(err)
Expand Down
11 changes: 11 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,14 @@ func (s *testSuite) TestHashJoinExecEncodeDecodeRow(c *C) {
result := tk.MustQuery("select ts from t1 inner join t2 where t2.name = 'xxx'")
result.Check(testkit.Rows("2003-06-09 10:51:26"))
}

// TestIssue5255 checks an index lookup join requested through the
// TIDB_INLJ(t2) hint. t1 has a composite primary key (a, b) and t2 has a
// single-column primary key (a); each table holds one row with a = 1. The
// query must return exactly one joined row, with t1's columns listed before
// t2's column.
func (s *testSuite) TestIssue5255(c *C) {
	tk := testkit.NewTestKit(c, s.store)

	// Schema and data setup, executed strictly in this order.
	setup := []string{
		"use test",
		"drop table if exists t1, t2",
		"create table t1(a int, b date, c float, primary key(a, b))",
		"create table t2(a int primary key)",
		"insert into t1 values(1, '2017-11-29', 2.2)",
		"insert into t2 values(1)",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}

	// Run the query first, then build the expectation, so the calls happen
	// in the same order as in a single chained expression.
	result := tk.MustQuery("select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a=t2.a")
	result.Check(testkit.Rows("1 2017-11-29 2.2 1"))
}
2 changes: 1 addition & 1 deletion plan/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (p *PhysicalApply) ExplainInfo() string {
// ExplainInfo implements PhysicalPlan interface.
func (p *PhysicalIndexJoin) ExplainInfo() string {
buffer := bytes.NewBufferString(fmt.Sprintf("outer:%s",
p.Children()[p.outerIndex].ExplainID()))
p.Children()[p.OuterIndex].ExplainID()))
if len(p.OuterJoinKeys) > 0 {
buffer.WriteString(fmt.Sprintf(", outer key:%s",
expression.ExplainColumnList(p.OuterJoinKeys)))
Expand Down
14 changes: 7 additions & 7 deletions plan/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ func (s *testExplainSuite) TestExplain(c *C) {
{
"select count(b.c2) from t1 a, t2 b where a.c1 = b.c2 group by a.c1",
[]string{
"TableScan_21 cop table:a, range:(-inf,+inf), keep order:false 8000",
"TableReader_22 HashLeftJoin_13 root data:TableScan_21 8000",
"TableScan_16 HashAgg_15 cop table:b, range:(-inf,+inf), keep order:false 8000",
"HashAgg_15 TableScan_16 cop type:complete, group by:b.c2, funcs:count(b.c2), firstrow(b.c2) 6400",
"TableReader_18 HashAgg_17 root data:HashAgg_15 6400",
"HashAgg_17 HashLeftJoin_13 TableReader_18 root type:final, group by:, funcs:count(col_0), firstrow(col_1) 6400",
"HashLeftJoin_13 Projection_9 TableReader_22,HashAgg_17 root inner join, small:HashAgg_17, equal:[eq(a.c1, b.c2)] 8000",
"TableScan_15 cop table:a, range:(-inf,+inf), keep order:false 8000",
"TableReader_16 HashLeftJoin_13 root data:TableScan_15 8000",
"TableScan_18 HashAgg_17 cop table:b, range:(-inf,+inf), keep order:false 8000",
"HashAgg_17 TableScan_18 cop type:complete, group by:b.c2, funcs:count(b.c2), firstrow(b.c2) 6400",
"TableReader_20 HashAgg_19 root data:HashAgg_17 6400",
"HashAgg_19 HashLeftJoin_13 TableReader_20 root type:final, group by:, funcs:count(col_0), firstrow(col_1) 6400",
"HashLeftJoin_13 Projection_9 TableReader_16,HashAgg_19 root inner join, small:HashAgg_19, equal:[eq(a.c1, b.c2)] 8000",
"Projection_9 HashLeftJoin_13 root cast(join_agg_0) 8000",
},
},
Expand Down
2 changes: 0 additions & 2 deletions plan/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package plan

import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
)

const (
Expand Down Expand Up @@ -315,7 +314,6 @@ func (p PhysicalIndexJoin) init(allocator *idAllocator, ctx context.Context, chi
p.basePlan = newBasePlan(TypeIndexJoin, allocator, ctx, &p)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
p.children = children
p.schema = expression.MergeSchema(p.children[0].Schema(), p.children[1].Schema())
return &p
}

Expand Down
23 changes: 8 additions & 15 deletions plan/new_physical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,26 @@ func joinKeysMatchIndex(keys []*expression.Column, index *model.IndexInfo) []int
}

func (p *LogicalJoin) constructIndexJoin(innerJoinKeys, outerJoinKeys []*expression.Column, outerIdx int, innerPlan PhysicalPlan) []PhysicalPlan {
var rightConds, leftConds expression.CNFExprs
if outerIdx == 0 {
rightConds = p.RightConditions.Clone()
leftConds = p.LeftConditions.Clone()
} else {
rightConds = p.LeftConditions.Clone()
leftConds = p.RightConditions.Clone()
}
join := PhysicalIndexJoin{
outerIndex: outerIdx,
LeftConditions: leftConds,
RightConditions: rightConds,
OuterIndex: outerIdx,
LeftConditions: p.LeftConditions,
RightConditions: p.RightConditions,
OtherConditions: p.OtherConditions,
Outer: p.JoinType != InnerJoin,
OuterJoinKeys: outerJoinKeys,
InnerJoinKeys: innerJoinKeys,
DefaultValues: p.DefaultValues,
outerSchema: p.children[outerIdx].Schema(),
innerPlan: innerPlan,
}.init(p.allocator, p.ctx, p.children[outerIdx], p.children[1-outerIdx])
join.SetSchema(expression.MergeSchema(p.children[outerIdx].Schema(), p.children[1-outerIdx].Schema()))
}.init(p.allocator, p.ctx, p.children...)
join.SetSchema(p.schema)
join.profile = p.profile
orderJoin := join.Copy().(*PhysicalIndexJoin)
orderJoin.KeepOrder = true
return []PhysicalPlan{join, orderJoin}
}

// getIndexJoinByOuterIdx will generate index join by outerIndex. OuterIdx points out the outer child,
// getIndexJoinByOuterIdx will generate index join by OuterIndex. OuterIdx points out the outer child,
// because we will swap the children of join when the right child is outer child.
// First of all, we will extract the join keys for p's equal conditions. If the join keys can match some of the indices or PK
// column of inner child, we can apply the index join.
Expand Down Expand Up @@ -202,7 +194,8 @@ func (p *PhysicalIndexJoin) getChildrenPossibleProps(prop *requiredProp) [][]*re
}
}
requiredProps1 := make([]*requiredProp, 2)
requiredProps1[p.outerIndex] = &requiredProp{taskTp: rootTaskType, expectedCnt: prop.expectedCnt, cols: prop.cols, desc: prop.desc}
requiredProps1[p.OuterIndex] = &requiredProp{taskTp: rootTaskType, expectedCnt: prop.expectedCnt, cols: prop.cols, desc: prop.desc}
requiredProps1[1-p.OuterIndex] = &requiredProp{taskTp: rootTaskType, expectedCnt: math.MaxFloat64}
return [][]*requiredProp{requiredProps1}
}

Expand Down
2 changes: 1 addition & 1 deletion plan/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ type PhysicalIndexJoin struct {
LeftConditions expression.CNFExprs
RightConditions expression.CNFExprs
OtherConditions expression.CNFExprs
outerIndex int
OuterIndex int
KeepOrder bool
outerSchema *expression.Schema
innerPlan PhysicalPlan
Expand Down
4 changes: 2 additions & 2 deletions plan/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func (p *PhysicalIndexJoin) ResolveIndices() {
lSchema := p.children[0].Schema()
rSchema := p.children[1].Schema()
for i := range p.InnerJoinKeys {
p.OuterJoinKeys[i].ResolveIndices(lSchema)
p.InnerJoinKeys[i].ResolveIndices(rSchema)
p.OuterJoinKeys[i].ResolveIndices(p.children[p.OuterIndex].Schema())
p.InnerJoinKeys[i].ResolveIndices(p.children[1-p.OuterIndex].Schema())
}
for _, expr := range p.LeftConditions {
expr.ResolveIndices(lSchema)
Expand Down
10 changes: 7 additions & 3 deletions plan/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,16 @@ func (p *PhysicalApply) attach2Task(tasks ...task) task {
}

func (p *PhysicalIndexJoin) attach2Task(tasks ...task) task {
lTask := finishCopTask(tasks[p.outerIndex].copy(), p.ctx, p.allocator)
outerTask := finishCopTask(tasks[p.OuterIndex].copy(), p.ctx, p.allocator)
np := p.Copy()
np.SetChildren(lTask.plan(), p.innerPlan)
if p.OuterIndex == 0 {
np.SetChildren(outerTask.plan(), p.innerPlan)
} else {
np.SetChildren(p.innerPlan, outerTask.plan())
}
return &rootTask{
p: np,
cst: lTask.cost() + p.getCost(lTask.count()),
cst: outerTask.cost() + p.getCost(outerTask.count()),
}
}

Expand Down

1 comment on commit 41df377

@XuHuaiyu
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Please sign in to comment.