Skip to content

Commit

Permalink
executor: Use uint32 as element length in hash join v2 (#57458)
Browse files Browse the repository at this point in the history
ref #53127
  • Loading branch information
windtalker authored Nov 19, 2024
1 parent 0e3efb6 commit ac90676
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 37 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,8 +560,8 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols
// not used so don't need to insert into chk, but still need to advance rowData
if meta.columnsSize[columnIndex] < 0 {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
size := *(*uint64)(unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset))
j.cachedBuildRows[index].buildRowOffset += sizeOfLengthField + int(size)
size := *(*uint32)(unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset))
j.cachedBuildRows[index].buildRowOffset += sizeOfElementSize + int(size)
}
} else {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
Expand Down Expand Up @@ -687,7 +687,7 @@ func isKeyMatched(keyMode keyMode, serializedKey []byte, rowStart unsafe.Pointer
case FixedSerializedKey:
return bytes.Equal(serializedKey, hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), meta.joinKeysLength))
case VariableSerializedKey:
return bytes.Equal(serializedKey, hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfLengthField), int(meta.getSerializedKeyLength(rowStart))))
return bytes.Equal(serializedKey, hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfElementSize), int(meta.getSerializedKeyLength(rowStart))))
default:
panic("unknown key match type")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/join_row_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

