Skip to content

Commit

Permalink
Allow parallelism for final aggregation (#691)
Browse files Browse the repository at this point in the history
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: #1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Apr 6, 2022
1 parent 5d0451f commit 1597f12
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 10 deletions.
2 changes: 1 addition & 1 deletion velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ class LocalPartitionNode : public PlanNode {
"Local repartitioning node requires at least one source");
}

static std::shared_ptr<LocalPartitionNode> single(
static std::shared_ptr<LocalPartitionNode> gather(
const PlanNodeId& id,
RowTypePtr outputType,
std::vector<std::shared_ptr<const PlanNode>> sources) {
Expand Down
10 changes: 1 addition & 9 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,7 @@ uint32_t maxDrivers(const DriverFactory& driverFactory) {
return count;
}
for (auto& node : driverFactory.planNodes) {
if (auto aggregation =
std::dynamic_pointer_cast<const core::AggregationNode>(node)) {
if (aggregation->step() == core::AggregationNode::Step::kFinal ||
aggregation->step() == core::AggregationNode::Step::kSingle) {
// final aggregations must run single-threaded
return 1;
}
} else if (
auto topN = std::dynamic_pointer_cast<const core::TopNNode>(node)) {
if (auto topN = std::dynamic_pointer_cast<const core::TopNNode>(node)) {
if (!topN->isPartial()) {
// final topN must run single-threaded
return 1;
Expand Down

0 comments on commit 1597f12

Please sign in to comment.