[SPARK-31973][SQL] Skip partial aggregates if grouping keys have high cardinality #28804
Changes from 32 commits
@@ -2196,6 +2196,34 @@ object SQLConf {
    .checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
    .createWithDefault(16)

  val SKIP_PARTIAL_AGGREGATE_MINROWS =
    buildConf("spark.sql.aggregate.skipPartialAggregate.minNumRows")
      .internal()
      .doc("Number of records after which the aggregate operator checks whether " +
        "the partial aggregation phase can be avoided.")
      .longConf
      .createWithDefault(100000)

  val SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO =
    buildConf("spark.sql.aggregate.skipPartialAggregate.aggregateRatio")
      .internal()
      .doc("Ratio of the number of records present in the map of the aggregate operator " +
        "to the total number of records processed by the aggregate operator.")
      .doubleConf
      .createWithDefault(0.5)
Review thread on the two parameters:

> Why do we need the two params for this optimization?

> Also, could you check performance numbers by varying the params?

> @maropu I have borrowed this heuristic from Hive. We can merge them into one. Any suggestions here?
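For readers following the thread, here is a minimal sketch of the Hive-style rule the two parameters encode, written as a standalone Scala function. The function name and standalone form are hypothetical (in the PR the check lives inside the aggregate operator), and the skip direction follows the author's reply further down: skip only when aggregation fails to reduce the data.

```scala
// Hypothetical standalone rendering of the heuristic under discussion.
// minNumRows ~ spark.sql.aggregate.skipPartialAggregate.minNumRows
// ratio      ~ spark.sql.aggregate.skipPartialAggregate.aggregateRatio
def shouldSkipPartialAggregate(
    rowsProcessed: Long,  // input rows seen so far by the partial aggregate
    groupsInMap: Long,    // distinct grouping keys currently in the hash map
    minNumRows: Long = 100000L,
    ratio: Double = 0.5): Boolean = {
  // Decide only after a large enough sample, and skip the partial phase when
  // the map retains more than `ratio` of the rows seen, i.e. the grouping
  // keys are so distinct that map-side aggregation barely reduces the data.
  rowsProcessed >= minNumRows && groupsInMap.toDouble / rowsProcessed > ratio
}
```

Merging the two knobs into one, as suggested above, would amount to hard-coding the sample size and exposing only the ratio.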
  val SKIP_PARTIAL_AGGREGATE_ENABLED =
    buildConf("spark.sql.aggregate.skipPartialAggregate")
      .internal()
      .doc("When enabled, the partial aggregation is skipped when the following " +
        "two conditions are met: 1. the total number of records processed is greater " +
        s"than the threshold defined by ${SKIP_PARTIAL_AGGREGATE_MINROWS.key}; 2. the ratio " +
        "of the record count in the map to the total record count is greater than the value " +
        s"defined by ${SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO.key}.")
      .booleanConf
      .createWithDefault(true)
Review thread on using column statistics instead:

> Could we use a threshold + column stats instead of this boolean config?

> I didn't get the threshold part. Can you please elaborate?

> That meant a ratio of a distinct row count and total row count in the group-by key column stats. For example, if a number ...

> Thanks @maropu for explaining, I will make this change.

> @maropu This is a very useful suggestion. One issue is that column stats are rarely computed. We came across this work in Hive: https://issues.apache.org/jira/browse/HIVE-291. They turn off map-side aggregation (i.e., the partial aggregate becomes a pass-through) in the physical Group-By operator unless map-side aggregation reduces the entries by at least half, and they look at 100000 rows to decide that (ref: patch https://issues.apache.org/jira/secure/attachment/12400257/291.1.txt). Should we do something similar in HashAggregateExec here? Any thoughts on this?

> I think whether that approach improves performance depends on IO performance, but the idea looks interesting to me. WDYT? @cloud-fan
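On the stats-availability point: the distinct-to-total ratio described above only exists after statistics have been collected for the group-by columns. A sketch of what that looks like, assuming an active `spark` session; the table and column names are placeholders:

```scala
// Compute per-column statistics so the optimizer can read distinctCount;
// `sales` and `customer_id` are placeholder names for this sketch.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id")

// A distinct count close to the row count indicates a high-cardinality key,
// i.e. partial aggregation would barely reduce the data.
spark.sql("DESC EXTENDED sales customer_id").show(truncate = false)
```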
  val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
    .doc("Compression codec used in writing of AVRO files. Supported codecs: " +
      "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
@@ -2922,6 +2950,12 @@ class SQLConf extends Serializable with Logging {

  def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)

  def skipPartialAggregate: Boolean = getConf(SKIP_PARTIAL_AGGREGATE_ENABLED)

  def skipPartialAggregateThreshold: Long = getConf(SKIP_PARTIAL_AGGREGATE_MINROWS)

  def skipPartialAggregateRatio: Double = getConf(SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO)

  def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)

  def uiExplainMode: String = getConf(UI_EXPLAIN_MODE)
Review thread on data dependence:

> I think it must be data-dependent. If I have 10^10 records, and partial agg cuts it down to 10^8 (1% of the original inputs), it's still worth doing partial agg.

> @cloud-fan We skip partial aggregation only when the aggregation was not able to cut down records by 50% (defined by spark.sql.aggregate.partialaggregate.skip.ratio). In this case it will not kick in.
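Spelling out the arithmetic of that example: the map-to-input ratio is 10^8 / 10^10 = 0.01, far below the 0.5 default, so the skip condition never fires and partial aggregation proceeds as usual.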