From d53a813d9dabdb855573c00b5eff20bc24bcc292 Mon Sep 17 00:00:00 2001
From: Hongze Zhang
Date: Tue, 28 Jun 2022 16:30:45 +0800
Subject: [PATCH] [OPPRO-170] Filter validation for Parquet reader at runtime
 (#27)

* Filter validation for Parquet reader at runtime

* Style

* Style

* Format
---
 velox/substrait/SubstraitToVeloxPlan.cpp | 70 ++++++++++++++++++++++--
 1 file changed, 65 insertions(+), 5 deletions(-)

diff --git a/velox/substrait/SubstraitToVeloxPlan.cpp b/velox/substrait/SubstraitToVeloxPlan.cpp
index b57ee4961e3c..2f50603b988a 100644
--- a/velox/substrait/SubstraitToVeloxPlan.cpp
+++ b/velox/substrait/SubstraitToVeloxPlan.cpp
@@ -495,6 +495,57 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
       childNode);
 }
 
+// Returns true if every filter in 'subfieldFilters' can be pushed down to a
+// reader of the given file 'format'. Only PARQUET is restricted: a single
+// filter of an unsupported kind makes the whole set ineligible. All other
+// formats, and an empty filter set, are accepted unconditionally.
+bool isPushDownSupportedByFormat(
+    const dwio::common::FileFormat& format,
+    const connector::hive::SubfieldFilters& subfieldFilters) {
+  switch (format) {
+    case dwio::common::FileFormat::PARQUET: {
+      for (const auto& filter : subfieldFilters) {
+        switch (filter.second->kind()) {
+          // see ParquetReader.cpp:175
+
+          // supported
+          case common::FilterKind::kBigintRange:
+          case common::FilterKind::kDoubleRange:
+          case common::FilterKind::kBytesValues:
+          case common::FilterKind::kBytesRange:
+          case common::FilterKind::kBigintValuesUsingBitmask:
+          case common::FilterKind::kBigintValuesUsingHashTable:
+            break;
+
+          // not supported
+          case common::FilterKind::kAlwaysFalse:
+          case common::FilterKind::kAlwaysTrue:
+          case common::FilterKind::kIsNull:
+          case common::FilterKind::kIsNotNull:
+          case common::FilterKind::kBoolValue:
+          case common::FilterKind::kFloatRange:
+          case common::FilterKind::kBigintMultiRange:
+          case common::FilterKind::kMultiRange:
+          default:
+            return false;
+        }
+      }
+      break;
+    }
+    // No push-down restrictions are enforced for the remaining formats.
+    case dwio::common::FileFormat::ORC:
+    case dwio::common::FileFormat::RC:
+    case dwio::common::FileFormat::RC_TEXT:
+    case dwio::common::FileFormat::RC_BINARY:
+    case dwio::common::FileFormat::TEXT:
+    case dwio::common::FileFormat::JSON:
+    case dwio::common::FileFormat::ALPHA:
+    case dwio::common::FileFormat::UNKNOWN:
+    default:
+      break;
+  }
+  return true;
+}
 std::shared_ptr
 SubstraitVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& sRead) {
   // Check if the ReadRel specifies an input of stream. If yes, the pre-built
@@ -569,8 +620,8 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
     std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
     flattenConditions(sRead.filter(), scalarFunctions);
 
-    // Separate the filters to be two parts. The first part can be pushed
-    // down.
+    // Separate the filters to be two parts. The first part can be
+    // pushed down.
     std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
     std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
     separateFilters(scalarFunctions, subfieldFunctions, remainingFunctions);
@@ -580,8 +631,18 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
         toSubfieldFilters(colNameList, veloxTypeList, subfieldFunctions);
 
     // Connect the remaining filters with 'and'.
-    std::shared_ptr remainingFilter =
-        connectWithAnd(colNameList, veloxTypeList, remainingFunctions);
+    std::shared_ptr remainingFilter;
+
+    if (!isPushDownSupportedByFormat(splitInfo->format, subfieldFilters)) {
+      // A subfieldFilter is not supported by the format,
+      // mark all filter as remaining filters.
+      subfieldFilters.clear();
+      remainingFilter =
+          connectWithAnd(colNameList, veloxTypeList, scalarFunctions);
+    } else {
+      remainingFilter =
+          connectWithAnd(colNameList, veloxTypeList, remainingFunctions);
+    }
 
     tableHandle = std::make_shared(
         "hive_table",
@@ -669,7 +730,6 @@ SubstraitVeloxPlanConverter::toVeloxAggWithRowConstruct(
     vectors.emplace_back(std::make_shared(
         pool, type, nullptr, batchSize, children));
   }
-
   return std::make_shared(nextPlanNodeId(), vectors);
 }
 