Skip to content

Commit

Permalink
address the comment
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer committed Dec 8, 2021
1 parent c432307 commit 85e1c61
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 39 deletions.
2 changes: 1 addition & 1 deletion planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
assertRuleSteps: []assertTraceStep{
{
assertAction: "join order become (((t1*t2)*(t3*t4))*t5) from origin ((((t1*t2)*t3)*t4)*t5)",
assertReason: "new join order is better than origin join order",
assertReason: "join cost during reorder[[t5, cost:10000],[t1, cost:10000],[t2, cost:10000],[t3, cost:10000],[t4, cost:10000]]",
},
},
},
Expand Down
85 changes: 57 additions & 28 deletions planner/core/rule_join_reorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,21 @@ type jrNode struct {
}

func (s *joinReOrderSolver) optimize(ctx context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
origin := opt.traceJoinReorder(p)
p, err := s.optimizeRecursive(p.SCtx(), p)
now := opt.traceJoinReorder(p)
appendJoinReorderTraceStep(origin, now, opt)
tracer := &joinReorderTrace{cost: map[string]float64{}, opt: opt}
tracer.traceJoinReorder(p)
p, err := s.optimizeRecursive(p.SCtx(), p, tracer)
tracer.traceJoinReorder(p)
appendJoinReorderTraceStep(tracer, p, opt)
return p, err
}

