[WIP] Allow parallelism for final aggregation #1330

Closed


mbasmanova
Contributor

Summary:
Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction.
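
To illustrate why partitioning on the grouping keys makes this safe, here is a minimal sketch. It is not Velox code: `Row` and `parallelSumByKey` are hypothetical names, and a plain sum stands in for the aggregate. Because each key lands in exactly one partition, the workers never touch the same group and their results can simply be concatenated.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical row type: (grouping key, value to sum).
using Row = std::pair<int64_t, int64_t>;

// Sums values per key using 'numThreads' workers (numThreads must be > 0).
// Rows are first hash-partitioned on the key, so every key is owned by
// exactly one worker.
std::map<int64_t, int64_t> parallelSumByKey(
    const std::vector<Row>& rows,
    size_t numThreads) {
  // Step 1: local repartitioning on the grouping key.
  std::vector<std::vector<Row>> partitions(numThreads);
  for (const auto& row : rows) {
    partitions[std::hash<int64_t>{}(row.first) % numThreads].push_back(row);
  }

  // Step 2: each worker aggregates only its own partition.
  std::vector<std::unordered_map<int64_t, int64_t>> partial(numThreads);
  std::vector<std::thread> workers;
  for (size_t i = 0; i < numThreads; ++i) {
    workers.emplace_back([&, i] {
      for (const auto& row : partitions[i]) {
        partial[i][row.first] += row.second;
      }
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }

  // Step 3: key sets are disjoint, so combining results is a plain union.
  std::map<int64_t, int64_t> result;
  for (const auto& groups : partial) {
    result.insert(groups.begin(), groups.end());
  }
  return result;
}
```

Without step 1, two workers could each hold a partial result for the same key and a further merge step would be required, which is what the single-threaded restriction was guarding against.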

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions.
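
As a rough illustration of what "flatten" means here, the sketch below uses a simplified stand-in for a dictionary-encoded column. `DictionaryColumn`, `flatten`, and `sumFlat` are hypothetical names, not the Velox vector API.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified dictionary-encoded column:
// logical row i holds base[indices[i]].
struct DictionaryColumn {
  std::vector<double> base;
  std::vector<int32_t> indices;
};

// Materializes the logical values into a contiguous ("flat") vector.
std::vector<double> flatten(const DictionaryColumn& column) {
  std::vector<double> flat;
  flat.reserve(column.indices.size());
  for (int32_t index : column.indices) {
    // at() throws if an index points outside the base values.
    flat.push_back(column.base.at(static_cast<size_t>(index)));
  }
  return flat;
}

// Stand-in for an aggregate that only understands flat input.
double sumFlat(const std::vector<double>& values) {
  double total = 0;
  for (double value : values) {
    total += value;
  }
  return total;
}
```

Flattening costs an extra copy, but it lets aggregates that assume flat input keep working unchanged when their input comes from a local repartitioning step.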

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well.
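
A minimal sketch of the early-finish handshake, assuming a single queue shared by producers and one consumer. `GatherQueue` and `produce` are hypothetical and much simpler than the actual exchange classes; the point is only that the consumer sets a flag that producers observe on their next write.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical, simplified local gather queue (not the Velox class).
class GatherQueue {
 public:
  // Called by a producer. Returns false once the consumer has finished,
  // which tells the producer to stop generating data.
  bool enqueue(int value) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (finished_) {
      return false;
    }
    queue_.push_back(value);
    return true;
  }

  // Called by the consumer. Returns std::nullopt if nothing is queued.
  std::optional<int> dequeue() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) {
      return std::nullopt;
    }
    int value = queue_.front();
    queue_.pop_front();
    return value;
  }

  // Called by the consumer (for example a limit) when it needs no more
  // input. Pending data is dropped.
  void finish() {
    std::lock_guard<std::mutex> lock(mutex_);
    finished_ = true;
    queue_.clear();
  }

 private:
  std::mutex mutex_;
  std::deque<int> queue_;
  bool finished_{false};
};

// Produces 0, 1, 2, ... until 'maxRows' rows are accepted or the queue
// reports that the consumer is done. Returns the number of rows accepted.
size_t produce(GatherQueue& queue, size_t maxRows) {
  size_t accepted = 0;
  for (size_t i = 0; i < maxRows; ++i) {
    if (!queue.enqueue(static_cast<int>(i))) {
      break;
    }
    ++accepted;
  }
  return accepted;
}
```

Without the flag, a producer that blocks waiting for the consumer to drain the queue would wait forever once the consumer has stopped reading.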

Differential Revision: D35304854

@facebook-github-bot added the CLA Signed and fb-exported labels Apr 1, 2022
@facebook-github-bot
Contributor

This pull request was exported from Phabricator. Differential Revision: D35304854

@mbasmanova changed the title from "Allow parallelism for final aggregation" to "[WIP] Allow parallelism for final aggregation" Apr 1, 2022

mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 1, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to a single thread because it included final aggregation. Now Velox runs a plan with two pipelines: one runs RemoteSource, the other runs Aggregate(FINAL), with a gather local exchange in between.
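
The split into pipelines can be pictured with a small sketch. This is a simplification under stated assumptions: the plan is a single chain of node names listed from source to root, and `splitIntoPipelines` is a hypothetical helper, not how Velox builds drivers.

```cpp
#include <string>
#include <vector>

// Hypothetical helper. 'plan' lists node names from the leaf (source) to the
// root. Every "LocalExchange" entry closes the current pipeline and starts
// the next one; the exchange itself is the boundary between the two.
std::vector<std::vector<std::string>> splitIntoPipelines(
    const std::vector<std::string>& plan) {
  std::vector<std::vector<std::string>> pipelines(1);
  for (const auto& node : plan) {
    if (node == "LocalExchange") {
      // Producers write into the exchange, consumers read from it.
      pipelines.emplace_back();
    } else {
      pipelines.back().push_back(node);
    }
  }
  return pipelines;
}
```

For Fragment 0 above, the chain RemoteSource, LocalExchange, Aggregate(FINAL), Output yields one pipeline containing RemoteSource and another containing Aggregate(FINAL) and Output. Dropping the exchange, as was done before, collapses everything into a single pipeline.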

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with the DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to a single thread because it included final aggregation and final limit. Now Velox runs a plan with two pipelines: one runs RemoteSource, the other runs DistinctLimit, with a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: cc51d0bd17e15cc98d56c50c66f70703cafa8949

@@ -659,12 +659,16 @@ using PartitionFunctionFactory =
/// different from input.
class LocalPartitionNode : public PlanNode {
public:
enum class Type { kGather, kRepartition, kReplicate };
Collaborator

Could you please add a comment explaining what each of these means?

Also, the name "Type" is a bit misleading; maybe "strategy" would be clearer?

@@ -109,6 +103,7 @@ class LocalExchangeSource {
std::vector<VeloxPromise<bool>> producerPromises_;
int pendingProducers_{0};
bool noMoreProducers_{false};
bool isFinished_{false};
Collaborator

General question: I understand what this variable is doing, but why was it not needed before?

Contributor Author

There was a bug where a task with a local exchange would get stuck if a limit above the exchange finished early. See #1332

@@ -948,7 +952,7 @@ void Task::terminate(TaskState terminalState) {
 for (auto& pair : splitGroupState.second.bridges) {
 oldBridges.emplace_back(std::move(pair.second));
 }
-splitGroupState.second.bridges.clear();
+splitGroupState.second.clear();
Collaborator

What is this change about?

Contributor Author

This is part of #1332

Strictly speaking it is not necessary, but it is better to clear the whole state, rather than just the join bridges.

@@ -404,6 +404,10 @@ void Task::removeDriver(std::shared_ptr<Task> self, Driver* driver) {
splitGroupState.clear();
self->ensureSplitGroupsAreBeingProcessedLocked(self);
}
} else {
Collaborator

I didn't understand this change either. Could you please explain?

Contributor Author

We used to clear the state only in grouped execution, but ungrouped execution is no different in that regard.

@@ -659,12 +659,16 @@ using PartitionFunctionFactory =
/// different from input.
class LocalPartitionNode : public PlanNode {
public:
enum class Type { kGather, kRepartition, kReplicate };
Collaborator

Are you planning to add support for kReplicate in this PR?

Contributor Author

I don't plan to support kReplicate just yet.

Collaborator

Should we remove this, or throw NYI if kReplicate is specified?
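
A minimal sketch of the "throw NYI" option. The enum is a hypothetical mirror of the one under review, the comments on each value are an assumed reading of the names rather than text from the patch, and `checkSupported` is an illustrative helper, not a Velox function.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical mirror of the enum under review. The comments are assumed
// meanings inferred from the names.
enum class LocalPartitionType {
  kGather,      // Combine all inputs into a single output stream.
  kRepartition, // Split inputs across consumers using a partition function.
  kReplicate,   // Copy every input row to every consumer.
};

// Returns a printable name for supported types and throws for types that
// are declared but not implemented.
std::string checkSupported(LocalPartitionType type) {
  switch (type) {
    case LocalPartitionType::kGather:
      return "GATHER";
    case LocalPartitionType::kRepartition:
      return "REPARTITION";
    case LocalPartitionType::kReplicate:
      throw std::logic_error("kReplicate is not yet implemented");
  }
  throw std::logic_error("Unknown local partition type");
}
```

Failing loudly at plan construction keeps an unsupported value from being silently treated as one of the supported ones.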

@@ -674,19 +678,24 @@ class LocalPartitionNode : public PlanNode {
"Local repartitioning node requires at least one source");
}

-static std::shared_ptr<LocalPartitionNode> single(
+static std::shared_ptr<LocalPartitionNode> gather(
Collaborator

is this used anywhere?

Collaborator

nm, I see that this is called from presto_cpp.


mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 4, 2022
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 4, 2022
Summary:
X-link: https://github.com/facebookexternal/presto_cpp/pull/691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran DistinctLimit directly over RemoteSource in a single pipeline. That pipeline was restricted to one thread because it included a final aggregation and a final limit. Now Velox runs the plan as two pipelines: one runs RemoteSource, the other runs DistinctLimit, with a gather local exchange between them.
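
The partial and final distinct-limit steps in this plan can be sketched in Python as follows. This is a simplified, pull-based model under stated assumptions: `CountingSource`, `distinct_limit`, and `gather` are hypothetical names, and the gather here drains its inputs one after another, whereas a real engine would run the producers concurrently and would need an explicit signal to stop them.

```python
from itertools import chain
from typing import Iterable, Iterator, List


class CountingSource:
    """Hypothetical row source that records how many rows were pulled."""

    def __init__(self, keys: List[int]) -> None:
        self.keys = list(keys)
        self.produced = 0

    def __iter__(self) -> Iterator[int]:
        for key in self.keys:
            self.produced += 1
            yield key


def distinct_limit(rows: Iterable[int], limit: int) -> Iterator[int]:
    """Yield at most `limit` distinct keys, then stop pulling from upstream."""
    if limit <= 0:
        return
    seen = set()
    for key in rows:
        if key in seen:
            continue
        seen.add(key)
        yield key
        if len(seen) >= limit:
            return


def gather(*streams: Iterable[int]) -> Iterator[int]:
    """Sequential stand-in for a gather exchange (assumption: no concurrency)."""
    return chain(*streams)


a = CountingSource([1, 1, 2, 3, 4, 5])
b = CountingSource([3, 6, 7, 8])

# The partial step runs per producer, the final step runs once after the gather.
partials = [distinct_limit(a, 3), distinct_limit(b, 3)]
result = list(distinct_limit(gather(*partials), 3))
print(result, a.produced, b.produced)
```

Because the final step stops pulling once it has enough distinct keys, the second source is never read in this sketch.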

Differential Revision: D35304854

fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction, which lets queries like
TPC-H q18 run considerably faster: 2s vs. 5s for a single-node run at scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.
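
As a rough illustration of why partitioning on the grouping keys makes a multi-threaded final aggregation safe, consider the Python sketch below. The helper names (`partition_by_key`, `aggregate_sum`, `parallel_final_sum`) are hypothetical, and a sum is used as a stand-in for any aggregate; this is not how Velox implements it.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Hashable, Iterable, List, Tuple

Row = Tuple[Hashable, int]


def partition_by_key(rows: Iterable[Row], num_partitions: int) -> List[List[Row]]:
    """Route each row by the hash of its grouping key, so that all rows
    for a given key land in the same partition."""
    partitions: List[List[Row]] = [[] for _ in range(num_partitions)]
    for key, value in rows:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


def aggregate_sum(rows: Iterable[Row]) -> Dict[Hashable, int]:
    """Single-threaded aggregation over one partition."""
    totals: Dict[Hashable, int] = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return dict(totals)


def parallel_final_sum(rows: Iterable[Row], num_threads: int) -> Dict[Hashable, int]:
    """Aggregate each partition on its own thread and combine the results."""
    partitions = partition_by_key(rows, num_threads)
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        per_partition = list(pool.map(aggregate_sum, partitions))
    combined: Dict[Hashable, int] = {}
    for totals in per_partition:
        # Key sets are disjoint across partitions, so a plain update
        # never overwrites another partition's result.
        combined.update(totals)
    return combined


rows = [(1, 10), (2, 5), (1, 7), (3, 1), (2, 2)]
totals = parallel_final_sum(rows, num_threads=3)
print(totals)
```

Each thread owns a disjoint set of keys, so no cross-thread merging of groups is needed. With a round-robin split instead, the same key could appear in several partitions and the per-thread results would no longer be final.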

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran Aggregate(FINAL) directly over RemoteSource in a single pipeline. That pipeline was restricted to one thread because it included a final aggregation. Now Velox runs the plan as two pipelines: one runs RemoteSource, the other runs Aggregate(FINAL), with a gather local exchange between them.
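
The Aggregate(PARTIAL) and Aggregate(FINAL) steps in the plan above exchange intermediate results of type row("field0" double, "field1" bigint). The Python sketch below shows how such a two-step average could work, assuming field0 holds a running sum and field1 a row count; that interpretation, and the names `partial_avg` and `final_avg`, are assumptions made for illustration.

```python
from typing import Iterable, List, Optional, Tuple

# Assumed layout of the intermediate result: (sum, count).
Intermediate = Tuple[float, int]


def partial_avg(values: Iterable[float]) -> Intermediate:
    """Partial step: reduce one producer's values to (sum, count)."""
    total = 0.0
    count = 0
    for value in values:
        total += value
        count += 1
    return (total, count)


def final_avg(intermediates: Iterable[Intermediate]) -> Optional[float]:
    """Final step: merge the gathered intermediates and compute the average."""
    total = 0.0
    count = 0
    for partial_sum, partial_count in intermediates:
        total += partial_sum
        count += partial_count
    return total / count if count else None


# Two producers each compute a partial result; the final step merges them.
gathered: List[Intermediate] = [partial_avg([1.0, 2.0]), partial_avg([3.0])]
print(final_avg(gathered))
```

Averaging the per-producer averages directly would weight the producers equally regardless of their row counts, which is why the intermediate carries both fields.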

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran DistinctLimit directly over RemoteSource in a single pipeline. That pipeline was restricted to one thread because it included a final aggregation and a final limit. Now Velox runs the plan as two pipelines: one runs RemoteSource, the other runs DistinctLimit, with a gather local exchange between them.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: c45e99bb744f230683fe8972ca3b63f2628e6b3f

shiyu-bytedance pushed a commit to shiyu-bytedance/velox-1 that referenced this pull request Aug 18, 2022
Summary:
X-link: https://github.com/facebookexternal/presto_cpp/pull/691

Pull Request resolved: facebookincubator#1330

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
marin-ma pushed a commit to marin-ma/velox-oap that referenced this pull request Dec 15, 2023
…tor#1330)

Support the partial merge phase introduced by count distinct.