Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cases of filter-only column scans A filtered column may or may not #2807

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions velox/dwio/common/tests/E2EFilterTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,22 @@ void E2EFilterTestBase::readWithFilter(
// Outside of timed section.
for (int32_t i = 0; i < batch->size(); ++i) {
uint32_t hit = hitRows[rowIndex++];
ASSERT_TRUE(batch->equalValueAt(
batches[batchNumber(hit)].get(), i, batchRow(hit)))
<< "Content mismatch at " << rowIndex - 1 << ": expected: "
<< batches[batchNumber(hit)]->toString(batchRow(hit))
<< " actual: " << batch->toString(i);
auto expectedBatch = batches[batchNumber(hit)].get();
auto expectedRow = batchRow(hit);
// We compare column by column, skipping over filter-only columns.
for (auto childIndex = 0; childIndex < spec->children().size();
++childIndex) {
if (!spec->children()[childIndex]->keepValues()) {
continue;
}
auto column = spec->children()[childIndex]->channel();
auto result = batch->asUnchecked<RowVector>()->childAt(column);
auto expectedColumn = expectedBatch->childAt(column).get();
ASSERT_TRUE(result->equalValueAt(expectedColumn, i, expectedRow))
<< "Content mismatch at " << rowIndex - 1 << " column " << column
<< ": expected: " << expectedColumn->toString(expectedRow)
<< " actual: " << result->toString(i);
}
}
// Check no overwrites after all LazyVectors are loaded.
ownershipChecker.check(batch);
Expand Down Expand Up @@ -313,6 +324,7 @@ void E2EFilterTestBase::testFilterSpecs(
auto filters =
filterGenerator->makeSubfieldFilters(filterSpecs, batches_, hitRows);
auto spec = filterGenerator->makeScanSpec(std::move(filters));
unprojectSomeFilters(*spec);
uint64_t timeWithFilter = 0;
readWithFilter(spec, batches_, hitRows, timeWithFilter, false);

Expand All @@ -331,6 +343,17 @@ void E2EFilterTestBase::testFilterSpecs(
readWithFilter(spec, batches_, hitRows, timeWithFilter, true);
}

void E2EFilterTestBase::unprojectSomeFilters(ScanSpec& spec) {
  // Randomly converts filtered columns of 'spec' into filter-only columns.
  // Every child that carries a filter is picked independently with a 1 in 5
  // chance. Children without a filter are skipped before the random draw, so
  // they do not consume a value from the generator.
  for (auto& column : spec.children()) {
    if (!column->filter()) {
      continue;
    }
    const auto draw = folly::Random::rand32(filterGenerator->rng());
    if (draw % 5 != 0) {
      continue;
    }
    // Filter-only: turn off both projection and value extraction.
    column->setProjectOut(false);
    column->setExtractValues(false);
  }
}

void E2EFilterTestBase::testRowGroupSkip(
const std::vector<std::string>& filterable) {
std::vector<FilterSpec> specs;
Expand Down Expand Up @@ -399,6 +422,19 @@ void OwnershipChecker::check(const VectorPtr& batch) {
if (batchCounter_ > 11) {
return;
}
// We fill filter-only columns with nulls to make the RowVector well formed
// for copy.
auto rowVector = batch->as<RowVector>();
// Columns corresponding to filter-only access will not be filled in and have
// a zero-length or null child vector. Fill these in with nulls for the size
// of the batch to make the batch well formed for copy.
for (auto i = 0; i < rowVector->childrenSize(); ++i) {
auto& child = rowVector->children()[i];
if (!child || child->size() != batch->size()) {
child = BaseVector::createNullConstant(
batch->type()->childAt(i), batch->size(), batch->pool());
}
}
if (batchCounter_ % 2 == 0) {
previousBatch_ = std::make_shared<RowVector>(
batch->pool(),
Expand All @@ -411,8 +447,8 @@ void OwnershipChecker::check(const VectorPtr& batch) {
if (batchCounter_ % 2 == 1) {
for (auto i = 0; i < previousBatch_->size(); ++i) {
ASSERT_TRUE(previousBatch_->equalValueAt(previousBatchCopy_.get(), i, i))
<< "Retained reference of a batch has been overwritten by the next "
<< "index " << i << " batch " << previousBatch_->toString(i)
<< "Retained reference of a batch has been overwritten by the next: "
<< " index " << i << " batch " << previousBatch_->toString(i)
<< " original " << previousBatchCopy_->toString(i);
}
}
Expand Down
4 changes: 4 additions & 0 deletions velox/dwio/common/tests/E2EFilterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ class E2EFilterTestBase : public testing::Test {
bool useValueHook,
bool skipCheck = false);

// Randomly turns filtered columns of 'spec' into filter-only columns: each
// child that has a filter is chosen independently with a 1 in 5 chance. A
// chosen column is still filtered on, but no passing values are retained
// for it.
void unprojectSomeFilters(ScanSpec& spec);

template <TypeKind Kind>
bool checkLoadWithHook(
RowVector* batch,
Expand Down
2 changes: 0 additions & 2 deletions velox/dwio/dwrf/common/RLEv1.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,6 @@ uint64_t RleEncoderV1<isSigned>::addImpl(
return count;
}

struct DropValues;

template <bool isSigned>
class RleDecoderV1 : public dwio::common::IntDecoder<isSigned> {
public:
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveByteRleColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,12 @@ void SelectiveByteRleColumnReader::processFilter(
break;
case FilterKind::kIsNull:
filterNulls<int8_t>(
rows, true, !std::is_same_v<decltype(extractValues), DropValues>);
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
break;
case FilterKind::kIsNotNull:
if (std::is_same_v<decltype(extractValues), DropValues>) {
if (std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<int8_t>(rows, false, false);
} else {
readHelper<common::IsNotNull, isDense>(filter, rows, extractValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ void SelectiveStringDictionaryColumnReader::processFilter(
break;
case common::FilterKind::kIsNull:
filterNulls<int32_t>(
rows, true, !std::is_same_v<decltype(extractValues), DropValues>);
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
break;
case common::FilterKind::kIsNotNull:
if (std::is_same_v<decltype(extractValues), DropValues>) {
if (std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<int32_t>(rows, false, false);
} else {
readHelper<common::IsNotNull, isDense>(filter, rows, extractValues);
Expand Down
6 changes: 4 additions & 2 deletions velox/dwio/dwrf/reader/SelectiveStringDirectColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,12 @@ void SelectiveStringDirectColumnReader::processFilter(
break;
case common::FilterKind::kIsNull:
filterNulls<StringView>(
rows, true, !std::is_same_v<decltype(extractValues), DropValues>);
rows,
true,
!std::is_same_v<decltype(extractValues), dwio::common::DropValues>);
break;
case common::FilterKind::kIsNotNull:
if (std::is_same_v<decltype(extractValues), DropValues>) {
if (std::is_same_v<decltype(extractValues), dwio::common::DropValues>) {
filterNulls<StringView>(rows, false, false);
} else {
readHelper<common::IsNotNull, isDense>(filter, rows, extractValues);
Expand Down
22 changes: 18 additions & 4 deletions velox/dwio/parquet/reader/PageReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,19 @@ class PageReader {
}
}

// Returns the number of passed rows/values gathered so far by 'reader'.
// Which counter is consulted depends on the scan mode selected by
// 'hasFilter':
//   - hasFilter == true:  reader.numRows(). Only numRows() is set for a
//     filter-only case.
//   - hasFilter == false: reader.numValues(). Only numValues() is set for a
//     non-filtered case.
// 'hasFilter' is a compile-time constant, so the branch is resolved with
// 'if constexpr' and leaves no runtime test behind.
template <bool hasFilter>
static int32_t numRowsInReader(
    const dwio::common::SelectiveColumnReader& reader) {
  if constexpr (hasFilter) {
    return reader.numRows();
  } else {
    return reader.numValues();
  }
}

memory::MemoryPool& pool_;

std::unique_ptr<dwio::common::SeekableInputStream> inputStream_;
Expand Down Expand Up @@ -319,7 +332,7 @@ void PageReader::readWithVisitor(Visitor& visitor) {
bool isMultiPage = false;
while (rowsForPage(reader, hasFilter, pageRows, nulls)) {
bool nullsFromFastPath = false;
int32_t numValuesBeforePage = reader.numValues();
int32_t numValuesBeforePage = numRowsInReader<hasFilter>(reader);
visitor.setNumValuesBias(numValuesBeforePage);
visitor.setRows(pageRows);
callDecoder(nulls, nullsFromFastPath, visitor);
Expand All @@ -335,20 +348,21 @@ void PageReader::readWithVisitor(Visitor& visitor) {
}
if (!nulls) {
nullConcatenation_.appendOnes(
reader.numValues() - numValuesBeforePage);
numRowsInReader<hasFilter>(reader) - numValuesBeforePage);
} else if (reader.returnReaderNulls()) {
// Nulls from decoding go directly to result.
nullConcatenation_.append(
reader.nullsInReadRange()->template as<uint64_t>(),
0,
reader.numValues() - numValuesBeforePage);
numRowsInReader<hasFilter>(reader) - numValuesBeforePage);
} else {
// Add the nulls produced from the decoder to the result.
auto firstNullIndex = nullsFromFastPath ? 0 : numValuesBeforePage;
nullConcatenation_.append(
reader.mutableNulls(0),
firstNullIndex,
firstNullIndex + reader.numValues() - numValuesBeforePage);
firstNullIndex + numRowsInReader<hasFilter>(reader) -
numValuesBeforePage);
}
}
isMultiPage = true;
Expand Down