Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
karuppayya committed Aug 11, 2020
1 parent 0a186f0 commit 11572a1
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2202,15 +2202,22 @@ object SQLConf {
.internal()
.doc("Number of records after which aggregate operator checks if " +
"partial aggregation phase can be avoided")
.version("3.1.0")
.longConf
.createWithDefault(100000)

val SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO =
buildConf("spark.sql.aggregate.skipPartialAggregate.aggregateRatio")
.internal()
.doc("Ratio of number of records present in map of Aggregate operator" +
"to the total number of records processed by the Aggregate operator")
.doc("Ratio beyond which the partial aggregation is skipped. " +
"This is computed by taking the ratio of number of records present" +
" in map of Aggregate operator to the total number of records processed" +
" by the Aggregate operator.")
.version("3.1.0")
.doubleConf
.checkValue(ratio => ratio > 0 && ratio < 1, "Invalid value for " +
"spark.sql.aggregate.skipPartialAggregate.aggregateRatio. Valid value needs" +
" to be between 0 and 1" )
.createWithDefault(0.5)

val SKIP_PARTIAL_AGGREGATE_ENABLED =
Expand All @@ -2219,8 +2226,9 @@ object SQLConf {
.doc("When enabled, the partial aggregation is skipped when the following " +
"two conditions are met. 1. When the total number of records processed is greater " +
s"than threshold defined by ${SKIP_PARTIAL_AGGREGATE_MINROWS.key} 2. When the ratio " +
"of recornd count in map to the total records is less that value defined by " +
"of record count in map to the total records is less than the value defined by " +
s"${SKIP_PARTIAL_AGGREGATE_AGGREGATE_RATIO.key}")
.version("3.1.0")
.booleanConf
.createWithDefault(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
}

test("Avoid spill in partial aggregation" ) {
withSQLConf((SQLConf.SKIP_PARTIAL_AGGREGATE_ENABLED.key, "true"),
(SQLConf.SKIP_PARTIAL_AGGREGATE_MINROWS.key, "2")) {
withSQLConf((SQLConf.SKIP_PARTIAL_AGGREGATE_ENABLED.key -> "true"),
(SQLConf.SKIP_PARTIAL_AGGREGATE_MINROWS.key -> "2")) {
// Create Dataframes
val data = Seq(("James", 1), ("James", 1), ("Phil", 1))
val aggDF = data.toDF("name", "values").groupBy("name").sum("values")
Expand All @@ -73,9 +73,9 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
}
}

test(s"Distinct: Partial aggregation should happen for" +
s" HashAggregate nodes performing partial Aggregate operations " ) {
withSQLConf((SQLConf.SKIP_PARTIAL_AGGREGATE_ENABLED.key, "true")) {
test(s"Distinct: Partial aggregation should happen for " +
"HashAggregate nodes performing partial Aggregate operations " ) {
withSQLConf((SQLConf.SKIP_PARTIAL_AGGREGATE_ENABLED.key -> "true")) {
val aggDF = testData2.select(sumDistinct($"a"), sum($"b"))
val aggNodes = aggDF.queryExecution.executedPlan.collect {
case h: HashAggregateExec => h
Expand Down

0 comments on commit 11572a1

Please sign in to comment.