-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add "Extended Clickbench" benchmark for median and approx_median for high cardinality aggregates #12438
Conversation
…high cardinality aggregates
@@ -2,3 +2,4 @@ SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DIST | |||
SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits; | |||
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10; | |||
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10; | |||
SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax, "UserID" FROM hits GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since by default partial aggregation is skipped in case unique groups / input records > 0.8
, this grouping cardinality is likely not high enough. Maybe it's worth using "WatchID", "ClientIP"
from q32 here, as it for sure benefited from skipping partial aggregation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think q32 may be the typical high cardinality query too, I am modifying the query and checking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran EXPLAIN ANALYZE
on this branch and I believe you are correct that the partial aggregation skipping is not happening
Specifically skipped_aggregation_rows=0
in the partial aggregate:
AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=142.856836062s, skipped_aggregation_rows=0]
> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax, "UserID" FROM 'hits.parquet' GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [tp95@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=2.666µs] |
| | SortExec: TopK(fetch=10), expr=[tp95@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=54.827251ms, row_replacements=1255] |
| | ProjectionExec: expr=[min(hits.parquet.ResponseStartTiming)@1 as tmin, median(hits.parquet.ResponseStartTiming)@2 as tmed, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp95, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp99, max(hits.parquet.ResponseStartTiming)@4 as tmax, UserID@0 as UserID], metrics=[output_rows=1994511, elapsed_compute=82.127µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1994511, elapsed_compute=250.672µs] |
| | FilterExec: min(hits.parquet.ResponseStartTiming)@1 > 0 AND median(hits.parquet.ResponseStartTiming)@2 > 0, metrics=[output_rows=1994511, elapsed_compute=37.563416ms] |
| | AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=17630976, elapsed_compute=45.058854603s] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=21164213, elapsed_compute=135.756351ms] |
| | RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16, metrics=[send_time=8.034269627s, repartition_time=2.160110804s, fetch_time=144.660128063s] |
| | AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=142.856836062s, skipped_aggregation_rows=0] |
| | ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[UserID, ResponseStartTiming], metrics=[output_rows=99997497, elapsed_compute=16ns, bytes_scanned=334711589, file_scan_errors=0, row_groups_pruned_bloom_filter=0, row_groups_matched_statistics=0, page_index_rows_filtered=0, row_groups_pruned_statistics=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, file_open_errors=0, row_groups_matched_bloom_filter=0, pushdown_eval_time=32ns, time_elapsed_opening=308.845002ms, time_elapsed_processing=1.314136113s, time_elapsed_scanning_total=82.246142751s, page_index_eval_time=32ns, time_elapsed_scanning_until_data=25.709795ms] |
| | |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched.
Elapsed 14.296 seconds.
main:
> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax, "UserID" FROM 'hits.parquet' GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [tp95@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=6.583µs] |
| | SortExec: TopK(fetch=10), expr=[tp95@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=44.490069ms, row_replacements=1259] |
| | ProjectionExec: expr=[min(hits.parquet.ResponseStartTiming)@1 as tmin, median(hits.parquet.ResponseStartTiming)@2 as tmed, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp95, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp99, max(hits.parquet.ResponseStartTiming)@4 as tmax, UserID@0 as UserID], metrics=[output_rows=1994511, elapsed_compute=80.704µs] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1994511, elapsed_compute=266.832µs] |
| | FilterExec: min(hits.parquet.ResponseStartTiming)@1 > 0 AND median(hits.parquet.ResponseStartTiming)@2 > 0, metrics=[output_rows=1994511, elapsed_compute=50.816018ms] |
| | AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=17630976, elapsed_compute=43.153463868s] |
| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=21164213, elapsed_compute=86.665877ms] |
| | RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16, metrics=[repartition_time=2.097894133s, fetch_time=141.020531259s, send_time=8.982681573s] |
| | AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=139.1215463s] |
| | ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[UserID, ResponseStartTiming], metrics=[output_rows=99997497, elapsed_compute=16ns, predicate_evaluation_errors=0, file_open_errors=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bytes_scanned=334711589, page_index_rows_filtered=0, num_predicate_creation_errors=0, file_scan_errors=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, pushdown_rows_filtered=0, time_elapsed_opening=285.717582ms, time_elapsed_scanning_until_data=64.90646ms, time_elapsed_scanning_total=80.855628351s, time_elapsed_processing=1.21575998s, page_index_eval_time=32ns, pushdown_eval_time=32ns] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I changed it to GROUP BY "UserID","WatchID", "ClientIP"
I see the skipped aggregation rows now:
skipped_aggregation_rows=98293561
AggregateExec: mode=Partial, gby=[UserID@2 as UserID, WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=99997497, elapsed_compute=246.594546516s, skipped_aggregation_rows=98293561]
> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax, "UserID", "WatchID", "ClientIP" FROM 'hits.parquet' GROUP BY "UserID","WatchID", "ClientIP" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;
And with that change now the query doesn't finish on my laptop (I killed it after using 100GB of memory) but with #11827 it finishes in 45 seconds.
I will think about how to modify this query to reflect the benefit without making it impossibly to run on low resource machines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After creating a q32 like query, seems obvious improvement in my local
(I run it with just a subset of hits_partitioned
to avoid situation that the quey can't finish).
#11827 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After creating a q32 like query, seems obvious improvement in my local
Yes, I agree that #11827 provides an improvement. I just posted a comment #11827 (comment)
Co-authored-by: Eduard Karacharov <[email protected]>
…sion into alamb/test_mediate_perf
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thank you @alamb
Thank you for the review @korowa |
Which issue does this PR close?
Part of #11819
Rationale for this change
As part of reviewing #11827 from @Rachelint I realized there is no benchmark coverage for the code being optimized. Thus we should implement such a benchmark.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?