const sizeOfNextPtr = int(unsafe.Sizeof(uintptr(0)))
const sizeOfLengthField = int(unsafe.Sizeof(uint64(1)))
const sizeOfElementSize = int(unsafe.Sizeof(uint32(1)))
const sizeOfUnsafePointer = int(unsafe.Sizeof(unsafe.Pointer(nil)))
const sizeOfUintptr = int(unsafe.Sizeof(uintptr(0)))

Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/join_row_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestHeapObjectCanMove(t *testing.T) {

func TestFixedOffsetInRowLayout(t *testing.T) {
require.Equal(t, 8, sizeOfNextPtr)
require.Equal(t, 8, sizeOfLengthField)
require.Equal(t, 4, sizeOfElementSize)
}

func TestBitMaskInUint32(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/join/join_table_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ type joinTableMeta struct {
fakeKeyByte []byte
}

func (meta *joinTableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint64 {
return *(*uint64)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength))
func (meta *joinTableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint32 {
return *(*uint32)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength))
}

func (meta *joinTableMeta) isReadNullMapThreadSafe(columnIndex int) bool {
Expand All @@ -85,7 +85,7 @@ func (meta *joinTableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte {
case FixedSerializedKey:
return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), meta.joinKeysLength)
case VariableSerializedKey:
return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfLengthField), int(meta.getSerializedKeyLength(rowStart)))
return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfElementSize), int(meta.getSerializedKeyLength(rowStart)))
default:
panic("unknown key match type")
}
Expand All @@ -94,7 +94,7 @@ func (meta *joinTableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte {
func (meta *joinTableMeta) advanceToRowData(matchedRowInfo *matchedRowInfo) {
if meta.rowDataOffset == -1 {
// variable length, non-inlined key
matchedRowInfo.buildRowOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField + int(meta.getSerializedKeyLength(*(*unsafe.Pointer)(unsafe.Pointer(&matchedRowInfo.buildRowStart))))
matchedRowInfo.buildRowOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfElementSize + int(meta.getSerializedKeyLength(*(*unsafe.Pointer)(unsafe.Pointer(&matchedRowInfo.buildRowStart))))
} else {
matchedRowInfo.buildRowOffset = meta.rowDataOffset
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func newTableMeta(buildKeyIndex []int, buildTypes, buildKeyTypes, probeKeyTypes
if meta.isJoinKeysFixedLength {
meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength
} else {
meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField
meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfElementSize
}
} else {
if meta.isJoinKeysFixedLength {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/join/left_outer_semi_join_probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func TestLeftOuterSemiJoinSpill(t *testing.T) {
// basic case
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{3000000, 1700000, 3500000, 750000, 10000}},
// with other condition
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 4000000, 750000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{3000000, 1700000, 3500000, 750000, 10000}},
}

for _, param := range params {
Expand All @@ -478,7 +478,7 @@ func TestLeftOuterSemiJoinSpill(t *testing.T) {
// basic case with sel
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, nil, nil, nil, []int64{1000000, 900000, 1700000, 100000, 10000}},
// with other condition with sel
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{1000000, 900000, 2000000, 100000, 10000}},
{true, leftKeys, rightKeys, leftTypes, rightTypes, []int{0, 1, 3, 4}, []int{}, otherCondition, []int{1}, []int{3}, []int64{1000000, 900000, 1600000, 100000, 10000}},
}

for _, param := range params2 {
Expand Down
68 changes: 50 additions & 18 deletions pkg/executor/join/row_table_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package join

import (
"errors"
"hash"
"hash/fnv"
"math"
"strconv"
"unsafe"

"github.com/pingcap/tidb/pkg/expression"
Expand Down Expand Up @@ -81,8 +84,31 @@ func (b *rowTableBuilder) initHashValueAndPartIndexForOneChunk(partitionMaskOffs
}
}

func (b *rowTableBuilder) checkMaxElementSize(chk *chunk.Chunk, hashJoinCtx *HashJoinCtxV2) (bool, int) {
// check both join keys and the columns needed to be converted to row format
for _, colIdx := range b.buildKeyIndex {
column := chk.Column(colIdx)
if column.ContainsVeryLargeElement() {
return true, colIdx
}
}
for _, colIdx := range hashJoinCtx.hashTableMeta.rowColumnsOrder {
column := chk.Column(colIdx)
if column.ContainsVeryLargeElement() {
return true, colIdx
}
}
return false, 0
}

func (b *rowTableBuilder) processOneChunk(chk *chunk.Chunk, typeCtx types.Context, hashJoinCtx *HashJoinCtxV2, workerID int) error {
elementSizeExceedLimit, colIdx := b.checkMaxElementSize(chk, hashJoinCtx)
if elementSizeExceedLimit {
// TiDB's max row size is 128MB, so element size should never exceed limit
return errors.New("row table build failed: column contains element larger than 4GB, column index: " + strconv.Itoa(colIdx))
}
b.ResetBuffer(chk)

b.firstSegRowSizeHint = max(uint(1), uint(float64(len(b.usedRows))/float64(hashJoinCtx.partitionNumber)*float64(1.2)))
var err error
if b.hasFilter {
Expand All @@ -102,6 +128,12 @@ func (b *rowTableBuilder) processOneChunk(chk *chunk.Chunk, typeCtx types.Contex
return err
}
}
for _, key := range b.serializedKeyVectorBuffer {
if len(key) > math.MaxUint32 {
// TiDB's max row size is 128MB, so key size should never exceed limit
return errors.New("row table build failed: join key contains element larger than 4GB")
}
}
err = checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild")
if err != nil {
return err
Expand Down Expand Up @@ -251,56 +283,56 @@ func fillNextRowPtr(seg *rowTableSegment) int {
return sizeOfNextPtr
}

func (b *rowTableBuilder) fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta *joinTableMeta, hasValidKey bool, logicalRowIndex int, seg *rowTableSegment) int {
appendRowLength := 0
func (b *rowTableBuilder) fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta *joinTableMeta, hasValidKey bool, logicalRowIndex int, seg *rowTableSegment) int64 {
appendRowLength := int64(0)
// 1. fill key length if needed
if !rowTableMeta.isJoinKeysFixedLength {
// if join_key is not fixed length: `key_length` need to be written in rawData
// even the join keys is inlined, for example if join key is 2 binary string
// then the inlined join key should be: col1_size + col1_data + col2_size + col2_data
// and len(col1_size + col1_data + col2_size + col2_data) need to be written before the inlined join key
length := uint64(0)
length := uint32(0)
if hasValidKey {
length = uint64(len(b.serializedKeyVectorBuffer[logicalRowIndex]))
length = uint32(len(b.serializedKeyVectorBuffer[logicalRowIndex]))
} else {
length = 0
}
seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...)
appendRowLength += sizeOfLengthField
seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfElementSize)...)
appendRowLength += int64(sizeOfElementSize)
}
// 2. fill serialized key if needed
if !rowTableMeta.isJoinKeysInlined {
// if join_key is not inlined: `serialized_key` need to be written in rawData
if hasValidKey {
seg.rawData = append(seg.rawData, b.serializedKeyVectorBuffer[logicalRowIndex]...)
appendRowLength += len(b.serializedKeyVectorBuffer[logicalRowIndex])
appendRowLength += int64(len(b.serializedKeyVectorBuffer[logicalRowIndex]))
} else {
// if there is no valid key, and the key is fixed length, then write a fake key
if rowTableMeta.isJoinKeysFixedLength {
seg.rawData = append(seg.rawData, rowTableMeta.fakeKeyByte...)
appendRowLength += rowTableMeta.joinKeysLength
appendRowLength += int64(rowTableMeta.joinKeysLength)
}
// otherwise don't need to write since length is 0
}
}
return appendRowLength
}

func fillRowData(rowTableMeta *joinTableMeta, row *chunk.Row, seg *rowTableSegment) int {
appendRowLength := 0
func fillRowData(rowTableMeta *joinTableMeta, row *chunk.Row, seg *rowTableSegment) int64 {
appendRowLength := int64(0)
for index, colIdx := range rowTableMeta.rowColumnsOrder {
if rowTableMeta.columnsSize[index] > 0 {
// fixed size
seg.rawData = append(seg.rawData, row.GetRaw(colIdx)...)
appendRowLength += rowTableMeta.columnsSize[index]
appendRowLength += int64(rowTableMeta.columnsSize[index])
} else {
// length, raw_data
raw := row.GetRaw(colIdx)
length := uint64(len(raw))
seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...)
appendRowLength += sizeOfLengthField
length := uint32(len(raw))
seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfElementSize)...)
appendRowLength += int64(sizeOfElementSize)
seg.rawData = append(seg.rawData, raw...)
appendRowLength += int(length)
appendRowLength += int64(length)
}
}
return appendRowLength
Expand Down Expand Up @@ -337,11 +369,11 @@ func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJo
}
seg.hashValues = append(seg.hashValues, b.hashValue[logicalRowIndex])
seg.rowStartOffset = append(seg.rowStartOffset, uint64(len(seg.rawData)))
rowLength := 0
rowLength := int64(0)
// fill next_row_ptr field
rowLength += fillNextRowPtr(seg)
rowLength += int64(fillNextRowPtr(seg))
// fill null_map
rowLength += fillNullMap(rowTableMeta, &row, seg)
rowLength += int64(fillNullMap(rowTableMeta, &row, seg))
// fill serialized key and key length if needed
rowLength += b.fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta, hasValidKey, logicalRowIndex, seg)
// fill row data
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,12 @@ func AppendCellFromRawData(dst *Column, rowData unsafe.Pointer, currentOffset in
dst.data = append(dst.data, hack.GetBytesFromPtr(unsafe.Add(rowData, currentOffset), elemLen)...)
currentOffset += elemLen
} else {
elemLen := *(*uint64)(unsafe.Add(rowData, currentOffset))
elemLen := *(*uint32)(unsafe.Add(rowData, currentOffset))
if elemLen > 0 {
dst.data = append(dst.data, hack.GetBytesFromPtr(unsafe.Add(rowData, currentOffset+8), int(elemLen))...)
dst.data = append(dst.data, hack.GetBytesFromPtr(unsafe.Add(rowData, currentOffset+sizeUint32), int(elemLen))...)
}
dst.offsets = append(dst.offsets, int64(len(dst.data)))
currentOffset += int(elemLen + 8)
currentOffset += int(elemLen + uint32(sizeUint32))
}
dst.length++
return currentOffset
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/chunk/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package chunk

