Skip to content

Commit

Permalink
[OPPRO-170] Filter validation for Parquet reader at runtime (facebook…
Browse files Browse the repository at this point in the history
…incubator#27)

* Filter validation for Parquet reader at runtime

* Style

* Style

* Format
  • Loading branch information
zhztheplayer authored and zhejiangxiaomai committed Jun 29, 2022
1 parent b2c30fb commit d53a813
Showing 1 changed file with 60 additions and 5 deletions.
65 changes: 60 additions & 5 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,52 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
childNode);
}

bool isPushDownSupportedByFormat(
const dwio::common::FileFormat& format,
connector::hive::SubfieldFilters& subfieldFilters) {
switch (format) {
case dwio::common::FileFormat::PARQUET: {
for (const auto& filter : subfieldFilters) {
switch (filter.second->kind()) {
// see ParquetReader.cpp:175

// supported
case common::FilterKind::kBigintRange:
case common::FilterKind::kDoubleRange:
case common::FilterKind::kBytesValues:
case common::FilterKind::kBytesRange:
case common::FilterKind::kBigintValuesUsingBitmask:
case common::FilterKind::kBigintValuesUsingHashTable:
break;

// not supported
case common::FilterKind::kAlwaysFalse:
case common::FilterKind::kAlwaysTrue:
case common::FilterKind::kIsNull:
case common::FilterKind::kIsNotNull:
case common::FilterKind::kBoolValue:
case common::FilterKind::kFloatRange:
case common::FilterKind::kBigintMultiRange:
case common::FilterKind::kMultiRange:
default:
return false;
}
}
break;
}
case dwio::common::FileFormat::ORC:
case dwio::common::FileFormat::RC:
case dwio::common::FileFormat::RC_TEXT:
case dwio::common::FileFormat::RC_BINARY:
case dwio::common::FileFormat::TEXT:
case dwio::common::FileFormat::JSON:
case dwio::common::FileFormat::ALPHA:
case dwio::common::FileFormat::UNKNOWN:
default:
break;
}
return true;
}
std::shared_ptr<const core::PlanNode>
SubstraitVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& sRead) {
// Check if the ReadRel specifies an input of stream. If yes, the pre-built
Expand Down Expand Up @@ -569,8 +615,8 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
flattenConditions(sRead.filter(), scalarFunctions);

// Separate the filters to be two parts. The first part can be pushed
// down.
// Separate the filters to be two parts. The first part can be
// pushed down.
std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
separateFilters(scalarFunctions, subfieldFunctions, remainingFunctions);
Expand All @@ -580,8 +626,18 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
toSubfieldFilters(colNameList, veloxTypeList, subfieldFunctions);

// Connect the remaining filters with 'and'.
std::shared_ptr<const core::ITypedExpr> remainingFilter =
connectWithAnd(colNameList, veloxTypeList, remainingFunctions);
std::shared_ptr<const core::ITypedExpr> remainingFilter;

if (!isPushDownSupportedByFormat(splitInfo->format, subfieldFilters)) {
// A subfieldFilter is not supported by the format,
// mark all filter as remaining filters.
subfieldFilters.clear();
remainingFilter =
connectWithAnd(colNameList, veloxTypeList, scalarFunctions);
} else {
remainingFilter =
connectWithAnd(colNameList, veloxTypeList, remainingFunctions);
}

tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
"hive_table",
Expand Down Expand Up @@ -669,7 +725,6 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
vectors.emplace_back(std::make_shared<RowVector>(
pool, type, nullptr, batchSize, children));
}

return std::make_shared<core::ValuesNode>(nextPlanNodeId(), vectors);
}

Expand Down

0 comments on commit d53a813

Please sign in to comment.