Skip to content

Commit

Permalink
[OPPRO-163] Use a flag to distinguish scan and iterator inputs (oap-p…
Browse files Browse the repository at this point in the history
  • Loading branch information
rui-mo authored and zhejiangxiaomai committed Sep 22, 2022
1 parent 4ed7e28 commit 0c771a9
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 38 deletions.
113 changes: 80 additions & 33 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,47 +359,66 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& sRead,
u_int32_t& index,
std::vector<std::string>& paths,
std::vector<u_int64_t>& starts,
std::vector<u_int64_t>& lengths) {
const ::substrait::ReadRel& sRead) {
// Check whether the ReadRel specifies a stream input. If so, the pre-built
// input node will be used as the data source.
auto splitInfo = std::make_shared<SplitInfo>();
auto streamIdx = streamIsInput(sRead);
if (streamIdx >= 0) {
if (inputNodesMap_.find(streamIdx) == inputNodesMap_.end()) {
VELOX_FAIL(
"Could not find source index {} in input nodes map.", streamIdx);
}
return inputNodesMap_[streamIdx];
auto streamNode = inputNodesMap_[streamIdx];
splitInfo->isStream = true;
splitInfoMap_[streamNode->id()] = splitInfo;
return streamNode;
}

// Otherwise, will create TableScan node for ReadRel.
// Get output names and types.
std::vector<std::string> colNameList;
std::vector<TypePtr> veloxTypeList;
if (sRead.has_base_schema()) {
const auto& baseSchema = sRead.base_schema();
colNameList.reserve(baseSchema.names().size());
for (const auto& name : baseSchema.names()) {
colNameList.emplace_back(name);
}
auto substraitTypeList = subParser_->parseNamedStruct(baseSchema);
veloxTypeList.reserve(substraitTypeList.size());
for (const auto& substraitType : substraitTypeList) {
veloxTypeList.emplace_back(toVeloxType(substraitType->type));
}
}

// Parse local files
if (readRel.has_local_files()) {
const auto& fileList = readRel.local_files().items();
// Parse local files and construct split info.
if (sRead.has_local_files()) {
using SubstraitFileFormatCase =
::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;
const auto& fileList = sRead.local_files().items();
splitInfo->paths.reserve(fileList.size());
splitInfo->starts.reserve(fileList.size());
splitInfo->lengths.reserve(fileList.size());
for (const auto& file : fileList) {
// Expect all files to share the same index.
// Expect all partitions to share the same index.
splitInfo->partitionIndex = file.partition_index();
splitInfo->paths.emplace_back(file.uri_file());
splitInfo->starts.emplace_back(file.start());
splitInfo->lengths.emplace_back(file.length());
switch (file.format()) {
case 0:
switch (file.file_format_case()) {
case SubstraitFileFormatCase::kOrc:
case SubstraitFileFormatCase::kDwrf:
splitInfo->format = dwio::common::FileFormat::DWRF;
break;
case 1:
case SubstraitFileFormatCase::kParquet:
splitInfo->format = dwio::common::FileFormat::PARQUET;
break;
default:
splitInfo->format = dwio::common::FileFormat::UNKNOWN;
}
}
}

// Do not hard-code connector ID and allow for connectors other than Hive.
static const std::string kHiveConnectorId = "test-hive";

Expand All @@ -408,30 +427,58 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
if (!sRead.has_filter()) {
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId,
"hive_table",
filterPushdownEnabled,
connector::hive::SubfieldFilters{},
nullptr);
} else {
// Flatten the conditions connected with 'and'.
std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
flattenConditions(sRead.filter(), scalarFunctions);
std::vector<::substrait::Expression_SingularOrList> singularOrLists;
flattenConditions(sRead.filter(), scalarFunctions, singularOrLists);

// Separate the filters to be two parts. The first part can be pushed
// down.
std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
separateFilters(scalarFunctions, subfieldFunctions, remainingFunctions);
std::unordered_map<uint32_t, std::shared_ptr<RangeRecorder>> rangeRecorders;
for (uint32_t idx = 0; idx < veloxTypeList.size(); idx++) {
rangeRecorders[idx] = std::make_shared<RangeRecorder>();
}

// Create the filters to be pushed down.
connector::hive::SubfieldFilters subfieldFilters =
toSubfieldFilters(colNameList, veloxTypeList, subfieldFunctions);
// Separate the filters into two parts. The subfield part can be
// pushed down.
std::vector<::substrait::Expression_ScalarFunction> subfieldFunctions;
std::vector<::substrait::Expression_SingularOrList> subfieldrOrLists;

std::vector<::substrait::Expression_ScalarFunction> remainingFunctions;
std::vector<::substrait::Expression_SingularOrList> remainingrOrLists;

separateFilters(
rangeRecorders,
scalarFunctions,
subfieldFunctions,
remainingFunctions,
singularOrLists,
subfieldrOrLists,
remainingrOrLists);

// Create subfield filters based on the constructed filter info map.
connector::hive::SubfieldFilters subfieldFilters = toSubfieldFilters(
colNameList, veloxTypeList, subfieldFunctions, subfieldrOrLists);
// Connect the remaining filters with 'and'.
std::shared_ptr<const core::ITypedExpr> remainingFilter =
connectWithAnd(colNameList, veloxTypeList, remainingFunctions);
std::shared_ptr<const core::ITypedExpr> remainingFilter;

if (!isPushDownSupportedByFormat(splitInfo->format, subfieldFilters)) {
// A subfield filter is not supported by the format, so
// mark all filters as remaining filters.
subfieldFilters.clear();
remainingFilter = connectWithAnd(
colNameList, veloxTypeList, scalarFunctions, singularOrLists);
} else {
remainingFilter = connectWithAnd(
colNameList, veloxTypeList, remainingFunctions, remainingrOrLists);
}

tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
kHiveConnectorId,
"hive_table",
filterPushdownEnabled,
std::move(subfieldFilters),
Expand All @@ -444,7 +491,7 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
assignments;
for (int idx = 0; idx < colNameList.size(); idx++) {
auto outName = substraitParser_->makeNodeName(planNodeId_, idx);
auto outName = subParser_->makeNodeName(planNodeId_, idx);
assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
colNameList[idx],
connector::hive::HiveColumnHandle::ColumnType::kRegular,
Expand All @@ -453,19 +500,19 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));

if (readRel.has_virtual_table()) {
return toVeloxPlan(readRel, pool, outputType);
if (sRead.has_virtual_table()) {
return toVeloxPlan(sRead, outputType);
} else {
auto tableScanNode = std::make_shared<core::TableScanNode>(
nextPlanNodeId(), outputType, tableHandle, assignments);
// Set split info map.
splitInfoMap_[tableScanNode->id()] = splitInfo;
return tableScanNode;
}
} // namespace facebook::velox::substrait
} // namespace facebook::velox::substrait
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& readRel,
memory::MemoryPool* pool,
const RowTypePtr& type) {
::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table();
int64_t numVectors = readVirtualTable.values_size();
Expand Down Expand Up @@ -508,11 +555,11 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
}
}
children.emplace_back(
setVectorFromVariants(outputChildType, batchChild, pool));
setVectorFromVariants(outputChildType, batchChild, pool_));
}

vectors.emplace_back(
std::make_shared<RowVector>(pool, type, nullptr, batchSize, children));
std::make_shared<RowVector>(pool_, type, nullptr, batchSize, children));
}
return std::make_shared<core::ValuesNode>(nextPlanNodeId(), vectors);
}
Expand Down
3 changes: 1 addition & 2 deletions velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ class SubstraitVeloxPlanConverter {
/// Starts: the start positions in bytes to read from the items.
/// Lengths: the lengths in bytes to read from the items.
std::shared_ptr<const core::PlanNode> toVeloxPlan(
const ::substrait::ReadRel& sRead,
std::shared_ptr<SplitInfo>& splitInfo);
const ::substrait::ReadRel& sRead);

/// Used to convert Substrait Rel into Velox PlanNode.
std::shared_ptr<const core::PlanNode> toVeloxPlan(
Expand Down
4 changes: 1 addition & 3 deletions velox/substrait/SubstraitToVeloxPlanValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,7 @@ bool SubstraitToVeloxPlanValidator::validate(
bool SubstraitToVeloxPlanValidator::validate(
const ::substrait::ReadRel& sRead) {
try {
auto splitInfo = std::make_shared<SplitInfo>();

planConverter_->toVeloxPlan(sRead, splitInfo);
planConverter_->toVeloxPlan(sRead);
} catch (const VeloxException& err) {
std::cout << "ReadRel validation failed due to:" << err.message()
<< std::endl;
Expand Down

0 comments on commit 0c771a9

Please sign in to comment.