// optimizeRecursive recursively collects join groups and applies join reorder algorithm for each group.
func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalPlan) (LogicalPlan, error) {
func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalPlan, tracer *joinReorderTrace) (LogicalPlan, error) {
var err error
curJoinGroup, eqEdges, otherConds := extractJoinGroup(p)
if len(curJoinGroup) > 1 {
for i := range curJoinGroup {
curJoinGroup[i], err = s.optimizeRecursive(ctx, curJoinGroup[i])
curJoinGroup[i], err = s.optimizeRecursive(ctx, curJoinGroup[i], tracer)
if err != nil {
return nil, err
}
Expand All @@ -88,13 +89,13 @@ func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalP
baseSingleGroupJoinOrderSolver: baseGroupSolver,
eqEdges: eqEdges,
}
p, err = groupSolver.solve(curJoinGroup)
p, err = groupSolver.solve(curJoinGroup, tracer)
} else {
dpSolver := &joinReorderDPSolver{
baseSingleGroupJoinOrderSolver: baseGroupSolver,
}
dpSolver.newJoin = dpSolver.newJoinWithEdges
p, err = dpSolver.solve(curJoinGroup, expression.ScalarFuncs2Exprs(eqEdges))
p, err = dpSolver.solve(curJoinGroup, expression.ScalarFuncs2Exprs(eqEdges), tracer)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -122,7 +123,7 @@ func (s *joinReOrderSolver) optimizeRecursive(ctx sessionctx.Context, p LogicalP
}
newChildren := make([]LogicalPlan, 0, len(p.Children()))
for _, child := range p.Children() {
newChild, err := s.optimizeRecursive(ctx, child)
newChild, err := s.optimizeRecursive(ctx, child, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -203,24 +204,27 @@ func (*joinReOrderSolver) name() string {
return "join_reorder"
}

func appendJoinReorderTraceStep(origin, now *tracing.LogicalPlanTrace, opt *logicalOptimizeOp) {
if origin == nil || now == nil {
func appendJoinReorderTraceStep(tracer *joinReorderTrace, plan LogicalPlan, opt *logicalOptimizeOp) {
if len(tracer.initial) < 1 || len(tracer.final) < 1 {
return
}
foldOriginOrder := joinOrderToString(origin)
foldNowOrder := joinOrderToString(now)
if foldOriginOrder != foldNowOrder {
action := fmt.Sprintf("join order become %v from origin %v", foldNowOrder, foldOriginOrder)
reason := "new join order is better than origin join order"
opt.appendStepToCurrent(now.ID, now.TP, reason, action)
}
}

func (op *logicalOptimizeOp) traceJoinReorder(p LogicalPlan) *tracing.LogicalPlanTrace {
if op.tracer == nil {
return nil
if tracer.final != tracer.initial {
action := fmt.Sprintf("join order become %v from origin %v", tracer.final, tracer.initial)
reason := func() string {
buffer := bytes.NewBufferString("join cost during reorder[")
i := 0
for join, cost := range tracer.cost {
if i > 0 {
buffer.WriteString(",")
}
buffer.WriteString(fmt.Sprintf("[%s, cost:%v]", join, cost))
i++
}
buffer.WriteString("]")
return buffer.String()
}()
opt.appendStepToCurrent(plan.ID(), plan.TP(), reason, action)
}
return extractJoinAndDataSource(p.buildLogicalPlanTrace(p))
}

// joinOrderToString let Join(DataSource, DataSource) become '(t1*t2)'
Expand All @@ -244,7 +248,7 @@ func joinOrderToString(t *tracing.LogicalPlanTrace) string {
// extractJoinAndDataSource will only keep join and dataSource operator and remove other operators.
// For example: Proj->Join->(Proj->DataSource, DataSource) will become Join->(DataSource, DataSource)
func extractJoinAndDataSource(t *tracing.LogicalPlanTrace) *tracing.LogicalPlanTrace {
root := findRootJoin(t)
root := findRoot(t)
if root == nil {
return nil
}
Expand Down Expand Up @@ -275,12 +279,37 @@ func simplify(node *tracing.LogicalPlanTrace) {
}
}

func findRootJoin(t *tracing.LogicalPlanTrace) *tracing.LogicalPlanTrace {
if t.TP == plancodec.TypeJoin {
func findRoot(t *tracing.LogicalPlanTrace) *tracing.LogicalPlanTrace {
if t.TP == plancodec.TypeJoin || t.TP == plancodec.TypeDataSource {
return t
}
if len(t.Children) < 1 {
return nil
}
return findRootJoin(t.Children[0])
return findRoot(t.Children[0])
}

type joinReorderTrace struct {
opt *logicalOptimizeOp
initial string
final string
cost map[string]float64
}

func (t *joinReorderTrace) traceJoinReorder(p LogicalPlan) {
if t == nil || t.opt == nil || t.opt.tracer == nil {
return
}
if len(t.initial) > 0 {
t.final = joinOrderToString(extractJoinAndDataSource(p.buildLogicalPlanTrace(p)))
return
}
t.initial = joinOrderToString(extractJoinAndDataSource(p.buildLogicalPlanTrace(p)))
}

func (t *joinReorderTrace) appendLogicalJoinCost(join LogicalPlan, cost float64) {
if t == nil || t.opt == nil || t.opt.tracer == nil {
return
}
t.cost[joinOrderToString(extractJoinAndDataSource(join.buildLogicalPlanTrace(join)))] = cost
}
12 changes: 8 additions & 4 deletions planner/core/rule_join_reorder_dp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ type joinGroupNonEqEdge struct {
expr expression.Expression
}

func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expression.Expression) (LogicalPlan, error) {
func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expression.Expression, tracer *joinReorderTrace) (LogicalPlan, error) {
for _, node := range joinGroup {
_, err := node.recursiveDeriveStats(nil)
if err != nil {
return nil, err
}
cost := s.baseNodeCumCost(node)
s.curJoinGroup = append(s.curJoinGroup, &jrNode{
p: node,
cumCost: s.baseNodeCumCost(node),
cumCost: cost,
})
tracer.appendLogicalJoinCost(node, cost)
}
adjacents := make([][]int, len(s.curJoinGroup))
totalEqEdges := make([]joinGroupEqEdge, 0, len(eqConds))
Expand Down Expand Up @@ -120,7 +122,7 @@ func (s *joinReorderDPSolver) solve(joinGroup []LogicalPlan, eqConds []expressio
totalNonEqEdges = append(totalNonEqEdges[:i], totalNonEqEdges[i+1:]...)
}
// Do DP on each sub graph.
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEqEdges, subNonEqEdges)
join, err := s.dpGraph(visitID2NodeID, nodeID2VisitID, joinGroup, totalEqEdges, subNonEqEdges, tracer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -159,7 +161,7 @@ func (s *joinReorderDPSolver) bfsGraph(startNode int, visited []bool, adjacents
// It implements the traditional join reorder algorithm: DP by subset using the following formula:
// bestPlan[S:set of node] = the best one among Join(bestPlan[S1:subset of S], bestPlan[S2: S/S1])
func (s *joinReorderDPSolver) dpGraph(visitID2NodeID, nodeID2VisitID []int, joinGroup []LogicalPlan,
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge) (LogicalPlan, error) {
totalEqEdges []joinGroupEqEdge, totalNonEqEdges []joinGroupNonEqEdge, tracer *joinReorderTrace) (LogicalPlan, error) {
nodeCnt := uint(len(visitID2NodeID))
bestPlan := make([]*jrNode, 1<<nodeCnt)
// bestPlan[s] is nil can be treated as bestCost[s] = +inf.
Expand Down Expand Up @@ -197,9 +199,11 @@ func (s *joinReorderDPSolver) dpGraph(visitID2NodeID, nodeID2VisitID []int, join
p: join,
cumCost: curCost,
}
tracer.appendLogicalJoinCost(join, curCost)
} else if bestPlan[nodeBitmap].cumCost > curCost {
bestPlan[nodeBitmap].p = join
bestPlan[nodeBitmap].cumCost = curCost
tracer.appendLogicalJoinCost(join, curCost)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/rule_join_reorder_dp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s *testJoinReorderDPSuite) TestDPReorderTPCHQ5(c *C) {
},
newJoin: s.newMockJoin,
}
result, err := solver.solve(joinGroups, eqConds)
result, err := solver.solve(joinGroups, eqConds, nil)
c.Assert(err, IsNil)
c.Assert(s.planToString(result), Equals, "MockJoin{supplier, MockJoin{lineitem, MockJoin{orders, MockJoin{customer, MockJoin{nation, region}}}}}")
}
Expand All @@ -212,7 +212,7 @@ func (s *testJoinReorderDPSuite) TestDPReorderAllCartesian(c *C) {
},
newJoin: s.newMockJoin,
}
result, err := solver.solve(joinGroup, nil)
result, err := solver.solve(joinGroup, nil, nil)
c.Assert(err, IsNil)
c.Assert(s.planToString(result), Equals, "MockJoin{MockJoin{a, b}, MockJoin{c, d}}")
}
11 changes: 7 additions & 4 deletions planner/core/rule_join_reorder_greedy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,26 @@ type joinReorderGreedySolver struct {
//
// For the nodes and join trees which don't have a join equal condition to
// connect them, we make a bushy join tree to do the cartesian joins finally.
func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan) (LogicalPlan, error) {
func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan, tracer *joinReorderTrace) (LogicalPlan, error) {
for _, node := range joinNodePlans {
_, err := node.recursiveDeriveStats(nil)
if err != nil {
return nil, err
}
cost := s.baseNodeCumCost(node)
s.curJoinGroup = append(s.curJoinGroup, &jrNode{
p: node,
cumCost: s.baseNodeCumCost(node),
cumCost: cost,
})
tracer.appendLogicalJoinCost(node, cost)
}
sort.SliceStable(s.curJoinGroup, func(i, j int) bool {
return s.curJoinGroup[i].cumCost < s.curJoinGroup[j].cumCost
})

var cartesianGroup []LogicalPlan
for len(s.curJoinGroup) > 0 {
newNode, err := s.constructConnectedJoinTree()
newNode, err := s.constructConnectedJoinTree(tracer)
if err != nil {
return nil, err
}
Expand All @@ -68,7 +70,7 @@ func (s *joinReorderGreedySolver) solve(joinNodePlans []LogicalPlan) (LogicalPla
return s.makeBushyJoin(cartesianGroup), nil
}

func (s *joinReorderGreedySolver) constructConnectedJoinTree() (*jrNode, error) {
func (s *joinReorderGreedySolver) constructConnectedJoinTree(tracer *joinReorderTrace) (*jrNode, error) {
curJoinTree := s.curJoinGroup[0]
s.curJoinGroup = s.curJoinGroup[1:]
for {
Expand Down Expand Up @@ -101,6 +103,7 @@ func (s *joinReorderGreedySolver) constructConnectedJoinTree() (*jrNode, error)
p: bestJoin,
cumCost: bestCost,
}
tracer.appendLogicalJoinCost(bestJoin, bestCost)
s.curJoinGroup = append(s.curJoinGroup[:bestIdx], s.curJoinGroup[bestIdx+1:]...)
s.otherConds = finalRemainOthers
}
Expand Down

0 comments on commit 85e1c61

Please sign in to comment.