Skip to content

Commit

Permalink
Add sort operator support in velox (oap-project#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and zhejiangxiaomai committed Nov 18, 2022
1 parent 15da07d commit d5ac021
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 0 deletions.
54 changes: 54 additions & 0 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,60 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
childNode);
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::SortRel& sortRel) {
core::PlanNodePtr childNode;
if (sortRel.has_input()) {
childNode = toVeloxPlan(sortRel.input());
} else {
VELOX_FAIL("Child Rel is expected in SortRel.");
}

const auto& inputType = childNode->outputType();

std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
std::vector<core::SortOrder> sortingOrders;

const auto& sorts = sortRel.sorts();
sortingKeys.reserve(sorts.size());
sortingOrders.reserve(sorts.size());

for (const auto& sort : sorts) {
switch (sort.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
sortingOrders.emplace_back(core::kAscNullsFirst);
break;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST:
sortingOrders.emplace_back(core::kAscNullsLast);
break;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST:
sortingOrders.emplace_back(core::kDescNullsFirst);
break;
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST:
sortingOrders.emplace_back(core::kDescNullsLast);
break;
default:
VELOX_FAIL("Sort direction is not support in SortRel");
}

if (sort.has_expr()) {
auto expression = exprConverter_->toVeloxExpr(sort.expr(), inputType);
auto expr_field =
dynamic_cast<const core::FieldAccessTypedExpr*>(expression.get());
VELOX_CHECK(
expr_field != nullptr,
" the sorting key in Sort Operator only support field")

sortingKeys.emplace_back(
std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
expression));
}
}

return std::make_shared<core::OrderByNode>(
nextPlanNodeId(), sortingKeys, sortingOrders, false, childNode);
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::FilterRel& filterRel) {
core::PlanNodePtr childNode;
Expand Down
3 changes: 3 additions & 0 deletions velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class SubstraitVeloxPlanConverter {
explicit SubstraitVeloxPlanConverter(memory::MemoryPool* pool)
: pool_(pool) {}

/// Used to convert Substrait SortRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::SortRel& sSort);

/// Used to convert Substrait JoinRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::JoinRel& sJoin);

Expand Down
61 changes: 61 additions & 0 deletions velox/substrait/SubstraitToVeloxPlanValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,64 @@ bool SubstraitToVeloxPlanValidator::validateInputTypes(
return true;
}

bool SubstraitToVeloxPlanValidator::validate(
const ::substrait::SortRel& sSort) {
if (sSort.has_input() && !validate(sSort.input())) {
return false;
}
// Get and validate the input types from extension.
if (!sSort.has_advanced_extension()) {
std::cout << "Input types are expected in SortRel." << std::endl;
return false;
}
const auto& extension = sSort.advanced_extension();
std::vector<TypePtr> types;
if (!validateInputTypes(extension, types)) {
std::cout << "Validation failed for input types in SortRel." << std::endl;
return false;
}

int32_t inputPlanNodeId = 0;
std::vector<std::string> names;
names.reserve(types.size());
for (auto colIdx = 0; colIdx < types.size(); colIdx++) {
names.emplace_back(subParser_->makeNodeName(inputPlanNodeId, colIdx));
}
auto rowType = std::make_shared<RowType>(std::move(names), std::move(types));

const auto& sorts = sSort.sorts();
for (const auto& sort : sorts) {
switch (sort.direction()) {
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST:
case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST:
break;
default:
return false;
}

if (sort.has_expr()) {
try {
auto expression = exprConverter_->toVeloxExpr(sort.expr(), rowType);
auto expr_field =
dynamic_cast<const core::FieldAccessTypedExpr*>(expression.get());
VELOX_CHECK(
expr_field != nullptr,
" the sorting key in Sort Operator only support field")

exec::ExprSet exprSet({std::move(expression)}, execCtx_);
} catch (const VeloxException& err) {
std::cout << "Validation failed for expression in SortRel due to:"
<< err.message() << std::endl;
return false;
}
}
}

return true;
}

bool SubstraitToVeloxPlanValidator::validate(
const ::substrait::ProjectRel& sProject) {
if (sProject.has_input() && !validate(sProject.input())) {
Expand Down Expand Up @@ -321,6 +379,9 @@ bool SubstraitToVeloxPlanValidator::validate(const ::substrait::Rel& sRel) {
if (sRel.has_read()) {
return validate(sRel.read());
}
if (sRel.has_sort()) {
return validate(sRel.sort());
}
return false;
}

Expand Down
3 changes: 3 additions & 0 deletions velox/substrait/SubstraitToVeloxPlanValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class SubstraitToVeloxPlanValidator {
/// Used to validate whether the computing of this type is supported.
bool validate(const ::substrait::Type& sType);

/// Used to validate whether the computing of this Sort is supported.
bool validate(const ::substrait::SortRel& sSort);

/// Used to validate whether the computing of this Aggregation is supported.
bool validate(const ::substrait::AggregateRel& sAgg);

Expand Down

0 comments on commit d5ac021

Please sign in to comment.