Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: move logical join and logical selection to logicalop #55272

Merged
merged 5 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/design/2022-01-04-integer-shard-index.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ The entry point to add the `tidb_shard` expression is the function as bellow. We

func (ds *DataSource) PredicatePushDown(predicates []expression.Expression, opt *logicalOptimizeOp) ([]expression.Expression, LogicalPlan) {
predicates = expression.PropagateConstant(ds.ctx, predicates)
predicates = DeleteTrueExprs(ds, predicates)
predicates = constraint.DeleteTrueExprs(ds, predicates)
// Add tidb_shard() prefix to the condtion for shard index in some scenarios
// TODO: remove it to the place building logical plan
predicates = ds.AddPrefix4ShardIndexes(ds.ctx, predicates)
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ type hashJoinTestCase struct {
concurrency int
ctx sessionctx.Context
keyIdx []int
joinType core.JoinType
joinType logicalop.JoinType
disk bool
useOuterToBuild bool
rawData string
Expand All @@ -607,7 +607,7 @@ func (tc hashJoinTestCase) String() string {
tc.rows, tc.cols, tc.concurrency, tc.keyIdx, tc.disk)
}

func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, useOuterToBuild bool) *hashJoinTestCase {
func defaultHashJoinTestCase(cols []*types.FieldType, joinType logicalop.JoinType, useOuterToBuild bool) *hashJoinTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
Expand All @@ -621,10 +621,10 @@ func defaultHashJoinTestCase(cols []*types.FieldType, joinType core.JoinType, us
return tc
}

func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType core.JoinType) *expression.Schema {
func prepareResolveIndices(joinSchema, lSchema, rSchema *expression.Schema, joinType logicalop.JoinType) *expression.Schema {
colsNeedResolving := joinSchema.Len()
// The last output column of this two join is the generated column to indicate whether the row is matched or not.
if joinType == core.LeftOuterSemiJoin || joinType == core.AntiLeftOuterSemiJoin {
if joinType == logicalop.LeftOuterSemiJoin || joinType == logicalop.AntiLeftOuterSemiJoin {
colsNeedResolving--
}
mergedSchema := expression.MergeSchema(lSchema, rSchema)
Expand Down Expand Up @@ -687,7 +687,7 @@ func prepare4HashJoinV2(testCase *hashJoinTestCase, innerExec, outerExec exec.Ex
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), logicalop.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down Expand Up @@ -776,7 +776,7 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec exec.Exec
// todo: need systematic way to protect.
// physical join should resolveIndices to get right schema column index.
// otherwise, markChildrenUsedColsForTest will fail below.
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), core.InnerJoin)
joinSchema = prepareResolveIndices(joinSchema, innerExec.Schema(), outerExec.Schema(), logicalop.InnerJoin)

joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
Expand Down
20 changes: 10 additions & 10 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,15 +1428,15 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.

defaultValues := v.DefaultValues
if defaultValues == nil {
if v.JoinType == plannercore.RightOuterJoin {
if v.JoinType == logicalop.RightOuterJoin {
defaultValues = make([]types.Datum, leftExec.Schema().Len())
} else {
defaultValues = make([]types.Datum, rightExec.Schema().Len())
}
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}

Expand All @@ -1447,7 +1447,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
Joiner: join.NewJoiner(
b.ctx,
v.JoinType,
v.JoinType == plannercore.RightOuterJoin,
v.JoinType == logicalop.RightOuterJoin,
defaultValues,
v.OtherConditions,
exec.RetTypes(leftExec),
Expand All @@ -1470,7 +1470,7 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) exec.
Filters: v.RightConditions,
}

if v.JoinType == plannercore.RightOuterJoin {
if v.JoinType == logicalop.RightOuterJoin {
e.InnerTable = leftTable
e.OuterTable = rightTable
} else {
Expand Down Expand Up @@ -1603,7 +1603,7 @@ func (b *executorBuilder) buildHashJoinV2(v *plannercore.PhysicalHashJoin) exec.
}

colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
// the matched column is added inside join
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
Expand Down Expand Up @@ -1772,7 +1772,7 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) exec.Ex
}
isNAJoin := len(v.LeftNAJoinKeys) > 0
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down Expand Up @@ -2464,7 +2464,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
OuterExec: outerExec,
OuterFilter: outerFilter,
InnerFilter: innerFilter,
Outer: v.JoinType != plannercore.InnerJoin,
Outer: v.JoinType != logicalop.InnerJoin,
Joiner: tupleJoiner,
OuterSchema: v.OuterSchema,
Sctx: b.ctx,
Expand Down Expand Up @@ -2505,7 +2505,7 @@ func (b *executorBuilder) buildApply(v *plannercore.PhysicalApply) exec.Executor
outerExec: outerExec,
outerFilter: outerFilter,
innerFilter: innerFilters,
outer: v.JoinType != plannercore.InnerJoin,
outer: v.JoinType != logicalop.InnerJoin,
joiners: joiners,
corCols: corCols,
concurrency: v.Concurrency,
Expand Down Expand Up @@ -3294,7 +3294,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plannercore.PhysicalIndexJoin)
Finished: &atomic.Value{},
}
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down Expand Up @@ -3420,7 +3420,7 @@ func (b *executorBuilder) buildIndexLookUpMergeJoin(v *plannercore.PhysicalIndex
LastColHelper: v.CompareFilters,
}
colsFromChildren := v.Schema().Columns
if v.JoinType == plannercore.LeftOuterSemiJoin || v.JoinType == plannercore.AntiLeftOuterSemiJoin {
if v.JoinType == logicalop.LeftOuterSemiJoin || v.JoinType == logicalop.AntiLeftOuterSemiJoin {
colsFromChildren = colsFromChildren[:len(colsFromChildren)-1]
}
childrenUsedSchema := markChildrenUsedCols(colsFromChildren, v.Children()[0].Schema(), v.Children()[1].Schema())
Expand Down
9 changes: 5 additions & 4 deletions pkg/executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -697,8 +698,8 @@ func TestMergeJoinRequiredRows(t *testing.T) {
panic("not support")
}
}
joinTypes := []plannercore.JoinType{plannercore.RightOuterJoin, plannercore.LeftOuterJoin,
plannercore.LeftOuterSemiJoin, plannercore.AntiLeftOuterSemiJoin}
joinTypes := []logicalop.JoinType{logicalop.RightOuterJoin, logicalop.LeftOuterJoin,
logicalop.LeftOuterSemiJoin, logicalop.AntiLeftOuterSemiJoin}
for _, joinType := range joinTypes {
ctx := defaultCtx()
required := make([]int, 100)
Expand All @@ -720,8 +721,8 @@ func TestMergeJoinRequiredRows(t *testing.T) {
}
}

func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, innerSrc, outerSrc exec.Executor) exec.Executor {
if joinType == plannercore.RightOuterJoin {
func buildMergeJoinExec(ctx sessionctx.Context, joinType logicalop.JoinType, innerSrc, outerSrc exec.Executor) exec.Executor {
if joinType == logicalop.RightOuterJoin {
innerSrc, outerSrc = outerSrc, innerSrc
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/core",
"//pkg/planner/core/operator/logicalop",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
Expand Down Expand Up @@ -87,7 +88,7 @@ go_test(
"//pkg/expression",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/planner/core",
"//pkg/planner/core/operator/logicalop",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -498,7 +498,7 @@ func isKeyMatched(keyMode keyMode, serializedKey []byte, rowStart unsafe.Pointer
}

// NewJoinProbe create a join probe used for hash join v2
func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType core.JoinType, keyIndex []int, joinedColumnTypes, probeKeyTypes []*types.FieldType, rightAsBuildSide bool) ProbeV2 {
func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType, keyIndex []int, joinedColumnTypes, probeKeyTypes []*types.FieldType, rightAsBuildSide bool) ProbeV2 {
base := baseJoinProbe{
ctx: ctx,
workID: workID,
Expand Down Expand Up @@ -540,11 +540,11 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType core.JoinType, keyIn
base.rowIndexInfos = make([]*matchedRowInfo, 0, chunk.InitialCapacity)
}
switch joinType {
case core.InnerJoin:
case logicalop.InnerJoin:
return &innerJoinProbe{base}
case core.LeftOuterJoin:
case logicalop.LeftOuterJoin:
return newOuterJoinProbe(base, !rightAsBuildSide, rightAsBuildSide)
case core.RightOuterJoin:
case logicalop.RightOuterJoin:
return newOuterJoinProbe(base, rightAsBuildSide, rightAsBuildSide)
default:
panic("unsupported join type")
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -55,7 +55,7 @@ type hashJoinCtxBase struct {
finished atomic.Bool
IsNullEQ []bool
buildFinished chan error
JoinType plannercore.JoinType
JoinType logicalop.JoinType
IsNullAware bool
memTracker *memory.Tracker // track memory usage.
diskTracker *disk.Tracker // track disk usage.
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/join/hash_join_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/unionexec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
Expand Down Expand Up @@ -216,7 +216,7 @@ func (e *HashJoinV1Exec) fetchAndProbeHashTable(ctx context.Context) {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize(), func() bool {
return e.ProbeSideTupleFetcher.RowContainer.Len() == uint64(0)
}, e.ProbeSideTupleFetcher.JoinType == plannercore.InnerJoin || e.ProbeSideTupleFetcher.JoinType == plannercore.SemiJoin,
}, e.ProbeSideTupleFetcher.JoinType == logicalop.InnerJoin || e.ProbeSideTupleFetcher.JoinType == logicalop.SemiJoin,
false, e.ProbeSideTupleFetcher.IsOuterJoin, &e.ProbeSideTupleFetcher.hashJoinCtxBase)
}, e.ProbeSideTupleFetcher.handleProbeSideFetcherPanic)

Expand Down Expand Up @@ -737,8 +737,8 @@ func (w *ProbeWorkerV1) joinNAASJMatchProbeSideRow2Chunk(probeKey uint64, probeK
// For NA-AntiLeftOuterSemiJoin, we couldn't match null-bucket first, because once y set has a same key x and null
// key, we should return the result as left side row appended with a scalar value 0 which is from same key matching failure.
func (w *ProbeWorkerV1) joinNAAJMatchProbeSideRow2Chunk(probeKey uint64, probeKeyNullBits *bitmap.ConcurrentBitmap, probeSideRow chunk.Row, hCtx *HashContext, joinResult *hashjoinWorkerResult) (bool, int64, *hashjoinWorkerResult) {
naAntiSemiJoin := w.HashJoinCtx.JoinType == plannercore.AntiSemiJoin && w.HashJoinCtx.IsNullAware
naAntiLeftOuterSemiJoin := w.HashJoinCtx.JoinType == plannercore.AntiLeftOuterSemiJoin && w.HashJoinCtx.IsNullAware
naAntiSemiJoin := w.HashJoinCtx.JoinType == logicalop.AntiSemiJoin && w.HashJoinCtx.IsNullAware
naAntiLeftOuterSemiJoin := w.HashJoinCtx.JoinType == logicalop.AntiLeftOuterSemiJoin && w.HashJoinCtx.IsNullAware
if naAntiSemiJoin {
return w.joinNAASJMatchProbeSideRow2Chunk(probeKey, probeKeyNullBits, probeSideRow, hCtx, joinResult)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/channel"
Expand Down Expand Up @@ -335,10 +335,10 @@ func (e *HashJoinV2Exec) Open(ctx context.Context) error {
}

func (fetcher *ProbeSideTupleFetcherV2) shouldLimitProbeFetchSize() bool {
if fetcher.JoinType == plannercore.LeftOuterJoin && fetcher.RightAsBuildSide {
if fetcher.JoinType == logicalop.LeftOuterJoin && fetcher.RightAsBuildSide {
return true
}
if fetcher.JoinType == plannercore.RightOuterJoin && !fetcher.RightAsBuildSide {
if fetcher.JoinType == logicalop.RightOuterJoin && !fetcher.RightAsBuildSide {
return true
}
return false
Expand Down Expand Up @@ -374,13 +374,13 @@ func (w *BuildWorkerV2) splitPartitionAndAppendToRowTable(typeCtx types.Context,

func (e *HashJoinV2Exec) canSkipProbeIfHashTableIsEmpty() bool {
switch e.JoinType {
case plannercore.InnerJoin:
case logicalop.InnerJoin:
return true
case plannercore.LeftOuterJoin:
case logicalop.LeftOuterJoin:
return !e.RightAsBuildSide
case plannercore.RightOuterJoin:
case logicalop.RightOuterJoin:
return e.RightAsBuildSide
case plannercore.SemiJoin:
case logicalop.SemiJoin:
return e.RightAsBuildSide
default:
return false
Expand Down
Loading