Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move base logical plan to logical operator pkg. #53293

Merged
merged 22 commits into from
May 27, 2024
2 changes: 2 additions & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_library(
"//pkg/planner/core/cost",
"//pkg/planner/core/metrics",
"//pkg/planner/core/operator/baseimpl",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/funcdep",
"//pkg/planner/property",
"//pkg/planner/util",
Expand Down Expand Up @@ -260,6 +261,7 @@ go_test(
"//pkg/parser/terror",
"//pkg/planner",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/coretestsdk",
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/base/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "base",
srcs = [
"doc.go",
"misc_base.go",
"plan_base.go",
"task_base.go",
Expand Down
33 changes: 33 additions & 0 deletions pkg/planner/core/base/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package base

// read me if you want change the content of base interface definition.
// several things you should think twice before you add new method in.
//
// 1: interface should be simplified for abstract logic for most implementors
// if your method is only inherited by few follower, do not use interface.
//
// 2: interface method declared here, meaning additional implementation logic
// should be added in where the inheritors are declared. (pointer receiver
// func can only be declared in where the inheritor defined: same pkg)
//
// 3: interface definition should cover the abstract logic, do not depend on
// concrete implementor type, or relay on other core pkg handling logic.
// otherwise, importing cycle occurs, think about abstraction again.
//
// 4: if additional interface method is decided to added, pls append it to
// function list with order, the later implementors reference can also be
// easy to locate.
3 changes: 3 additions & 0 deletions pkg/planner/core/base/misc_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

// Note: appending the new adding method to the last, for the convenience of easy
// locating in other implementor from other package.

// AccessObject represents what is accessed by an operator.
// It corresponds to the "access object" column in an EXPLAIN statement result.
type AccessObject interface {
Expand Down
14 changes: 10 additions & 4 deletions pkg/planner/core/base/plan_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type PlanContext = context.PlanContext
// BuildPBContext is the context for building `*tipb.Executor`.
type BuildPBContext = context.BuildPBContext

// Note: appending the new adding method to the last, for the convenience of easy
// locating in other implementor from other package.

// Plan is the description of an execution flow.
// It is created from ast.Node first, then optimized by the optimizer,
// finally used by the executor to create a Cursor which executes the statement.
Expand Down Expand Up @@ -217,9 +220,6 @@ type LogicalPlan interface {
// interface definition should depend on concrete implementation type.
PushDownTopN(topN LogicalPlan, opt *optimizetrace.LogicalOptimizeOp) LogicalPlan

// ConvertOuterToInnerJoin converts outer joins if the unmatching rows are filtered.
ConvertOuterToInnerJoin(predicates []expression.Expression) LogicalPlan

// DeriveTopN derives an implicit TopN from a filter on row_number window function...
DeriveTopN(opt *optimizetrace.LogicalOptimizeOp) LogicalPlan

Expand Down Expand Up @@ -263,7 +263,7 @@ type LogicalPlan interface {
// MaxOneRow means whether this operator only returns max one row.
MaxOneRow() bool

// Get all the children.
// Children Get all the children.
Children() []LogicalPlan

// SetChildren sets the children for the plan.
Expand All @@ -280,4 +280,10 @@ type LogicalPlan interface {

// ExtractFD derive the FDSet from the tree bottom up.
ExtractFD() *fd.FDSet

// GetBaseLogicalPlan return the baseLogicalPlan inside each logical plan.
GetBaseLogicalPlan() LogicalPlan

// ConvertOuterToInnerJoin converts outer joins if the matching rows are filtered.
ConvertOuterToInnerJoin(predicates []expression.Expression) LogicalPlan
}
3 changes: 3 additions & 0 deletions pkg/planner/core/base/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package base

// Note: appending the new adding method to the last, for the convenience of easy
// locating in other implementor from other package.

// Task is a new version of `PhysicalPlanInfo`. It stores cost information for a task.
// A task may be CopTask, RootTask, MPPTaskMeta or a ParallelTask.
type Task interface {
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/core_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
func init() {
// For code refactor init.
utilfuncp.AddSelection = addSelection
utilfuncp.FindBestTask = findBestTask
utilfuncp.HasMaxOneRowUtil = HasMaxOneRow
utilfuncp.GetTaskPlanCost = getTaskPlanCost
utilfuncp.CanPushToCopImpl = canPushToCopImpl
utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp
utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

// For mv index init.
cardinality.GetTblInfoForUsedStatsByPhysicalID = getTblInfoForUsedStatsByPhysicalID
Expand Down
84 changes: 37 additions & 47 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/cost"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/fixcontrol"
Expand Down Expand Up @@ -455,7 +456,7 @@ func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int,
chReqProps[1-innerIdx] = &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64, CTEProducerStatus: prop.CTEProducerStatus}
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
chReqProps[1-innerIdx].ExpectedCnt = p.children[1-innerIdx].StatsInfo().RowCount * expCntScale
chReqProps[1-innerIdx].ExpectedCnt = p.Children()[1-innerIdx].StatsInfo().RowCount * expCntScale
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
}
hashJoin := NewPhysicalHashJoin(p, innerIdx, useOuterToBuild, p.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), chReqProps...)
hashJoin.SetSchema(p.schema)
Expand Down Expand Up @@ -498,7 +499,7 @@ func (p *LogicalJoin) constructIndexJoin(
chReqProps[outerIdx] = &property.PhysicalProperty{TaskTp: property.RootTaskType, ExpectedCnt: math.MaxFloat64, SortItems: prop.SortItems, CTEProducerStatus: prop.CTEProducerStatus}
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
chReqProps[outerIdx].ExpectedCnt = p.children[outerIdx].StatsInfo().RowCount * expCntScale
chReqProps[outerIdx].ExpectedCnt = p.Children()[outerIdx].StatsInfo().RowCount * expCntScale
}
newInnerKeys := make([]*expression.Column, 0, len(innerJoinKeys))
newOuterKeys := make([]*expression.Column, 0, len(outerJoinKeys))
Expand Down Expand Up @@ -714,7 +715,7 @@ func (p *LogicalJoin) constructIndexHashJoin(
// Then, we will extract the join keys of p's equal conditions. Then check whether all of them are just the primary key
// or match some part of on index. If so we will choose the best one and construct a index join.
func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, outerIdx int) (joins []base.PhysicalPlan) {
outerChild, innerChild := p.children[outerIdx], p.children[1-outerIdx]
outerChild, innerChild := p.Children()[outerIdx], p.Children()[1-outerIdx]
all, _ := prop.AllSameOrder()
// If the order by columns are not all from outer child, index join cannot promise the order.
if !prop.AllColsFromSchema(outerChild.Schema()) || !all {
Expand Down Expand Up @@ -2318,17 +2319,17 @@ func calcHashExchangeSizeByChild(p1 base.Plan, p2 base.Plan, mppStoreCnt int) (r
// Set a scale factor (`mppStoreCnt^*`) when estimating broadcast join in `isJoinFitMPPBCJ` and `isJoinChildFitMPPBCJ` (based on TPCH benchmark, it has been verified in Q9).

func isJoinFitMPPBCJ(p *LogicalJoin, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)
if hasSizeBC && hasSizeHash {
return szBC*float64(mppStoreCnt) <= szHash
}
return rowBC*float64(mppStoreCnt) <= rowHash
}

func isJoinChildFitMPPBCJ(p *LogicalJoin, childIndexToBC int, mppStoreCnt int) bool {
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.children[childIndexToBC], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.children[0], p.children[1], mppStoreCnt)
rowBC, szBC, hasSizeBC := calcBroadcastExchangeSize(p.Children()[childIndexToBC], mppStoreCnt)
rowHash, szHash, hasSizeHash := calcHashExchangeSizeByChild(p.Children()[0], p.Children()[1], mppStoreCnt)

if hasSizeBC && hasSizeHash {
return szBC*float64(mppStoreCnt) <= szHash
Expand Down Expand Up @@ -2368,11 +2369,11 @@ func (p *LogicalJoin) preferMppBCJ() bool {
}

if onlyCheckChild1 {
return checkChildFitBC(p.children[1])
return checkChildFitBC(p.Children()[1])
} else if onlyCheckChild0 {
return checkChildFitBC(p.children[0])
return checkChildFitBC(p.Children()[0])
}
return checkChildFitBC(p.children[0]) || checkChildFitBC(p.children[1])
return checkChildFitBC(p.Children()[0]) || checkChildFitBC(p.Children()[1])
}

// ExhaustPhysicalPlans implements LogicalPlan interface
Expand Down Expand Up @@ -2445,7 +2446,7 @@ func (p *LogicalJoin) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]b

if !p.isNAAJ() {
// naaj refuse merge join and index join.
mergeJoins := p.GetMergeJoin(prop, p.schema, p.StatsInfo(), p.children[0].StatsInfo(), p.children[1].StatsInfo())
mergeJoins := p.GetMergeJoin(prop, p.schema, p.StatsInfo(), p.Children()[0].StatsInfo(), p.Children()[1].StatsInfo())
if (p.preferJoinType&h.PreferMergeJoin) > 0 && len(mergeJoins) > 0 {
return mergeJoins, true, nil
}
Expand Down Expand Up @@ -2566,7 +2567,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
preferredBuildIndex := 0
fixedBuildSide := false // Used to indicate whether the build side for the MPP join is fixed or not.
if p.JoinType == InnerJoin {
if p.children[0].StatsInfo().Count() > p.children[1].StatsInfo().Count() {
if p.Children()[0].StatsInfo().Count() > p.Children()[1].StatsInfo().Count() {
preferredBuildIndex = 1
}
} else if p.JoinType.IsSemiJoin() {
Expand All @@ -2575,7 +2576,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
preferredBuildIndex = 1
// MPPOuterJoinFixedBuildSide default value is false
// use MPPOuterJoinFixedBuildSide here as a way to disable using left table as build side
if !p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide && p.children[1].StatsInfo().Count() > p.children[0].StatsInfo().Count() {
if !p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide && p.Children()[1].StatsInfo().Count() > p.Children()[0].StatsInfo().Count() {
preferredBuildIndex = 0
}
} else {
Expand All @@ -2597,7 +2598,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
if p.JoinType == LeftOuterJoin {
preferredBuildIndex = 1
}
} else if p.children[0].StatsInfo().Count() > p.children[1].StatsInfo().Count() {
} else if p.Children()[0].StatsInfo().Count() > p.Children()[1].StatsInfo().Count() {
preferredBuildIndex = 1
}
}
Expand Down Expand Up @@ -2630,7 +2631,7 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
expCnt := math.MaxFloat64
if prop.ExpectedCnt < p.StatsInfo().RowCount {
expCntScale := prop.ExpectedCnt / p.StatsInfo().RowCount
expCnt = p.children[1-preferredBuildIndex].StatsInfo().RowCount * expCntScale
expCnt = p.Children()[1-preferredBuildIndex].StatsInfo().RowCount * expCntScale
}
if prop.MPPPartitionTp == property.HashType {
lPartitionKeys, rPartitionKeys := p.GetPotentialPartitionKeys()
Expand Down Expand Up @@ -2887,7 +2888,7 @@ func (la *LogicalApply) GetHashJoin(prop *property.PhysicalProperty) *PhysicalHa

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
if !prop.AllColsFromSchema(la.children[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
if !prop.AllColsFromSchema(la.Children()[0].Schema()) || prop.IsFlashProp() { // for convenient, we don't pass through any prop
la.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced(
"MPP mode may be blocked because operator `Apply` is not supported now.")
return nil, true, nil
Expand All @@ -2896,7 +2897,7 @@ func (la *LogicalApply) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([
la.SCtx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("Parallel Apply rejects the possible order properties of its outer child currently"))
return nil, true, nil
}
disableAggPushDownToCop(la.children[0])
disableAggPushDownToCop(la.Children()[0])
join := la.GetHashJoin(prop)
var columns = make([]*expression.Column, 0, len(la.CorCols))
for _, colColumn := range la.CorCols {
Expand Down Expand Up @@ -3097,21 +3098,10 @@ func (lw *LogicalWindow) ExhaustPhysicalPlans(prop *property.PhysicalProperty) (
return windows, true, nil
}

// ExhaustPhysicalPlans is only for implementing interface. DataSource and Dual generate task in `findBestTask` directly.
func (*baseLogicalPlan) ExhaustPhysicalPlans(*property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
panic("baseLogicalPlan.ExhaustPhysicalPlans() should never be called.")
}

// CanPushToCop checks if it can be pushed to some stores. For TiKV, it only checks datasource.
// For TiFlash, it will check whether the operator is supported, but note that the check might be inaccrate.
func (p *baseLogicalPlan) CanPushToCop(storeTp kv.StoreType) bool {
return canPushToCopImpl(p, storeTp, false)
}

// todo: move canPushToCopImpl to func_pointer_misc when move baseLogicalPlan out of core.
func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool {
p := lp.GetBaseLogicalPlan().(*logicalop.BaseLogicalPlan)
ret := true
for _, ch := range p.children {
for _, ch := range p.Children() {
switch c := ch.(type) {
case *DataSource:
validDs := false
Expand All @@ -3126,8 +3116,8 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo
}
ret = ret && validDs

_, isTopN := p.self.(*LogicalTopN)
_, isLimit := p.self.(*LogicalLimit)
_, isTopN := p.Self().(*LogicalTopN)
_, isLimit := p.Self().(*LogicalLimit)
if (isTopN || isLimit) && indexMergeIsIntersection {
return false // TopN and Limit cannot be pushed down to the intersection type IndexMerge
}
Expand All @@ -3143,23 +3133,23 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, true)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalSort:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, true)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, true)
case *LogicalProjection:
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, considerDual)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalExpand:
// Expand itself only contains simple col ref and literal projection. (always ok, check its child)
if storeTp != kv.TiFlash {
return false
}
ret = ret && canPushToCopImpl(&c.baseLogicalPlan, storeTp, considerDual)
ret = ret && canPushToCopImpl(&c.BaseLogicalPlan, storeTp, considerDual)
case *LogicalTableDual:
return storeTp == kv.TiFlash && considerDual
case *LogicalAggregation, *LogicalSelection, *LogicalJoin, *LogicalWindow:
Expand Down Expand Up @@ -3191,7 +3181,7 @@ func canPushToCopImpl(p *baseLogicalPlan, storeTp kv.StoreType, considerDual boo

// CanPushToCop implements LogicalPlan interface.
func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool {
return la.baseLogicalPlan.CanPushToCop(storeTp) && !la.noCopPushDown
return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.noCopPushDown
}

func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []base.PhysicalPlan {
Expand Down Expand Up @@ -3691,9 +3681,9 @@ func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
if prop.TaskTp == property.MppTaskType && prop.MPPPartitionTp != property.AnyType {
return nil, true, nil
}
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.baseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
canUseMpp := p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToCopImpl(&p.BaseLogicalPlan, kv.TiFlash, true)
chReqProps := make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
if canUseMpp && prop.TaskTp == property.MppTaskType {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
Expand All @@ -3710,8 +3700,8 @@ func (p *LogicalUnionAll) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
}.Init(p.SCtx(), p.StatsInfo().ScaleByExpectCnt(prop.ExpectedCnt), p.QueryBlockOffset(), chReqProps...)
ua.SetSchema(p.Schema())
if canUseMpp && prop.TaskTp == property.RootTaskType {
chReqProps = make([]*property.PhysicalProperty, 0, len(p.children))
for range p.children {
chReqProps = make([]*property.PhysicalProperty, 0, p.ChildLen())
for range p.Children() {
chReqProps = append(chReqProps, &property.PhysicalProperty{
ExpectedCnt: prop.ExpectedCnt,
TaskTp: property.MppTaskType,
Expand Down Expand Up @@ -3768,7 +3758,7 @@ func (ls *LogicalSort) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
return ret, true, nil
}
} else if prop.TaskTp == property.MppTaskType && prop.RejectSort {
if canPushToCopImpl(&ls.baseLogicalPlan, kv.TiFlash, true) {
if canPushToCopImpl(&ls.BaseLogicalPlan, kv.TiFlash, true) {
newProp := prop.CloneEssentialFields()
newProp.RejectSort = true
ps := NominalSort{OnlyColumn: true, ByItems: ls.ByItems}.Init(
Expand Down Expand Up @@ -3824,13 +3814,13 @@ func (p *LogicalSequence) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
}
seqs := make([]base.PhysicalPlan, 0, 2)
for _, propChoice := range possibleChildrenProps {
childReqs := make([]*property.PhysicalProperty, 0, len(p.children))
for i := 0; i < len(p.children)-1; i++ {
childReqs := make([]*property.PhysicalProperty, 0, p.ChildLen())
for i := 0; i < p.ChildLen()-1; i++ {
childReqs = append(childReqs, propChoice[0].CloneEssentialFields())
}
childReqs = append(childReqs, propChoice[1])
seq := PhysicalSequence{}.Init(p.SCtx(), p.StatsInfo(), p.QueryBlockOffset(), childReqs...)
seq.SetSchema(p.children[len(p.children)-1].Schema())
seq.SetSchema(p.Children()[p.ChildLen()-1].Schema())
seqs = append(seqs, seq)
}
return seqs, true, nil
Expand Down
Loading