import (
"fmt"
"math"
"math/bits"
"math/rand"
"reflect"
Expand Down Expand Up @@ -383,6 +384,7 @@ func (c *Column) AppendEnum(enum types.Enum) {
const (
sizeInt64 = int(unsafe.Sizeof(int64(0)))
sizeUint64 = int(unsafe.Sizeof(uint64(0)))
sizeUint32 = int(unsafe.Sizeof(uint32(0)))
sizeFloat32 = int(unsafe.Sizeof(float32(0)))
sizeFloat64 = int(unsafe.Sizeof(float64(0)))
sizeMyDecimal = int(unsafe.Sizeof(types.MyDecimal{}))
Expand Down Expand Up @@ -871,3 +873,22 @@ func (c *Column) DestroyDataForTest() {
c.data[i] = byte(rand.Intn(256))
}
}

// ContainsVeryLargeElement checks if the column contains element whose length is greater than math.MaxUint32.
func (c *Column) ContainsVeryLargeElement() bool {
if c.length == 0 {
return false
}
if c.isFixed() {
return false
}
if c.offsets[c.length] <= math.MaxUint32 {
return false
}
for i := range c.length {
if c.offsets[i+1]-c.offsets[i] > math.MaxUint32 {
return true
}
}
return false
}
8 changes: 4 additions & 4 deletions pkg/util/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ func SerializeKeys(typeCtx types.Context, chk *chunk.Chunk, tp *types.FieldType,
continue
}
data := ConvertByCollation(column.GetBytes(physicalRowIndex), tp)
size := uint64(len(data))
size := uint32(len(data))
if serializeMode == KeepVarColumnLength {
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], unsafe.Slice((*byte)(unsafe.Pointer(&size)), sizeUint64)...)
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], unsafe.Slice((*byte)(unsafe.Pointer(&size)), sizeUint32)...)
}
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], data...)
}
Expand Down Expand Up @@ -596,8 +596,8 @@ func SerializeKeys(typeCtx types.Context, chk *chunk.Chunk, tp *types.FieldType,
jsonHashBuffer = jsonHashBuffer[:0]
jsonHashBuffer = column.GetJSON(physicalRowindex).HashValue(jsonHashBuffer)
if serializeMode == KeepVarColumnLength {
size := uint64(len(jsonHashBuffer))
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], unsafe.Slice((*byte)(unsafe.Pointer(&size)), sizeUint64)...)
size := uint32(len(jsonHashBuffer))
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], unsafe.Slice((*byte)(unsafe.Pointer(&size)), sizeUint32)...)
}
serializedKeysVector[logicalRowIndex] = append(serializedKeysVector[logicalRowIndex], jsonHashBuffer...)
}
Expand Down

0 comments on commit ac90676

Please sign in to comment.