Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: classify logical join's logic into a seperate file for later pkg move. #54741

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 2 additions & 100 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2363,104 +2363,6 @@ func preferMppBCJ(p *LogicalJoin) bool {
return checkChildFitBC(p.Children()[0]) || checkChildFitBC(p.Children()[1])
}

// ExhaustPhysicalPlans implements LogicalPlan interface
// it can generates hash join, index join and sort merge join.
// Firstly we check the hint, if hint is figured by user, we force to choose the corresponding physical plan.
// If the hint is not matched, it will get other candidates.
// If the hint is not figured, we will pick all candidates.
func (p *LogicalJoin) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
if val.(bool) && !p.SCtx().GetSessionVars().InRestrictedSQL {
indexJoins, _ := tryToGetIndexJoin(p, prop)
failpoint.Return(indexJoins, true, nil)
}
})

if !isJoinHintSupportedInMPPMode(p.PreferJoinType) {
if hasMPPJoinHints(p.PreferJoinType) {
// If there are MPP hints but has some conflicts join method hints, all the join hints are invalid.
p.SCtx().GetSessionVars().StmtCtx.SetHintWarning("The MPP join hints are in conflict, and you can only specify join method hints that are currently supported by MPP mode now")
p.PreferJoinType = 0
} else {
// If there are no MPP hints but has some conflicts join method hints, the MPP mode will be blocked.
p.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because you have used hint to specify a join algorithm which is not supported by mpp now.")
if prop.IsFlashProp() {
return nil, false, nil
}
}
}
if prop.MPPPartitionTp == property.BroadcastType {
return nil, false, nil
}
joins := make([]base.PhysicalPlan, 0, 8)
canPushToTiFlash := p.CanPushToCop(kv.TiFlash)
if p.SCtx().GetSessionVars().IsMPPAllowed() && canPushToTiFlash {
if (p.PreferJoinType & h.PreferShuffleJoin) > 0 {
if shuffleJoins := tryToGetMppHashJoin(p, prop, false); len(shuffleJoins) > 0 {
return shuffleJoins, true, nil
}
}
if (p.PreferJoinType & h.PreferBCJoin) > 0 {
if bcastJoins := tryToGetMppHashJoin(p, prop, true); len(bcastJoins) > 0 {
return bcastJoins, true, nil
}
}
if preferMppBCJ(p) {
mppJoins := tryToGetMppHashJoin(p, prop, true)
joins = append(joins, mppJoins...)
} else {
mppJoins := tryToGetMppHashJoin(p, prop, false)
joins = append(joins, mppJoins...)
}
} else {
hasMppHints := false
var errMsg string
if (p.PreferJoinType & h.PreferShuffleJoin) > 0 {
errMsg = "The join can not push down to the MPP side, the shuffle_join() hint is invalid"
hasMppHints = true
}
if (p.PreferJoinType & h.PreferBCJoin) > 0 {
errMsg = "The join can not push down to the MPP side, the broadcast_join() hint is invalid"
hasMppHints = true
}
if hasMppHints {
p.SCtx().GetSessionVars().StmtCtx.SetHintWarning(errMsg)
}
}
if prop.IsFlashProp() {
return joins, true, nil
}

if !p.isNAAJ() {
// naaj refuse merge join and index join.
mergeJoins := GetMergeJoin(p, prop, p.Schema(), p.StatsInfo(), p.Children()[0].StatsInfo(), p.Children()[1].StatsInfo())
if (p.PreferJoinType&h.PreferMergeJoin) > 0 && len(mergeJoins) > 0 {
return mergeJoins, true, nil
}
joins = append(joins, mergeJoins...)

indexJoins, forced := tryToGetIndexJoin(p, prop)
if forced {
return indexJoins, true, nil
}
joins = append(joins, indexJoins...)
}

hashJoins, forced := getHashJoins(p, prop)
if forced && len(hashJoins) > 0 {
return hashJoins, true, nil
}
joins = append(joins, hashJoins...)

if p.PreferJoinType > 0 {
// If we reach here, it means we have a hint that doesn't work.
// It might be affected by the required property, so we enforce
// this property and try the hint again.
return joins, false, nil
}
return joins, true, nil
}

func canExprsInJoinPushdown(p *LogicalJoin, storeType kv.StoreType) bool {
equalExprs := make([]expression.Expression, 0, len(p.EqualConditions))
for _, eqCondition := range p.EqualConditions {
Expand Down Expand Up @@ -2558,7 +2460,7 @@ func tryToGetMppHashJoin(p *LogicalJoin, prop *property.PhysicalProperty, useBCJ
preferredBuildIndex = 1
}
} else if p.JoinType.IsSemiJoin() {
if !useBCJ && !p.isNAAJ() && len(p.EqualConditions) > 0 && (p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin) {
if !useBCJ && !p.IsNAAJ() && len(p.EqualConditions) > 0 && (p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin) {
// TiFlash only supports Non-null_aware non-cross semi/anti_semi join to use both sides as build side
preferredBuildIndex = 1
// MPPOuterJoinFixedBuildSide default value is false
Expand All @@ -2577,7 +2479,7 @@ func tryToGetMppHashJoin(p *LogicalJoin, prop *property.PhysicalProperty, useBCJ
// 1. it is a broadcast join(for broadcast join, it makes sense to use the broadcast side as the build side)
// 2. or session variable MPPOuterJoinFixedBuildSide is set to true
// 3. or nullAware/cross joins
if useBCJ || p.isNAAJ() || len(p.EqualConditions) == 0 || p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide {
if useBCJ || p.IsNAAJ() || len(p.EqualConditions) == 0 || p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide {
if !p.SCtx().GetSessionVars().MPPOuterJoinFixedBuildSide {
// The hint has higher priority than variable.
fixedBuildSide = true
Expand Down
22 changes: 0 additions & 22 deletions pkg/planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,28 +880,6 @@ func formatWindowFuncDescs(ctx expression.EvalContext, buffer *bytes.Buffer, des
return buffer
}

// ExplainInfo implements Plan interface.
func (p *LogicalJoin) ExplainInfo() string {
evalCtx := p.SCtx().GetExprCtx().GetEvalCtx()
buffer := bytes.NewBufferString(p.JoinType.String())
if len(p.EqualConditions) > 0 {
fmt.Fprintf(buffer, ", equal:%v", p.EqualConditions)
}
if len(p.LeftConditions) > 0 {
fmt.Fprintf(buffer, ", left cond:%s",
expression.SortedExplainExpressionList(evalCtx, p.LeftConditions))
}
if len(p.RightConditions) > 0 {
fmt.Fprintf(buffer, ", right cond:%s",
expression.SortedExplainExpressionList(evalCtx, p.RightConditions))
}
if len(p.OtherConditions) > 0 {
fmt.Fprintf(buffer, ", other cond:%s",
expression.SortedExplainExpressionList(evalCtx, p.OtherConditions))
}
return buffer.String()
}

// ExplainInfo implements Plan interface.
func (p *LogicalApply) ExplainInfo() string {
return p.LogicalJoin.ExplainInfo()
Expand Down
6 changes: 0 additions & 6 deletions pkg/planner/core/logical_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@ import (
"github.com/pingcap/tidb/pkg/util/plancodec"
)

// Init initializes LogicalJoin.
func (p LogicalJoin) Init(ctx base.PlanContext, offset int) *LogicalJoin {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeJoin, &p, offset)
return &p
}

// Init initializes DataSource.
func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource {
ds.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDataSource, &ds, offset)
Expand Down
Loading