Skip to content

Commit

Permalink
another fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jun 3, 2018
1 parent 6c4b295 commit 8386b42
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Extract distinct aggregate expressions.
val distincgAggExpressions = aggExpressions.filter(_.isDistinct)
val distinctAggGroups = distincgAggExpressions.groupBy { e =>
val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
if (unfoldableChildren.nonEmpty) {
// Only expand the unfoldable children
Expand All @@ -133,7 +132,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Aggregation strategy can handle queries with a single distinct group.
if (distincgAggExpressions.size > 1) {
if (distinctAggGroups.size > 1) {
// Create the attributes for the grouping id and the group by clause.
val gid = AttributeReference("gid", IntegerType, nullable = false)()
val groupByMap = a.groupingExpressions.collect {
Expand All @@ -152,7 +151,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
}

// Setup unique distinct aggregate children.
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq
val distinctAggChildren = distinctAggGroups.keySet.flatten.toSeq.distinct
val distinctAggChildAttrMap = distinctAggChildren.map(expressionAttributePair)
val distinctAggChildAttrs = distinctAggChildAttrMap.map(_._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
if (functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have multiple distinct
// column sets. Our `RewriteDistinctAggregates` should take care of this case.
sys.error("You hit a query analyzer bug. Please report your query to " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ class PlannerSuite extends SharedSQLContext {
testPartialAggregationPlan(query)
}

// Checks that queries whose DISTINCT aggregates all operate on the same set of
// columns are planned without any ExpandExec node in the executed plan.
// NOTE(review): presumably ExpandExec is the physical operator produced when
// distinct aggregates are rewritten into an expand-based plan -- confirm
// against the optimizer rule.
test("mixed aggregates with same distinct columns") {
// Fails the test if the given physical plan contains at least one ExpandExec.
def assertNoExpand(plan: SparkPlan): Unit = {
assert(plan.collect { case e: ExpandExec => e }.isEmpty)
}

withTempView("v") {
// Two rows sharing the same grouping key i = 1, with columns i, j and k.
Seq((1, 1.0, 1.0), (1, 2.0, 2.0)).toDF("i", "j", "k").createTempView("v")
// One distinct column: both aggregates are DISTINCT over j only.
val query1 = sql("SELECT sum(DISTINCT j), max(DISTINCT j) FROM v GROUP BY i")
assertNoExpand(query1.queryExecution.executedPlan)

// Two distinct columns: both aggregates are DISTINCT over (j, k).
val query2 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT j, k) FROM v GROUP BY i")
assertNoExpand(query2.queryExecution.executedPlan)

// Two distinct columns given in a different argument order: (j, k) versus (k, j).
val query3 = sql("SELECT corr(DISTINCT j, k), count(DISTINCT k, j) FROM v GROUP BY i")
assertNoExpand(query3.queryExecution.executedPlan)
}
}

test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
def checkPlan(fieldTypes: Seq[DataType]): Unit = {
withTempView("testLimit") {
Expand Down

0 comments on commit 8386b42

Please sign in to comment.