Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set icu4c path in CMAKE_PREFIX_PATH only when it's not there #691

Closed
wants to merge 5 commits into from

Conversation

mshang816
Copy link
Contributor

This is for the cmake to pick the icu4c installed under ~/deps for our circle ci runs. Or it will use the one under /usr/local/opt/icu4c

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Dec 2, 2021
@@ -219,7 +219,11 @@ find_package(ZLIB)
find_library(SNAPPY snappy)

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
set(CMAKE_PREFIX_PATH "/usr/local/opt/icu4c" ${CMAKE_PREFIX_PATH})
if(NOT CMAKE_PREFIX_PATH MATCHES ".*\/icu4c.*")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO we should not add paths that were not created by the CMake script / part of the repo to the prefix path in our CMake script, but I can't find any discussion of best practices around this.

If you want to keep this default, at least append rather than prepend the path; that way it will not override developer-provided paths.

@mbasmanova
Copy link
Contributor

Closing stale PRs. Feel free to re-open if decide to continue working on this.

@mbasmanova mbasmanova closed this Mar 17, 2022
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 1, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: cc51d0bd17e15cc98d56c50c66f70703cafa8949
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 1, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: a067764d7ec5bbd86fd14d0209e7d31115f1df06
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 4, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 4, 2022
Summary:
X-link: https://github.com/facebookexternal/presto_cpp/pull/691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 4, 2022
Summary:
X-link: https://github.com/facebookexternal/presto_cpp/pull/691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

It is now possible that final aggregation runs after local repartitioning which
produces dictionary encoded vectors. It turns out that many aggregate functions
are not prepared to handle non-flat intermediate results. To avoid breaking
these we forcibly flatten intermediate results before feeding these to
aggregate functions. [1]

There is now a local gather exchange upstream of the final Limit operator. When
Limit finishes early the producing pipeline before the exchange needs to be
notified so it can finish early as well. [2]

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Differential Revision: D35304854

fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
mbasmanova added a commit to mbasmanova/velox-1 that referenced this pull request Apr 5, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: c45e99bb744f230683fe8972ca3b63f2628e6b3f
facebook-github-bot pushed a commit that referenced this pull request Apr 6, 2022
Summary:
X-link: facebookexternal/presto_cpp#691

Pull Request resolved: #1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
shiyu-bytedance pushed a commit to shiyu-bytedance/velox-1 that referenced this pull request Aug 18, 2022
Summary:
X-link: https://github.com/facebookexternal/presto_cpp/pull/691

Pull Request resolved: facebookincubator#1330

Final and single aggregations can run multi-threaded as long as data is
partitioned on the grouping keys, but we used to artificially restrict these to
run single-threaded. This change lifts the restriction. This allows queries like
 TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10.

To make this work we must not ignore any local exchange node in the query plan.
Prestissimo used to silently drop gather and round-robin repartitioning local
exchanges. We no longer do that.

[1] Consider this global aggregation query.

```
presto> explain (type distributed)
           SELECT avg(discount), avg(quantity) FROM lineitem;
                                                                                     Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [avg, avg_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[_col0, _col1] => [avg:double, avg_2:double]
             _col0 := avg (1:35)
             _col1 := avg_2 (1:50)
         - Aggregate(FINAL) => [avg:double, avg_2:double]
                 avg := "presto.default.avg"((avg_10)) (1:35)
                 avg_2 := "presto.default.avg"((avg_11)) (1:50)
             - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
                 - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]

 Fragment 1 [tpch:orders:15000]
     Output layout: [avg_10, avg_11]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)]
             avg_10 := "presto.default.avg"((discount)) (1:35)
             avg_11 := "presto.default.avg"((quantity)) (1:50)
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double]
                 Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00}
                 quantity := tpch:quantity (1:69)
                 discount := tpch:discount (1:69)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL)  and there will be a gather local exchange in between.

[2] Consider this LIMIT query:

```
presto> explain (type distributed)
        SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17;
                                                                             Query Plan
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 0 [SINGLE]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey] => [orderkey:bigint]
         - DistinctLimit[17] => [orderkey:bigint]
             - LocalExchange[SINGLE] () => [orderkey:bigint]
                 - RemoteSource[1] => [orderkey:bigint]

 Fragment 1 [tpch:orders:15000]
     Output layout: [orderkey]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - DistinctLimitPartial[17] => [orderkey:bigint]
         - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint]
                 Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00}
                 orderkey := tpch:orderkey (1:49)
```

Fragment 0 includes a gather local exchange: LocalExchange[SINGLE].

Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between.

Reviewed By: Yuhta

Differential Revision: D35304854

fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
rui-mo pushed a commit to rui-mo/velox that referenced this pull request Mar 17, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants