Skip to content

Commit

Permalink
[native] Drop redundant sorting keys from Window and TopNRowNumber nodes
Browse files Browse the repository at this point in the history
Velox doesn't allow partitioning and sorting keys to overlap, since
sorting by a partitioning key is redundant: all rows within a partition
share the same value for that key.

Drop sorting keys present in partitioning keys during Presto-to-Velox
plan translation.
  • Loading branch information
mbasmanova committed Oct 27, 2023
1 parent a627d9b commit 4a93220
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2408,16 +2408,20 @@ std::pair<
std::vector<core::SortOrder>>
toSortFieldsAndOrders(
const protocol::OrderingScheme* orderingScheme,
VeloxExprConverter& exprConverter) {
VeloxExprConverter& exprConverter,
const std::unordered_set<std::string>& partitionKeys) {
std::vector<core::FieldAccessTypedExprPtr> sortFields;
std::vector<core::SortOrder> sortOrders;
if (orderingScheme != nullptr) {
auto nodeSpecOrdering = orderingScheme->orderBy;
sortFields.reserve(nodeSpecOrdering.size());
sortOrders.reserve(nodeSpecOrdering.size());
for (const auto& spec : nodeSpecOrdering) {
sortFields.emplace_back(exprConverter.toVeloxExpr(spec.variable));
sortOrders.emplace_back(toVeloxSortOrder(spec.sortOrder));
// Drop sorting keys that are present in partitioning keys.
if (partitionKeys.count(spec.variable.name) == 0) {
sortFields.emplace_back(exprConverter.toVeloxExpr(spec.variable));
sortOrders.emplace_back(toVeloxSortOrder(spec.sortOrder));
}
}
}
return {sortFields, sortOrders};
Expand All @@ -2431,12 +2435,16 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const protocol::TaskId& taskId) {
std::vector<core::FieldAccessTypedExprPtr> partitionFields;
partitionFields.reserve(node->specification.partitionBy.size());
std::unordered_set<std::string> partitionFieldNames;
for (const auto& entry : node->specification.partitionBy) {
partitionFields.emplace_back(exprConverter_.toVeloxExpr(entry));
partitionFieldNames.emplace(partitionFields.back()->name());
}

auto [sortFields, sortOrders] = toSortFieldsAndOrders(
node->specification.orderingScheme.get(), exprConverter_);
node->specification.orderingScheme.get(),
exprConverter_,
partitionFieldNames);

std::vector<std::string> windowNames;
std::vector<core::WindowNode::Function> windowFunctions;
Expand Down Expand Up @@ -2514,12 +2522,16 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan(
const protocol::TaskId& taskId) {
std::vector<core::FieldAccessTypedExprPtr> partitionFields;
partitionFields.reserve(node->specification.partitionBy.size());
std::unordered_set<std::string> partitionFieldNames;
for (const auto& entry : node->specification.partitionBy) {
partitionFields.emplace_back(exprConverter_.toVeloxExpr(entry));
partitionFieldNames.emplace(partitionFields.back()->name());
}

auto [sortFields, sortOrders] = toSortFieldsAndOrders(
node->specification.orderingScheme.get(), exprConverter_);
node->specification.orderingScheme.get(),
exprConverter_,
partitionFieldNames);

std::optional<std::string> rowNumberColumnName;
if (!node->partial) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,12 @@ public void testLag()
testWindowFunction("lag(orderkey, 5)", FunctionType.VALUE);
testWindowFunction("lag(totalprice, 2, -123.456)", FunctionType.VALUE);
}

@Test
public void testOverlappingPartitionAndSortingKeys()
{
// Each query below uses a window whose ORDER BY list repeats a column that is
// also listed in PARTITION BY.

// The only sorting key is the partitioning key itself.
assertQuery("SELECT row_number() OVER (PARTITION BY orderdate ORDER BY orderdate) FROM orders");
// The partitioning key comes first in ORDER BY, followed by an additional sorting key.
assertQuery("SELECT min(orderkey) OVER (PARTITION BY orderdate ORDER BY orderdate, totalprice) FROM orders");
// The partitioning key comes last in ORDER BY, and the outer query keeps only rn = 1.
// NOTE(review): presumably this shape is meant to exercise the TopNRowNumber path - confirm against the plan.
assertQuery("SELECT * FROM (SELECT row_number() over(partition by orderstatus order by orderkey, orderstatus) rn, * from orders) WHERE rn = 1");
}
}

0 comments on commit 4a93220

Please sign in to comment.