Skip to content

Commit

Permalink
executor: support left outer semi join for hash join v2 (#57053)
Browse files Browse the repository at this point in the history
ref #53127
  • Loading branch information
wshwsh12 authored Nov 13, 2024
1 parent 4cca1ff commit 8832684
Show file tree
Hide file tree
Showing 11 changed files with 980 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/executor/join/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"join_row_table.go",
"join_table_meta.go",
"joiner.go",
"left_outer_semi_join_probe.go",
"merge_join.go",
"outer_join_probe.go",
"row_table_builder.go",
Expand Down Expand Up @@ -58,6 +59,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/mvmap",
"//pkg/util/queue",
"//pkg/util/ranger",
"//pkg/util/serialization",
"//pkg/util/sqlkiller",
Expand Down Expand Up @@ -86,6 +88,7 @@ go_test(
"join_table_meta_test.go",
"joiner_test.go",
"left_outer_join_probe_test.go",
"left_outer_semi_join_probe_test.go",
"merge_join_test.go",
"outer_join_spill_test.go",
"right_outer_join_probe_test.go",
Expand Down
5 changes: 5 additions & 0 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType,
return newOuterJoinProbe(base, !rightAsBuildSide, rightAsBuildSide)
case logicalop.RightOuterJoin:
return newOuterJoinProbe(base, rightAsBuildSide, rightAsBuildSide)
case logicalop.LeftOuterSemiJoin:
if rightAsBuildSide {
return newLeftOuterSemiJoinProbe(base)
}
fallthrough
default:
panic("unsupported join type")
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/join/inner_join_probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
}
}
if joinType == logicalop.LeftOuterSemiJoin {
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
}

meta := newTableMeta(buildKeyIndex, buildTypes, buildKeyTypes, probeKeyTypes, buildUsedByOtherCondition, buildUsed, needUsedFlag)
hashJoinCtx := &HashJoinCtxV2{
Expand Down Expand Up @@ -458,6 +461,10 @@ func testJoinProbe(t *testing.T, withSel bool, leftKeyIndex []int, rightKeyIndex
expectedChunks := genRightOuterJoinResult(t, hashJoinCtx.SessCtx, rightFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, rightUsed, otherCondition, resultTypes)
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
case logicalop.LeftOuterSemiJoin:
expectedChunks := genLeftOuterSemiJoinResult(t, hashJoinCtx.SessCtx, leftFilter, leftChunks, rightChunks, leftKeyIndex, rightKeyIndex, leftTypes,
rightTypes, leftKeyTypes, rightKeyTypes, leftUsed, rightUsed, otherCondition, resultTypes)
checkChunksEqual(t, expectedChunks, resultChunks, resultTypes)
default:
require.NoError(t, errors.New("not supported join type"))
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/executor/join/inner_join_spill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ func getReturnTypes(joinType logicalop.JoinType, param spillTestParam) []*types.
resultTypes[len(resultTypes)-1].DelFlag(mysql.NotNullFlag)
}
}
if joinType == logicalop.LeftOuterSemiJoin {
resultTypes = append(resultTypes, types.NewFieldType(mysql.TypeTiny))
}
return resultTypes
}

Expand Down
283 changes: 283 additions & 0 deletions pkg/executor/join/left_outer_semi_join_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package join

import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/queue"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
)

type leftOuterSemiJoinProbe struct {
baseJoinProbe

// isMatchedRows marks whether the left side row is matched
isMatchedRows []bool
// isNullRows marks whether the left side row matched result is null
isNullRows []bool

// buffer isNull for other condition evaluation
isNulls []bool

// used in other condition to record which rows need to be processed
unFinishedProbeRowIdxQueue *queue.Queue[int]
}

var _ ProbeV2 = &leftOuterSemiJoinProbe{}

func newLeftOuterSemiJoinProbe(base baseJoinProbe) *leftOuterSemiJoinProbe {
probe := &leftOuterSemiJoinProbe{
baseJoinProbe: base,
}
if base.ctx.hasOtherCondition() {
probe.unFinishedProbeRowIdxQueue = queue.NewQueue[int](32)
}
return probe
}

