Skip to content

Commit

Permalink
coldata: vary coldata.BatchSize in tests
Browse files Browse the repository at this point in the history
This commit exposes a setter for the private batchSize variable so that it
can be modified in tests, and runs all tests in sql/colexec with a random
batch size in the [3, 4096] range.

The tests in several places needed to be adjusted because their
assumptions might no longer hold when batch size is modified.

Release note: None
  • Loading branch information
yuzefovich committed Jan 14, 2020
1 parent a303753 commit 3d83574
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 39 deletions.
27 changes: 25 additions & 2 deletions pkg/col/coldata/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,39 @@ type Batch interface {

var _ Batch = &MemBatch{}

const maxBatchSize = 1024
const (
// MinBatchSize is the minimum acceptable size of batches; SetBatchSizeForTests
// panics when asked for a smaller size.
MinBatchSize = 3
// MaxBatchSize is the maximum acceptable size of batches; SetBatchSizeForTests
// panics when asked for a larger size.
MaxBatchSize = 4096
)

// batchSize is the value reported by BatchSize. It starts out at 1024 and can
// be overridden through SetBatchSizeForTests.
// TODO(jordan): tune.
var batchSize = uint16(1024)

// BatchSize is the maximum number of tuples that fit in a column batch. It
// returns the current value of the package-level batchSize variable.
func BatchSize() uint16 {
return batchSize
}

// SetBatchSizeForTests overrides the value returned by BatchSize. It panics
// when newBatchSize lies outside of the [MinBatchSize, MaxBatchSize] range, in
// which case batchSize is left untouched. It should only be used in tests.
func SetBatchSizeForTests(newBatchSize uint16) {
	switch {
	case newBatchSize > MaxBatchSize:
		tooLarge := fmt.Sprintf(
			"requested batch size %d is greater than MaxBatchSize %d",
			newBatchSize, MaxBatchSize,
		)
		panic(tooLarge)
	case newBatchSize < MinBatchSize:
		tooSmall := fmt.Sprintf(
			"requested batch size %d is smaller than MinBatchSize %d",
			newBatchSize, MinBatchSize,
		)
		panic(tooSmall)
	}
	// The requested size is within bounds, so it is safe to install.
	batchSize = newBatchSize
}

// NewMemBatch allocates a new in-memory Batch. A coltypes.Unknown type
// will create a placeholder Vec that may not be accessed.
// TODO(jordan): pool these allocations.
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/coldata/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

package coldata

// zeroedNulls is a zeroed out slice representing a bitmap of size maxBatchSize.
// zeroedNulls is a zeroed out slice representing a bitmap of size MaxBatchSize.
// This is copied to efficiently set all nulls.
var zeroedNulls [(maxBatchSize-1)/8 + 1]byte
var zeroedNulls [(MaxBatchSize-1)/8 + 1]byte

// filledNulls is a slice representing a bitmap of size maxBatchSize with every
// filledNulls is a slice representing a bitmap of size MaxBatchSize with every
// single bit set.
var filledNulls [(maxBatchSize-1)/8 + 1]byte
var filledNulls [(MaxBatchSize-1)/8 + 1]byte

// bitMask[i] is a byte with only bit i set, counting from the least
// significant bit (bitMask[0] is 0x1 and bitMask[7] is 0x80).
var bitMask = [8]byte{0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80}
Expand Down
19 changes: 11 additions & 8 deletions pkg/sql/colexec/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestAggregatorOneFunc(t *testing.T) {
{7},
{8},
},
batchSize: 4,
batchSize: 3,
outputBatchSize: 1,
name: "CarryBetweenInputAndOutputBatches",
},
Expand Down Expand Up @@ -476,6 +476,11 @@ func TestAggregatorRandom(t *testing.T) {
rng, _ := randutil.NewPseudoRand()
ctx := context.Background()
for _, groupSize := range []int{1, 2, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} {
if groupSize == 0 {
// We might be varying coldata.BatchSize() so that when it is divided by
// 4, groupSize is 0. We want to skip such configuration.
continue
}
for _, numInputBatches := range []int{1, 2, 64} {
for _, hasNulls := range []bool{true, false} {
for _, agg := range aggTypes {
Expand All @@ -495,7 +500,7 @@ func TestAggregatorRandom(t *testing.T) {
curGroup := -1
for i := range groups {
if i%groupSize == 0 {
expRowCounts = append(expRowCounts, int64(groupSize))
expRowCounts = append(expRowCounts, 0)
expCounts = append(expCounts, 0)
expSums = append(expSums, 0)
expMins = append(expMins, 2048)
Expand All @@ -508,8 +513,11 @@ func TestAggregatorRandom(t *testing.T) {
// slower.
aggCol[i] = 2048 * (rng.Float64() - 0.5)

// NULL values contribute to the row count, so we're updating
// the row counts outside of the if block.
expRowCounts[curGroup]++
if hasNulls && rng.Float64() < nullProbability {
aggColNulls.SetNull(uint16(i))
aggColNulls.SetNull64(uint64(i))
} else {
expNulls[curGroup] = false
expCounts[curGroup]++
Expand Down Expand Up @@ -704,12 +712,7 @@ func BenchmarkAggregator(b *testing.B) {
a.(resetter).reset()
source.reset()
// Exhaust aggregator until all batches have been read.
foundTuples := 0
for b := a.Next(ctx); b.Length() != 0; b = a.Next(ctx) {
foundTuples += int(b.Length())
}
if foundTuples != nTuples/groupSize {
b.Fatalf("Found %d tuples, expected %d", foundTuples, nTuples/groupSize)
}
}
},
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/hashjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,12 @@ func TestHashJoiner(t *testing.T) {
}

for _, outputBatchSize := range []uint16{1, 17, coldata.BatchSize()} {
if outputBatchSize > coldata.BatchSize() {
// It is possible for varied coldata.BatchSize() to be smaller than
// requested outputBatchSize. Such configuration is invalid, and we skip
// it.
continue
}
for _, tc := range tcs {
inputs := []tuples{tc.leftTuples, tc.rightTuples}
typs := [][]coltypes.T{tc.leftTypes, tc.rightTypes}
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/colexec/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ package colexec

import (
"context"
"fmt"
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -43,6 +45,13 @@ func TestMain(m *testing.M) {
testMemAcc = &memAcc
testAllocator = NewAllocator(ctx, testMemAcc)
defer testMemAcc.Close(ctx)
rng, _ := randutil.NewPseudoRand()
// Pick a random batch size in [coldata.MinBatchSize, coldata.MaxBatchSize]
// range.
randomBatchSize := uint16(coldata.MinBatchSize +
rng.Intn(coldata.MaxBatchSize-coldata.MinBatchSize))
fmt.Printf("coldata.BatchSize() is set to %d\n", randomBatchSize)
coldata.SetBatchSizeForTests(randomBatchSize)
return m.Run()
}())
}
19 changes: 17 additions & 2 deletions pkg/sql/colexec/mergejoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1814,10 +1814,25 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
for _, groupSize := range []int{int(coldata.BatchSize()) / 8, int(coldata.BatchSize()) / 4, int(coldata.BatchSize()) / 2} {
if groupSize == 0 {
// We might be varying coldata.BatchSize() so that when it is divided by 8
// or 4, groupSize is 0. We want to skip such configurations.
continue
}
for _, numInputBatches := range []int{1, 2, 16} {
t.Run(fmt.Sprintf("groupSize=%d/numInputBatches=%d", groupSize, numInputBatches),
func(t *testing.T) {
nTuples := int(coldata.BatchSize()) * numInputBatches
// There will be nTuples/groupSize "full" groups - i.e. groups of
// groupSize. Each of these "full" groups will produce groupSize^2
// tuples. The last group might be not full and will consist of
// nTuples % groupSize tuples. That group will produce
// lastGroupSize^2 tuples.
// Note that the math will still be correct in case when nTuples is
// divisible by groupSize - all the groups will be full and "last"
// group will be of size 0.
lastGroupSize := nTuples % groupSize
expCount := nTuples/groupSize*(groupSize*groupSize) + lastGroupSize*lastGroupSize
typs := []coltypes.T{coltypes.Int64, coltypes.Int64}
cols := []coldata.Vec{
testAllocator.NewMemColumn(typs[0], nTuples),
Expand Down Expand Up @@ -1870,9 +1885,9 @@ func TestMergeJoinerMultiBatchRuns(t *testing.T) {
i++
}

if count != groupSize*int(coldata.BatchSize())*numInputBatches {
if count != expCount {
t.Fatalf("found count %d, expected count %d",
count, groupSize*int(coldata.BatchSize())*numInputBatches)
count, expCount)
}
})
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/orderedsynchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ func TestOrderedSyncRandomInput(t *testing.T) {
numInputs := 3
inputLen := 1024
batchSize := uint16(16)
if batchSize > coldata.BatchSize() {
batchSize = coldata.BatchSize()
}

// Generate a random slice of sorted ints.
randInts := make([]int, inputLen)
Expand Down
13 changes: 9 additions & 4 deletions pkg/sql/colexec/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,14 @@ type routerOutput interface {
cancel()
}

// defaultRouterOutputBlockedThreshold is the number of unread values buffered
// by the routerOutputOp after which the output is considered blocked.
var defaultRouterOutputBlockedThreshold = int(coldata.BatchSize() * 2)
// getDefaultRouterOutputBlockedThreshold returns the number of unread values
// buffered by the routerOutputOp after which the output is considered blocked:
// twice the current coldata.BatchSize().
// It is a function rather than a variable so that tests can modify
// coldata.BatchSize() first; a package-level variable would be evaluated
// before the desired batch size is set.
func getDefaultRouterOutputBlockedThreshold() int {
	const batchSizeMultiple = 2
	// Convert to int before multiplying so the product is computed in int
	// rather than in the uint16 returned by coldata.BatchSize().
	return batchSizeMultiple * int(coldata.BatchSize())
}

type routerOutputOp struct {
// input is a reference to our router.
Expand Down Expand Up @@ -90,7 +95,7 @@ func newRouterOutputOp(
allocator *Allocator, types []coltypes.T, unblockedEventsChan chan<- struct{},
) *routerOutputOp {
return newRouterOutputOpWithBlockedThresholdAndBatchSize(
allocator, types, unblockedEventsChan, defaultRouterOutputBlockedThreshold, int(coldata.BatchSize()),
allocator, types, unblockedEventsChan, getDefaultRouterOutputBlockedThreshold(), int(coldata.BatchSize()),
)
}

Expand Down
44 changes: 32 additions & 12 deletions pkg/sql/colexec/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ func TestRouterOutputAddBatch(t *testing.T) {
{
inputBatchSize: coldata.BatchSize(),
outputBatchSize: int(coldata.BatchSize()),
blockedThreshold: defaultRouterOutputBlockedThreshold,
blockedThreshold: getDefaultRouterOutputBlockedThreshold(),
selection: fullSelection,
name: "OneBatch",
},
{
inputBatchSize: coldata.BatchSize(),
outputBatchSize: 4,
blockedThreshold: defaultRouterOutputBlockedThreshold,
blockedThreshold: getDefaultRouterOutputBlockedThreshold(),
selection: fullSelection,
name: "OneBatchGTOutputSize",
},
{
inputBatchSize: 4,
outputBatchSize: int(coldata.BatchSize()),
blockedThreshold: defaultRouterOutputBlockedThreshold,
blockedThreshold: getDefaultRouterOutputBlockedThreshold(),
selection: fullSelection,
name: "MultipleInputBatchesLTOutputSize",
},
{
inputBatchSize: coldata.BatchSize(),
outputBatchSize: int(coldata.BatchSize()),
blockedThreshold: defaultRouterOutputBlockedThreshold,
blockedThreshold: getDefaultRouterOutputBlockedThreshold(),
selection: fullSelection[:len(fullSelection)/4],
name: "QuarterSelection",
},
Expand Down Expand Up @@ -243,13 +243,23 @@ func TestRouterOutputNext(t *testing.T) {
blockThreshold = smallBatchSize / 2
)

// It is possible that coldata.BatchSize is smaller than our smallBatchSize
// which breaks the assumptions of this test. Also, interestingly, if we
// do something like 'if smallBatchSize < coldata.BatchSize()', apparently
// the compiler will produce such code that the 'if' condition will be only
// checked on the first run whereas the condition might no longer hold on
// subsequent runs. That's why we have this "data dependency" on the slice.
if len(fullSelection) < blockThreshold {
return
}

// Use a smaller selection than the batch size; it increases test coverage.
selection := fullSelection[:blockThreshold]

expected := make(tuples, len(data)/(smallBatchSize/blockThreshold))
for i, j := 0, 0; i < len(data) && j < len(expected); i, j = i+smallBatchSize, j+blockThreshold {
for k := 0; k < blockThreshold; k++ {
expected[j+k] = data[i+k]
expected := make(tuples, 0, len(data))
for i := 0; i < len(data); i += smallBatchSize {
for k := 0; k < blockThreshold && i+k < len(data); k++ {
expected = append(expected, data[i+k])
}
}

Expand Down Expand Up @@ -437,14 +447,23 @@ func TestHashRouterComputesDestination(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

data := make(tuples, coldata.BatchSize())
// We have precomputed expectedNumVals only for the default batch size, so we
// will override it if a different value is set.
const expectedBatchSize = 1024
batchSize := coldata.BatchSize()
if batchSize != expectedBatchSize {
coldata.SetBatchSizeForTests(expectedBatchSize)
defer func(batchSize uint16) { coldata.SetBatchSizeForTests(batchSize) }(batchSize)
batchSize = expectedBatchSize
}
data := make(tuples, batchSize)
valsYetToSee := make(map[int64]struct{})
for i := range data {
data[i] = tuple{i}
valsYetToSee[int64(i)] = struct{}{}
}

in := newOpTestInput(coldata.BatchSize(), data, nil /* typs */)
in := newOpTestInput(batchSize, data, nil /* typs */)
in.Init()

var (
Expand All @@ -453,10 +472,11 @@ func TestHashRouterComputesDestination(t *testing.T) {
// runs of tests unless the underlying hash algorithm changes. If it does,
// distributed hash routing will not produce correct results.
expectedNumVals = []int{273, 252, 287, 212}
valsPushed = make([]int, len(expectedNumVals))
numOutputs = 4
valsPushed = make([]int, numOutputs)
)

outputs := make([]routerOutput, len(expectedNumVals))
outputs := make([]routerOutput, numOutputs)
for i := range outputs {
// Capture the index.
outputIdx := i
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colexec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
)

var zeroBoolColumn = make([]bool, coldata.BatchSize())
var zeroBoolColumn = make([]bool, coldata.MaxBatchSize)

var zeroDecimalColumn = make([]apd.Decimal, coldata.BatchSize())
var zeroDecimalColumn = make([]apd.Decimal, coldata.MaxBatchSize)

var zeroInt16Column = make([]int16, coldata.BatchSize())
var zeroInt16Column = make([]int16, coldata.MaxBatchSize)

var zeroInt32Column = make([]int32, coldata.BatchSize())
var zeroInt32Column = make([]int32, coldata.MaxBatchSize)

var zeroInt64Column = make([]int64, coldata.BatchSize())
var zeroInt64Column = make([]int64, coldata.MaxBatchSize)

var zeroFloat64Column = make([]float64, coldata.BatchSize())
var zeroFloat64Column = make([]float64, coldata.MaxBatchSize)
11 changes: 10 additions & 1 deletion pkg/sql/colexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func runTestsWithFn(
rng, _ := randutil.NewPseudoRand()

for _, batchSize := range []uint16{1, uint16(math.Trunc(.002 * float64(coldata.BatchSize()))), uint16(math.Trunc(.003 * float64(coldata.BatchSize()))), uint16(math.Trunc(.016 * float64(coldata.BatchSize()))), coldata.BatchSize()} {
if batchSize == 0 {
// It is possible for batchSize to be 0 here when varying
// coldata.BatchSize(), so we want to skip such configuration.
continue
}
for _, useSel := range []bool{false, true} {
t.Run(fmt.Sprintf("batchSize=%d/sel=%t", batchSize, useSel), func(t *testing.T) {
inputSources := make([]Operator, len(tups))
Expand Down Expand Up @@ -1036,7 +1041,11 @@ func TestRepeatableBatchSourceWithFixedSel(t *testing.T) {
defer leaktest.AfterTest(t)()
batch := testAllocator.NewMemBatch([]coltypes.T{coltypes.Int64})
rng, _ := randutil.NewPseudoRand()
sel := randomSel(rng, 10 /* batchSize */, 0 /* probOfOmitting */)
batchSize := uint16(10)
if batchSize > coldata.BatchSize() {
batchSize = coldata.BatchSize()
}
sel := randomSel(rng, batchSize, 0 /* probOfOmitting */)
batchLen := uint16(len(sel))
batch.SetLength(batchLen)
batch.SetSelection(true)
Expand Down

0 comments on commit 3d83574

Please sign in to comment.