[SPARK-31973][SQL] Skip partial aggregates if grouping keys have high cardinality #28804

Closed (wants to merge 33 commits)

Commits (33)
db8a62d
Fix: Init commit
karuppayya Jun 12, 2020
9a59925
Fix: Fix UT name
karuppayya Jun 12, 2020
feacdcf
Fix: Fix codegen
karuppayya Jun 12, 2020
ab98ea4
Revert "Fix: Fix codegen"
karuppayya Jun 12, 2020
2e102d1
Fix: Fix codegen logic
karuppayya Jun 12, 2020
5fa601b
Fix: Fix codegen logic
karuppayya Jun 12, 2020
220eaed
Fix: Fix codegen logic
karuppayya Jun 17, 2020
452b632
Fix: clean up
karuppayya Jun 17, 2020
68dd5a3
Fix: remove partialmerge
karuppayya Jun 17, 2020
692fd1b
Fix: fix typo, remove whitelines
karuppayya Jun 17, 2020
f1b6ac1
Fix: Fix UT attempt
karuppayya Jun 18, 2020
05c891f
Fix: Address review comments
karuppayya Jun 18, 2020
dd3c56a
Fix: UT fixes, refactoring
karuppayya Jun 19, 2020
cb8b922
Fix: fix indent
karuppayya Jun 19, 2020
7952aa7
UT: Add more test
karuppayya Jun 19, 2020
56c95e2
Fix UT attempt
karuppayya Jun 19, 2020
43237ba
Enabling the conf to run all tests with the feature
karuppayya Jun 20, 2020
99c1d22
Unit test fix attempt
karuppayya Jun 24, 2020
d2873a3
UT fix attempt
karuppayya Jun 24, 2020
afc2903
Ut fix attempt
karuppayya Jun 25, 2020
7766401
Add heuristic
karuppayya Jul 3, 2020
75125d9
Fix: Include missing change, remove unnecessary changes, handle comments
karuppayya Jul 6, 2020
3ca81ae
Refactor: avoid additional code on reducer, fix tests,
karuppayya Jul 8, 2020
8850777
gst
karuppayya Jul 8, 2020
26a2fd6
Address review comments
karuppayya Jul 15, 2020
c088816
Address review comments
karuppayya Aug 6, 2020
c49f106
Fix forward reference
karuppayya Aug 6, 2020
69f1d71
UT fixes, address review comments
karuppayya Aug 7, 2020
c9a415d
Address review comments
karuppayya Aug 7, 2020
ceaa4e5
Fix style check
karuppayya Aug 7, 2020
2ae5525
Fix UT
karuppayya Aug 7, 2020
0a186f0
UT fix
karuppayya Aug 7, 2020
11572a1
Address review comments
karuppayya Aug 11, 2020
@@ -2196,6 +2196,34 @@ object SQLConf {
.checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
.createWithDefault(16)


val SKIP_PARTIAL_AGGREGATE_MINROWS =
buildConf("spark.sql.aggregate.skipPartialAggregate.minNumRows")
.internal()
.doc("Number of records after which the aggregate operator checks whether " +
"the partial aggregation phase can be avoided")
.longConf
.createWithDefault(100000)

cloud-fan (Contributor):
I think it must be data-dependent. If I have 10^10 records, and partial agg cuts it down to 10^8 (1% of the original inputs), it's still worth to do partial agg.

karuppayya (Contributor, author):

@cloud-fan we skip partial aggregation only when the aggregation was not able to cut down the number of records by 50% (defined by spark.sql.aggregate.partialaggregate.skip.ratio). In this case it will not kick in.


val SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO =
buildConf("spark.sql.aggregate.skipPartialAggregate.aggregateRatio")
.internal()
.doc("Ratio of the number of records present in the map of the Aggregate operator " +
"to the total number of records processed by the Aggregate operator")
.doubleConf
.createWithDefault(0.5)
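
Taken together, the two settings above encode the Hive-style check discussed further down in this thread: once minNumRows records have been consumed, the map size is compared against the input count. A minimal sketch of that decision, using hypothetical counters rowsProcessed and mapSize (illustrative names, not identifiers from this PR):

```scala
// Sketch only: shows how the two configs above are meant to interact.
val minRows = 100000L // skipPartialAggregate.minNumRows
val ratio = 0.5       // skipPartialAggregate.aggregateRatio

def shouldSkipPartialAggregate(rowsProcessed: Long, mapSize: Long): Boolean =
  rowsProcessed > minRows && mapSize.toDouble / rowsProcessed > ratio

// After 200,000 input rows the map holds 150,000 distinct keys: partial
// aggregation removed only 25% of the rows, so it is worth skipping.
shouldSkipPartialAggregate(200000L, 150000L) // true
```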

maropu (Member):
Why do we need the two params for this optimization?

maropu (Member):
Also, could you check performance numbers by varying the params?

karuppayya (Contributor, author):
@maropu I have borrowed this heuristic from Hive. We can merge them into one. Any suggestions here?


val SKIP_PARTIAL_AGGREGATE_ENABLED =
buildConf("spark.sql.aggregate.skipPartialAggregate")
.internal()
.doc("When enabled, partial aggregation is skipped when the following " +
"two conditions are met: 1. the total number of records processed is greater " +
s"than the threshold defined by ${SKIP_PARTIAL_AGGREGATE_MINROWS.key}; 2. the ratio " +
"of the record count in the map to the total record count is greater than the value " +
s"defined by ${SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO.key}")
.booleanConf
.createWithDefault(true)
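
For reference, a hypothetical session-level example of tuning the three settings (they are internal configs introduced by this PR, so treat this as illustrative rather than a documented public API; spark is a SparkSession):

```scala
// Illustrative only: the keys are the internal configs defined above.
spark.conf.set("spark.sql.aggregate.skipPartialAggregate", "true")
spark.conf.set("spark.sql.aggregate.skipPartialAggregate.minNumRows", "200000")
spark.conf.set("spark.sql.aggregate.skipPartialAggregate.aggregateRatio", "0.8")
```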

maropu (Member):
Could we use a threshold + column stats instead of this boolean config?

karuppayya (Contributor, author):
I didn't get the threshold part. Can you please elaborate?

maropu (Member):
I meant the ratio of the distinct row count to the total row count in the group-by key column stats. For example, if distinctCount / rowCount is close to 1.0, you apply the optimization; otherwise, you don't.
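
A rough sketch of the stats-based decision being suggested here, assuming distinctCount and rowCount come from column statistics gathered by ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS (the helper name and the 0.9 threshold are illustrative):

```scala
// Hypothetical planning-time check based on group-by key column stats.
def skipByStats(distinctCount: BigInt, rowCount: BigInt, threshold: Double = 0.9): Boolean =
  rowCount > 0 && (BigDecimal(distinctCount) / BigDecimal(rowCount)).toDouble > threshold

// Nearly-unique keys: 9.5M distinct values over 10M rows, so apply the optimization.
skipByStats(BigInt(9500000), BigInt(10000000)) // true
```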

karuppayya (Contributor, author):
Thanks @maropu for explaining, I will make this change

karuppayya (Contributor, author):
@maropu This is a very useful suggestion. One issue is that column stats are rarely computed. We came across this work in Hive: https://issues.apache.org/jira/browse/HIVE-291. They turn off map-side aggregation (i.e., the partial aggregate becomes a pass-through) in the physical Group-By operator if map-side aggregation does not reduce the entries by at least half, and they look at the first 100000 rows to decide (ref: patch https://issues.apache.org/jira/secure/attachment/12400257/291.1.txt). Should we do something similar in HashAggregateExec here? Any thoughts on this?

maropu (Member), Jun 30, 2020:
> They turn off map-side aggregation (i.e., the partial aggregate becomes a pass-through) in the physical Group-By operator if map-side aggregation does not reduce the entries by at least half, and they look at the first 100000 rows to decide

I think whether that approach improves performance depends on IO performance, but the idea looks interesting to me. WDYT? @cloud-fan


val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
.doc("Compression codec used in writing of AVRO files. Supported codecs: " +
"uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
@@ -2922,6 +2950,12 @@ class SQLConf extends Serializable with Logging {

def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)

def skipPartialAggregate: Boolean = getConf(SKIP_PARTIAL_AGGREGATE_ENABLED)

def skipPartialAggregateThreshold: Long = getConf(SKIP_PARTIAL_AGGREGATE_MINROWS)

def skipPartialAggregateRatio: Double = getConf(SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO)

def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)

def uiExplainMode: String = getConf(UI_EXPLAIN_MODE)
@@ -63,6 +63,13 @@ public final class UnsafeFixedWidthAggregationMap {
*/
private final UnsafeRow currentAggregationBuffer;

/**
 * Number of rows that were added to the map.
 * This includes the entries that were passed on to the sorter
 * using {@link #destructAndCreateExternalSorter()}.
 */
private long numRowsAdded = 0L;

/**
* @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
* schema, false otherwise.
@@ -147,6 +154,8 @@ public UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key, int hash) {
);
if (!putSucceeded) {
return null;
} else {
numRowsAdded = numRowsAdded + 1;
}
}

@@ -249,4 +258,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
map);
}

public long getNumRows() {
return numRowsAdded;
}
}
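
For context, a hedged sketch of how a caller might combine getNumRows() with an input-row counter to evaluate the skip condition; apart from UnsafeFixedWidthAggregationMap.getNumRows, every name here is an illustrative stand-in, not code from this diff:

```scala
import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap

// Sketch: `inputRows` is a hypothetical count of rows consumed by the
// partial (map-side) aggregate; `getNumRows` is the accessor added above.
def shouldSwitchToPassThrough(
    map: UnsafeFixedWidthAggregationMap,
    inputRows: Long,
    minRows: Long,
    ratio: Double): Boolean =
  inputRows > minRows && map.getNumRows.toDouble / inputRows > ratio
```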