From 1597f129a3ba8845f4928bffcb939711dec99a4b Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 5 Apr 2022 18:27:58 -0700 Subject: [PATCH] Allow parallelism for final aggregation (#691) Summary: X-link: https://github.com/facebookexternal/presto_cpp/pull/691 Pull Request resolved: https://github.com/facebookincubator/velox/pull/1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Reviewed By: Yuhta Differential Revision: D35304854 fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173 --- velox/core/PlanNode.h | 2 +- velox/exec/LocalPlanner.cpp | 10 +--------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index bd0d3c76818a..bac4419cc861 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -683,7 +683,7 @@ class LocalPartitionNode : public PlanNode { "Local repartitioning node requires at least one source"); } - static std::shared_ptr single( + static std::shared_ptr gather( const PlanNodeId& id, RowTypePtr outputType, std::vector> sources) { diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index f3d27f02f519..cd5c5c34ee54 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -168,15 +168,7 @@ uint32_t maxDrivers(const DriverFactory& driverFactory) { return count; } for (auto& node : driverFactory.planNodes) { - if (auto aggregation = - std::dynamic_pointer_cast(node)) { - if (aggregation->step() == core::AggregationNode::Step::kFinal || - aggregation->step() == core::AggregationNode::Step::kSingle) { - // final aggregations must run single-threaded - return 1; - } - } else if ( - auto topN = std::dynamic_pointer_cast(node)) { + if (auto topN = std::dynamic_pointer_cast(node)) { if (!topN->isPartial()) { // final topN must run single-threaded return 1;