[WIP] Allow parallelism for final aggregation #1330
This pull request was exported from Phabricator. Differential Revision: D35304854
Summary:

X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem;
                                   Query Plan
--------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                   Query Plan
--------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: cc51d0bd17e15cc98d56c50c66f70703cafa8949
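The flattening of dictionary-encoded intermediate results described in the summary can be pictured with a small standalone sketch. It does not use Velox's vector classes: `DictionaryColumn` and `flatten` are hypothetical names made up for this illustration. The assumption is that a dictionary-encoded column stores per-row indices into a shared base vector, and that flattening materializes one value per row so that downstream code can read values directly by row number.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a dictionary-encoded column: row i holds
// base[indices[i]]. This is NOT Velox's DictionaryVector API.
struct DictionaryColumn {
  std::vector<double> base;
  std::vector<int32_t> indices;
};

// Materializes one value per row, producing a flat column.
std::vector<double> flatten(const DictionaryColumn& column) {
  std::vector<double> flat;
  flat.reserve(column.indices.size());
  for (int32_t index : column.indices) {
    // Every index must point inside the base vector.
    assert(index >= 0 && static_cast<std::size_t>(index) < column.base.size());
    flat.push_back(column.base[index]);
  }
  return flat;
}
```

After flattening, code that assumes "row i lives at position i" keeps working, at the cost of copying the values once.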
velox/core/PlanNode.h
@@ -659,12 +659,16 @@ using PartitionFunctionFactory =
 /// different from input.
 class LocalPartitionNode : public PlanNode {
  public:
+  enum class Type { kGather, kRepartition, kReplicate };
Could you please add a comment explaining what each of these means?
Also, the name "Type" is a bit misleading; maybe "strategy" would be clearer?
velox/exec/LocalPartition.h
@@ -109,6 +103,7 @@ class LocalExchangeSource {
   std::vector<VeloxPromise<bool>> producerPromises_;
   int pendingProducers_{0};
   bool noMoreProducers_{false};
+  bool isFinished_{false};
General question: I understand what this variable is doing, but why was it not needed before?
There was a bug where a task with a local exchange would get stuck if a limit above the exchange finished early. See #1332.
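To picture the stuck-task scenario, here is a minimal sketch of a bounded queue between a producing pipeline and a consumer. It is not the actual `LocalExchangeSource` code: `LocalQueue`, `EnqueueResult`, and `close` are hypothetical names, and only the `isFinished_` flag mirrors the diff above. Without the flag, a producer facing a full queue would keep waiting for a consumer that has already stopped reading; with it, the producer learns that it can finish early.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

enum class EnqueueResult { kAccepted, kFull, kFinished };

// Hypothetical bounded queue standing in for a local exchange.
class LocalQueue {
 public:
  explicit LocalQueue(std::size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Called by producers. kFinished tells the producer to stop early.
  EnqueueResult enqueue(int row) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (isFinished_) {
      return EnqueueResult::kFinished;
    }
    if (rows_.size() >= capacity_) {
      return EnqueueResult::kFull;
    }
    rows_.push_back(row);
    return EnqueueResult::kAccepted;
  }

  // Called by the consumer. Returns false if no row is available.
  bool dequeue(int& row) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (rows_.empty()) {
      return false;
    }
    row = rows_.front();
    rows_.pop_front();
    return true;
  }

  // Called by the consumer (for example, a limit that has enough rows).
  void close() {
    std::lock_guard<std::mutex> lock(mutex_);
    isFinished_ = true;
    rows_.clear();
  }

 private:
  const std::size_t capacity_;
  std::mutex mutex_;
  std::deque<int> rows_;
  bool isFinished_{false};
};
```

In this sketch a producer that sees `kFull` would normally wait and retry; once the consumer calls `close()`, the next attempt returns `kFinished` instead, which is the signal the producing pipeline needs in order to finish.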
velox/exec/Task.cpp
@@ -948,7 +952,7 @@ void Task::terminate(TaskState terminalState) {
       for (auto& pair : splitGroupState.second.bridges) {
         oldBridges.emplace_back(std::move(pair.second));
       }
-      splitGroupState.second.bridges.clear();
+      splitGroupState.second.clear();
What is this change about?
This is part of #1332
Strictly speaking it is not necessary, but it is better to clear the whole state, rather than just the join bridges.
velox/exec/Task.cpp
@@ -404,6 +404,10 @@ void Task::removeDriver(std::shared_ptr<Task> self, Driver* driver) {
       splitGroupState.clear();
       self->ensureSplitGroupsAreBeingProcessedLocked(self);
     }
+  } else {
I didn't understand this change either. Could you please explain?
We used to clear the state only in grouped execution, but ungrouped execution is no different in that regard.
velox/core/PlanNode.h
@@ -659,12 +659,16 @@ using PartitionFunctionFactory =
 /// different from input.
 class LocalPartitionNode : public PlanNode {
  public:
+  enum class Type { kGather, kRepartition, kReplicate };
Are you planning to add support for kReplicate in this PR?
I don't plan to support kReplicate just yet.
Should we remove this, or throw NYI if kReplicate is specified?
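For illustration, the "throw NYI" option could look roughly like the sketch below. The enum values come from the diff under review, but the comments on them are assumptions (in particular the meaning of `kReplicate`), `checkSupported` is a hypothetical helper, and a plain `std::logic_error` stands in for whatever error macro the codebase would actually use.

```cpp
#include <stdexcept>

// Mirrors the enum from the diff; the per-value comments are assumptions.
enum class Type {
  kGather,      // All producers feed a single consumer.
  kRepartition, // Rows are split among consumers by a partition function.
  kReplicate,   // Assumed meaning: every consumer receives every row.
};

// Hypothetical validation helper: rejects the mode that is not implemented.
void checkSupported(Type type) {
  if (type == Type::kReplicate) {
    throw std::logic_error("Not yet implemented: kReplicate local partition");
  }
}
```

Failing loudly at plan construction keeps an unsupported mode from being silently treated as one of the supported ones.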
@@ -674,19 +678,24 @@ class LocalPartitionNode : public PlanNode {
         "Local repartitioning node requires at least one source");
   }

-  static std::shared_ptr<LocalPartitionNode> single(
+  static std::shared_ptr<LocalPartitionNode> gather(
Is this used anywhere?
Never mind, I see that this is called from presto_cpp.
Summary:

X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that.

[1] Consider this global aggregation query.

```
presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem;
                                   Query Plan
--------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                   Query Plan
--------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: c45e99bb744f230683fe8972ca3b63f2628e6b3f
…tor#1330) Support the partial merge phase brought by count distinct.
Summary:
Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction.
To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.
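
To illustrate why partitioning on the grouping keys makes a multi-threaded final aggregation safe, here is a minimal Python sketch. It is not Velox code: `hash_partition`, `final_avg` and `parallel_final_avg` are hypothetical names, and the intermediate rows are assumed to be `(key, sum, count)` states like the ones a partial `avg` produces. Because every row for a given key lands in the same partition, each thread can finalize its keys without coordinating with the others.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def hash_partition(rows, num_partitions):
    """Route each (key, partial_sum, partial_count) row to a partition by key hash.

    Stands in for a local exchange that repartitions on the grouping keys.
    """
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[0]) % num_partitions].append(row)
    return partitions


def final_avg(rows):
    """Merge partial (sum, count) states per key and finalize them to an average."""
    states = defaultdict(lambda: [0.0, 0])
    for key, partial_sum, partial_count in rows:
        states[key][0] += partial_sum
        states[key][1] += partial_count
    return {key: s / c for key, (s, c) in states.items()}


def parallel_final_avg(rows, num_threads):
    """Run one final aggregation per partition; key sets are disjoint by construction."""
    partitions = hash_partition(rows, num_threads)
    result = {}
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for partial_result in pool.map(final_avg, partitions):
            result.update(partial_result)
    return result


# Hypothetical intermediate rows from partial aggregation: (key, sum, count).
intermediate = [(1, 10.0, 2), (2, 6.0, 3), (1, 2.0, 2), (3, 5.0, 1), (2, 3.0, 3)]
```

Calling `parallel_final_avg(intermediate, 3)` gives the same per-key averages as `final_avg(intermediate)` on a single thread. If the repartitioning step were dropped, rows for one key could reach several threads and the merged states would be wrong, which is why the local exchange nodes can no longer be ignored.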
Final aggregation can now run after a local repartitioning, which produces
dictionary-encoded vectors. Many aggregate functions are not prepared to handle
non-flat intermediate results. To avoid breaking them, we forcibly flatten
intermediate results before feeding them to aggregate functions.
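
As a rough picture of what flattening means here, the sketch below models a dictionary-encoded vector as a list of indices into a base vector and resolves it into plain values. The classes `FlatVector` and `DictionaryVector` and the function `sum_flat_only` are illustrative stand-ins, not the Velox vector API.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class FlatVector:
    values: List[float]


@dataclass
class DictionaryVector:
    # Row i of this vector is base[indices[i]].
    indices: List[int]
    base: Union[FlatVector, "DictionaryVector"]


def flatten(vector):
    """Resolve dictionary indices (possibly nested) into a flat vector."""
    if isinstance(vector, FlatVector):
        return vector
    base = flatten(vector.base)
    return FlatVector([base.values[i] for i in vector.indices])


def sum_flat_only(vector):
    """Stand-in for an aggregate function that only understands flat input."""
    if not isinstance(vector, FlatVector):
        raise TypeError("expected a flat vector")
    return sum(vector.values)
```

With `d = DictionaryVector([2, 0, 2, 1], FlatVector([1.5, 2.5, 4.0]))`, calling `sum_flat_only(d)` raises, while `sum_flat_only(flatten(d))` works. Flattening costs a copy, but it keeps aggregate functions that assume flat input working without changing each of them.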
There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early, the producing pipeline upstream of the exchange must be
notified so that it can finish early as well.
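
The early-finish notification can be sketched with two threads and a bounded queue standing in for the gather exchange. This is only an analogy under stated assumptions: `producer`, `limit_consumer`, `run` and the `stop` event are hypothetical names, not the mechanism Velox uses. Without the signal, the producer would keep trying to push rows into a full exchange after the consumer has stopped reading.

```python
import queue
import threading


def producer(exchange, stop, produced):
    """Upstream pipeline: pushes rows into the exchange until told to stop."""
    row = 0
    while not stop.is_set():
        try:
            exchange.put(row, timeout=0.01)
        except queue.Full:
            continue  # Re-check the stop flag instead of blocking forever.
        produced.append(row)
        row += 1


def limit_consumer(exchange, stop, limit):
    """Downstream pipeline: takes `limit` rows, then notifies the producer."""
    rows = [exchange.get() for _ in range(limit)]
    stop.set()
    return rows


def run(limit, capacity=4):
    """Wire both pipelines together and report whether the producer terminated."""
    exchange = queue.Queue(maxsize=capacity)
    stop = threading.Event()
    produced = []
    thread = threading.Thread(target=producer, args=(exchange, stop, produced))
    thread.start()
    rows = limit_consumer(exchange, stop, limit)
    thread.join(timeout=5)
    return rows, thread.is_alive(), len(produced)
```

`run(17)` mirrors the `LIMIT 17` query above: the consumer takes 17 rows, sets the flag, and the producer exits after producing at most a few extra rows that fit in the exchange buffer.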
Differential Revision: D35304854