Skip to content

Commit

Permalink
Fix incorrect results with parquet page filtering
Browse files Browse the repository at this point in the history
Fixes an off-by-one bug in the nested data case, which was
causing failures like "Loaded block positions count (511)
doesn't match lazy block positions count (512)"
This was handled by the changes around `targetRow` calculations.

Fixes another issue around counting of consumed rows during `seek`.
This was causing wrong results when certain blocks are skipped due to lazy loading.
This was handled by the changes in `seek` and `processValues` functions.
  • Loading branch information
raunaqmorarka authored and martint committed Oct 22, 2021
1 parent 22272e2 commit 2e60184
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ private void readValues(BlockBuilder blockBuilder, int valuesToRead, Type type,
readValue(blockBuilder, type);
definitionLevels.add(definitionLevel);
repetitionLevels.add(repetitionLevel);
}, false);
});
}

private long skipValues(long valuesToRead)
private void skipValues(long valuesToRead)
{
return processValues(valuesToRead, this::skipValue, true);
processValues(valuesToRead, this::skipValue);
}

/**
Expand Down Expand Up @@ -273,21 +273,17 @@ private long skipValues(long valuesToRead)
* values (and the related rl and dl) for the rows [20, 39] in the end of the page 0 for col2. Similarly, we have to
* skip values while reading page0 and page1 for col3.
*/
private long processValues(long valuesToRead, Runnable valueReader, boolean consumeSkippedValues)
private void processValues(long valuesToRead, Runnable valueReader)
{
if (definitionLevel == EMPTY_LEVEL_VALUE && repetitionLevel == EMPTY_LEVEL_VALUE) {
definitionLevel = definitionReader.readLevel();
repetitionLevel = repetitionReader.readLevel();
}
long rowCount = 0;
int valueCount = 0;
int skipCount = 0;
for (int i = 0; i < valuesToRead; ) {
boolean consumed;
do {
if (repetitionLevel == 0) {
rowCount++;
}
if (incrementRowAndTestIfTargetReached(repetitionLevel)) {
valueReader.run();
valueCount++;
Expand All @@ -296,13 +292,13 @@ private long processValues(long valuesToRead, Runnable valueReader, boolean cons
else {
skipValue();
skipCount++;
consumed = consumeSkippedValues;
consumed = false;
}

if (valueCount + skipCount == remainingValueCountInPage) {
updateValueCounts(valueCount, skipCount);
if (!readNextPage()) {
return rowCount;
return;
}
valueCount = 0;
skipCount = 0;
Expand All @@ -318,7 +314,6 @@ private long processValues(long valuesToRead, Runnable valueReader, boolean cons
}
}
updateValueCounts(valueCount, skipCount);
return rowCount;
}

private void seek()
Expand All @@ -331,13 +326,8 @@ private void seek()
int valuePosition = 0;
while (valuePosition < readOffset) {
if (page == null) {
readNextPage();
if (indexIterator != null && indexIterator.hasNext()) {
long skipRows = targetRow - currentRow;
while (skipRows > 0) {
skipRows -= skipValues(skipRows);
}
currentRow = targetRow;
if (!readNextPage()) {
break;
}
}
int offset = Math.min(remainingValueCountInPage, readOffset - valuePosition);
Expand Down Expand Up @@ -444,11 +434,12 @@ private boolean incrementRowAndTestIfTargetReached(int repetitionLevel)
if (currentRow > targetRow) {
targetRow = indexIterator.hasNext() ? indexIterator.next() : Long.MAX_VALUE;
}
boolean isAtOrAfterTargetRow = currentRow >= targetRow;
boolean isAtTargetRow = currentRow == targetRow;
currentRow++;
return isAtOrAfterTargetRow;
return isAtTargetRow;
}

return currentRow >= targetRow;
// currentRow was incremented at repetitionLevel 0
return currentRow - 1 == targetRow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.LongStream;

Expand All @@ -67,6 +69,7 @@ public class TestColumnReader
@Test(dataProvider = "testRowRangesProvider")
public void testReadFilteredPage(
ColumnReaderInput columnReaderInput,
BatchSkipper skipper,
Optional<RowRanges> rowRanges,
List<RowRange> pageRowRanges)
{
Expand All @@ -84,13 +87,22 @@ public void testReadFilteredPage(

int readCount = 0;
int batchSize = 1;
Supplier<Boolean> skipFunction = skipper.getFunction();
while (readCount < rowCount) {
reader.prepareNextRead(batchSize);
Block block = reader.readPrimitive(columnReaderInput.getField()).getBlock();
assertThat(block.getPositionCount()).isEqualTo(batchSize);
for (int i = 0; i < block.getPositionCount(); i++) {
valuesRead.add((long) block.getInt(i, 0));
expectedValues.add(rowRangesIterator.next());
if (skipFunction.get()) {
// skip current batch to force a seek on next read
for (int i = 0; i < batchSize; i++) {
rowRangesIterator.next();
}
}
else {
Block block = reader.readPrimitive(columnReaderInput.getField()).getBlock();
assertThat(block.getPositionCount()).isEqualTo(batchSize);
for (int i = 0; i < block.getPositionCount(); i++) {
valuesRead.add((long) block.getInt(i, 0));
expectedValues.add(rowRangesIterator.next());
}
}

readCount += batchSize;
Expand All @@ -104,6 +116,7 @@ public void testReadFilteredPage(
public Object[][] testRowRangesProvider()
{
Object[] columnReaders = ColumnReaderInput.values();
Object[] batchSkippers = BatchSkipper.values();
Object[] rowRanges = new Object[] {
Optional.empty(),
Optional.of(toRowRange(1024)),
Expand All @@ -118,9 +131,10 @@ public Object[][] testRowRangesProvider()
ImmutableList.of(range(0, 255), range(256, 511), range(512, 767), range(768, 1023)),
ImmutableList.of(range(0, 99), range(100, 199), range(200, 399), range(400, 599), range(600, 799), range(800, 999), range(1000, 1023))
};
Object[][] rangesWithNoPageSkipped = cartesianProduct(columnReaders, rowRanges, pageRowRanges);
Object[][] rangesWithNoPageSkipped = cartesianProduct(columnReaders, batchSkippers, rowRanges, pageRowRanges);
Object[][] rangesWithPagesSkipped = cartesianProduct(
columnReaders,
batchSkippers,
new Object[] {Optional.of(toRowRanges(range(56, 80), range(120, 200), range(350, 455), range(600, 940)))},
new Object[] {ImmutableList.of(range(50, 100), range(120, 275), range(290, 455), range(590, 800), range(801, 1000))});
return combine(rangesWithNoPageSkipped, rangesWithPagesSkipped);
Expand Down Expand Up @@ -151,6 +165,38 @@ public PrimitiveField getField()
}
}

private enum BatchSkipper
{
NO_SEEK {
@Override
Supplier<Boolean> getFunction()
{
return () -> false;
}
},
RANDOM_SEEK {
@Override
Supplier<Boolean> getFunction()
{
Random random = new Random(42);
return random::nextBoolean;
}
},
ALTERNATE_SEEK {
@Override
Supplier<Boolean> getFunction()
{
AtomicBoolean last = new AtomicBoolean();
return () -> {
last.set(!last.get());
return last.get();
};
}
};

abstract Supplier<Boolean> getFunction();
}

private static class TestingPageReader
extends PageReader
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public void testPageSkipping(String sortByColumn, String sortByColumnType, Objec
assertThat(assertColumnIndexResults(format("SELECT * FROM %s WHERE %s < %s", tableName, sortByColumn, lowValue))).isGreaterThan(0);
assertThat(assertColumnIndexResults(format("SELECT * FROM %s WHERE %s > %s", tableName, sortByColumn, highValue))).isGreaterThan(0);
assertThat(assertColumnIndexResults(format("SELECT * FROM %s WHERE %s BETWEEN %s AND %s", tableName, sortByColumn, middleLowValue, middleHighValue))).isGreaterThan(0);
// Nested data
assertColumnIndexResults(format("SELECT rvalues FROM %s WHERE %s IN (%s, %s, %s, %s)", tableName, sortByColumn, lowValue, middleLowValue, middleHighValue, highValue));
// Without nested data
assertColumnIndexResults(format("SELECT orderkey, orderdate FROM %s WHERE %s IN (%s, %s, %s, %s)", tableName, sortByColumn, lowValue, middleLowValue, middleHighValue, highValue));
}
assertUpdate("DROP TABLE " + tableName);
}
Expand Down

0 comments on commit 2e60184

Please sign in to comment.