func (j *leftOuterSemiJoinProbe) SetChunkForProbe(chunk *chunk.Chunk) (err error) {
err = j.baseJoinProbe.SetChunkForProbe(chunk)
if err != nil {
return err
}
j.resetProbeState()
return nil
}

func (j *leftOuterSemiJoinProbe) SetRestoredChunkForProbe(chunk *chunk.Chunk) (err error) {
err = j.baseJoinProbe.SetRestoredChunkForProbe(chunk)
if err != nil {
return err
}
j.resetProbeState()
return nil
}

func (j *leftOuterSemiJoinProbe) resetProbeState() {
j.isMatchedRows = j.isMatchedRows[:0]
for i := 0; i < j.chunkRows; i++ {
j.isMatchedRows = append(j.isMatchedRows, false)
}
j.isNullRows = j.isNullRows[:0]
for i := 0; i < j.chunkRows; i++ {
j.isNullRows = append(j.isNullRows, false)
}
if j.ctx.hasOtherCondition() {
j.unFinishedProbeRowIdxQueue.Clear()
for i := 0; i < j.chunkRows; i++ {
if j.matchedRowsHeaders[i] != 0 {
j.unFinishedProbeRowIdxQueue.Push(i)
}
}
}
}

func (*leftOuterSemiJoinProbe) NeedScanRowTable() bool {
return false
}

func (*leftOuterSemiJoinProbe) IsScanRowTableDone() bool {
panic("should not reach here")
}

func (*leftOuterSemiJoinProbe) InitForScanRowTable() {
panic("should not reach here")
}

func (*leftOuterSemiJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, _ *sqlkiller.SQLKiller) *hashjoinWorkerResult {
return joinResult
}

func (j *leftOuterSemiJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, _ *hashjoinWorkerResult) {
joinedChk, remainCap, err := j.prepareForProbe(joinResult.chk)
if err != nil {
joinResult.err = err
return false, joinResult
}

if j.ctx.hasOtherCondition() {
err = j.probeWithOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
} else {
err = j.probeWithoutOtherCondition(joinResult.chk, joinedChk, remainCap, sqlKiller)
}
if err != nil {
joinResult.err = err
return false, joinResult
}
return true, joinResult
}

func (j *leftOuterSemiJoinProbe) probeWithOtherCondition(chk, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
if !j.unFinishedProbeRowIdxQueue.IsEmpty() {
err = j.produceResult(joinedChk, sqlKiller)
if err != nil {
return err
}
j.currentProbeRow = 0
}

if j.unFinishedProbeRowIdxQueue.IsEmpty() {
startProbeRow := j.currentProbeRow
j.currentProbeRow = min(startProbeRow+remainCap, j.chunkRows)
j.buildResult(chk, startProbeRow)
}
return
}

func (j *leftOuterSemiJoinProbe) produceResult(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) (err error) {
err = j.concatenateProbeAndBuildRows(joinedChk, sqlKiller)
if err != nil {
return err
}

if joinedChk.NumRows() > 0 {
j.selected, j.isNulls, err = expression.VecEvalBool(j.ctx.SessCtx.GetExprCtx().GetEvalCtx(), j.ctx.SessCtx.GetSessionVars().EnableVectorizedExpression, j.ctx.OtherCondition, joinedChk, j.selected, j.isNulls)
if err != nil {
return err
}

for i := 0; i < joinedChk.NumRows(); i++ {
if j.selected[i] {
j.isMatchedRows[j.rowIndexInfos[i].probeRowIndex] = true
}
if j.isNulls[i] {
j.isNullRows[j.rowIndexInfos[i].probeRowIndex] = true
}
}
}
return nil
}

func (j *leftOuterSemiJoinProbe) probeWithoutOtherCondition(_, joinedChk *chunk.Chunk, remainCap int, sqlKiller *sqlkiller.SQLKiller) (err error) {
meta := j.ctx.hashTableMeta
startProbeRow := j.currentProbeRow
tagHelper := j.ctx.hashTableContext.tagHelper

for remainCap > 0 && j.currentProbeRow < j.chunkRows {
if j.matchedRowsHeaders[j.currentProbeRow] != 0 {
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if !isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
j.probeCollision++
j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow])
continue
}
j.isMatchedRows[j.currentProbeRow] = true
}
j.matchedRowsHeaders[j.currentProbeRow] = 0
remainCap--
j.currentProbeRow++
}

