-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Set icu4c path in CMAKE_PREFIX_PATH only when it's not there #691
Conversation
@@ -219,7 +219,11 @@ find_package(ZLIB) | |||
find_library(SNAPPY snappy) | |||
|
|||
if(CMAKE_SYSTEM_NAME MATCHES "Darwin") | |||
set(CMAKE_PREFIX_PATH "/usr/local/opt/icu4c" ${CMAKE_PREFIX_PATH}) | |||
if(NOT CMAKE_PREFIX_PATH MATCHES ".*\/icu4c.*") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should not add paths that were not created by the CMake script / part of the repo to the prefix path in our CMake script, but I can't find any discussion of best practices around this.
If you want to keep this default, at least append rather than prepend the path; that way it will not override developer-provided paths.
Closing stale PRs. Feel free to re-open if decide to continue working on this. |
Summary: X-link: facebookexternal/presto_cpp#691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1] There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2] [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Differential Revision: D35304854 fbshipit-source-id: cc51d0bd17e15cc98d56c50c66f70703cafa8949
Summary: X-link: facebookexternal/presto_cpp#691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1] There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2] [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Differential Revision: D35304854 fbshipit-source-id: a067764d7ec5bbd86fd14d0209e7d31115f1df06
Summary: X-link: facebookexternal/presto_cpp#691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1] There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2] [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Differential Revision: D35304854 fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
Summary: X-link: https://github.com/facebookexternal/presto_cpp/pull/691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1] There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2] [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Differential Revision: D35304854 fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
Summary: X-link: https://github.com/facebookexternal/presto_cpp/pull/691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. It is now possible that final aggregation runs after local repartitioning which produces dictionary encoded vectors. It turns out that many aggregate functions are not prepared to handle non-flat intermediate results. To avoid breaking these we forcibly flatten intermediate results before feeding these to aggregate functions. [1] There is now a local gather exchange upstream of the final Limit operator. When Limit finishes early the producing pipeline before the exchange needs to be notified so it can finish early as well. [2] [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Differential Revision: D35304854 fbshipit-source-id: 0873d02ca590c815ac6add50b27440bed6fb6087
Summary: X-link: facebookexternal/presto_cpp#691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Reviewed By: Yuhta Differential Revision: D35304854 fbshipit-source-id: c45e99bb744f230683fe8972ca3b63f2628e6b3f
Summary: X-link: facebookexternal/presto_cpp#691 Pull Request resolved: #1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Reviewed By: Yuhta Differential Revision: D35304854 fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
Summary: X-link: https://github.com/facebookexternal/presto_cpp/pull/691 Pull Request resolved: facebookincubator#1330 Final and single aggregations can run multi-threaded as long as data is partitioned on the grouping keys, but we used to artificially restrict these to run single-threaded. This change lifts the restriction. This allows queries like TPC-H q18 to run a lot faster: 2s vs. 5s for a single-node run over scale 10. To make this work we must not ignore any local exchange node in the query plan. Prestissimo used to silently drop gather and round-robin repartitioning local exchanges. We no longer do that. [1] Consider this global aggregation query. ``` presto> explain (type distributed) SELECT avg(discount), avg(quantity) FROM lineitem; Query Plan ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [avg, avg_2] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[_col0, _col1] => [avg:double, avg_2:double] _col0 := avg (1:35) _col1 := avg_2 (1:50) - Aggregate(FINAL) => [avg:double, avg_2:double] avg := "presto.default.avg"((avg_10)) (1:35) avg_2 := "presto.default.avg"((avg_11)) (1:50) - LocalExchange[SINGLE] () => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] - RemoteSource[1] => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] Fragment 1 [tpch:orders:15000] Output layout: [avg_10, avg_11] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Aggregate(PARTIAL) => [avg_10:row("field0" double, "field1" bigint), avg_11:row("field0" double, "field1" bigint)] avg_10 := "presto.default.avg"((discount)) (1:35) avg_11 := "presto.default.avg"((quantity)) (1:50) - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [quantity:double, discount:double] Estimates: {rows: 60175 (1.03MB), cpu: 1083150.00, memory: 0.00, network: 0.00} quantity := tpch:quantity (1:69) discount := tpch:discount (1:69) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with Aggregate(FINAL) directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - Aggregate(FINAL) and there will be a gather local exchange in between. [2] Consider this LIMIT query: ``` presto> explain (type distributed) SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17; Query Plan -------------------------------------------------------------------------------------------------------------------------------------------------------------------- Fragment 0 [SINGLE] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - Output[orderkey] => [orderkey:bigint] - DistinctLimit[17] => [orderkey:bigint] - LocalExchange[SINGLE] () => [orderkey:bigint] - RemoteSource[1] => [orderkey:bigint] Fragment 1 [tpch:orders:15000] Output layout: [orderkey] Output partitioning: SINGLE [] Stage Execution Strategy: UNGROUPED_EXECUTION - DistinctLimitPartial[17] => [orderkey:bigint] - TableScan[TableHandle {connectorId='tpch', connectorHandle='lineitem:sf0.01', layout='Optional[lineitem:sf0.01]'}, grouped = false] => [orderkey:bigint] Estimates: {rows: 60175 (528.88kB), cpu: 541575.00, memory: 0.00, network: 0.00} orderkey := tpch:orderkey (1:49) ``` Fragment 0 includes a gather local exchange: LocalExchange[SINGLE]. Before this change, the exchange was ignored and Velox ran a plan with DistinctLimit node directly over RemoteSource in the same pipeline. The pipeline was restricted to single-threaded because it includes final aggregation and final limit. Now, Velox will run a plan with 2 pipelines. One pipeline will run RemoteSource, another - DistinctLimit and there will be a gather local exchange in between. Reviewed By: Yuhta Differential Revision: D35304854 fbshipit-source-id: a37f8c09f6bb481edcfa29c033c4e4b092eeb173
…#231 (facebookincubator#691) add check_overflow transform support. test by CH[[231]]
This is for the cmake to pick the icu4c installed under ~/deps for our circle ci runs. Or it will use the one under /usr/local/opt/icu4c