err = checkSQLKiller(sqlKiller, "killedDuringProbe")

if err != nil {
return err
}

j.buildResult(joinedChk, startProbeRow)
return nil
}

func (j *leftOuterSemiJoinProbe) buildResult(chk *chunk.Chunk, startProbeRow int) {
var selected []bool
if startProbeRow == 0 && j.currentProbeRow == j.chunkRows && j.currentChunk.Sel() == nil && chk.NumRows() == 0 && len(j.spilledIdx) == 0 {
// TODO: Can do a shallow copy by directly copying the Column pointers
for index, colIndex := range j.lUsed {
j.currentChunk.Column(colIndex).CopyConstruct(chk.Column(index))
}
} else {
selected = make([]bool, j.chunkRows)
for i := startProbeRow; i < j.currentProbeRow; i++ {
selected[i] = true
}
for _, spilledIdx := range j.spilledIdx {
selected[spilledIdx] = false // ignore spilled rows
}
for index, colIndex := range j.lUsed {
dstCol := chk.Column(index)
srcCol := j.currentChunk.Column(colIndex)
chunk.CopySelectedRowsWithRowIDFunc(dstCol, srcCol, selected, 0, len(selected), func(i int) int {
return j.usedRows[i]
})
}
}

for i := startProbeRow; i < j.currentProbeRow; i++ {
if selected != nil && !selected[i] {
continue
}
if j.isMatchedRows[i] {
chk.AppendInt64(len(j.lUsed), 1)
} else if j.isNullRows[i] {
chk.AppendNull(len(j.lUsed))
} else {
chk.AppendInt64(len(j.lUsed), 0)
}
}
chk.SetNumVirtualRows(chk.NumRows())
}

var maxMatchedRowNum = 4

func (j *leftOuterSemiJoinProbe) matchMultiBuildRows(joinedChk *chunk.Chunk, joinedChkRemainCap *int) {
tagHelper := j.ctx.hashTableContext.tagHelper
meta := j.ctx.hashTableMeta
for j.matchedRowsHeaders[j.currentProbeRow] != 0 && *joinedChkRemainCap > 0 && j.matchedRowsForCurrentProbeRow < maxMatchedRowNum {
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, true)
j.matchedRowsForCurrentProbeRow++
*joinedChkRemainCap--
} else {
j.probeCollision++
}
j.matchedRowsHeaders[j.currentProbeRow] = getNextRowAddress(candidateRow, tagHelper, j.matchedRowsHashValue[j.currentProbeRow])
}

j.finishLookupCurrentProbeRow()
}

func (j *leftOuterSemiJoinProbe) concatenateProbeAndBuildRows(joinedChk *chunk.Chunk, sqlKiller *sqlkiller.SQLKiller) error {
joinedChkRemainCap := joinedChk.Capacity() - joinedChk.NumRows()

for joinedChkRemainCap > 0 && !j.unFinishedProbeRowIdxQueue.IsEmpty() {
probeRowIdx := j.unFinishedProbeRowIdxQueue.Pop()
if j.isMatchedRows[probeRowIdx] {
continue
}
j.currentProbeRow = probeRowIdx
j.matchMultiBuildRows(joinedChk, &joinedChkRemainCap)
if j.matchedRowsHeaders[probeRowIdx] == 0 {
continue
}
j.unFinishedProbeRowIdxQueue.Push(probeRowIdx)
}

err := checkSQLKiller(sqlKiller, "killedDuringProbe")
if err != nil {
return err
}

j.finishCurrentLookupLoop(joinedChk)
return nil
}

func (j *leftOuterSemiJoinProbe) IsCurrentChunkProbeDone() bool {
if j.ctx.hasOtherCondition() && !j.unFinishedProbeRowIdxQueue.IsEmpty() {
return false
}
return j.baseJoinProbe.IsCurrentChunkProbeDone()
}
Loading

0 comments on commit 8832684

Please sign in to comment.