// Patch summary (leading text before the first `diff --git` header; patch tools skip it).
//
// Purpose: the regression test added by this patch is titled
//   "SPARK-41035: Reuse of literal in distinct aggregations should work".
//
// File 1: sql/catalyst/.../optimizer/RewriteDistinctAggregates.scala (hunk at line 264, inside
//   `object RewriteDistinctAggregates extends Rule[LogicalPlan]`)
//   - `distinctAggChildAttrLookup` was built with `distinctAggChildAttrMap.toMap`.
//   - It is now built with `distinctAggChildAttrMap.filter(!_._1.foldable).toMap`, i.e. every
//     pair whose first element reports `foldable` is dropped before the lookup map is created.
//   - NOTE(review): presumably this keeps constant children (such as the literal `100` used in
//     the new test) from being looked up and replaced by an attribute; the code that consumes
//     `distinctAggChildAttrLookup` is outside this hunk, so confirm against the full method.
//
// File 2: sql/core/.../sql/DataFrameAggregateSuite.scala (hunk at line 1432, inside
//   `class DataFrameAggregateSuite extends QueryTest`)
//   - Adds one test after the existing test that ends with `checkAnswer(df, Row(2, 3, 1))`.
//   - The test runs `select a, count(distinct 100), count(distinct b, 100)` over the inline
//     rows (1, 2), (4, 5), (4, 6) aliased as data(a, b), grouped by `a`.
//   - Expected result: Row(1, 1, 1) and Row(4, 1, 2).
//
// NOTE(review): the patch text below is on a single physical line, while its hunk headers
//   (-264,7 +264,7 and -1432,6 +1432,16) describe multi-line hunks. The line breaks and
//   indentation appear to have been lost; they must be restored from the original commit
//   before this patch can be applied. The text is left untouched here rather than guessed at.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala index 22cde96412567..34a09f71b7a1d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala @@ -264,7 +264,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { }.unzip3 // Setup expand & aggregate operators for distinct aggregate expressions. - val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap + val distinctAggChildAttrLookup = distinctAggChildAttrMap.filter(!_._1.foldable).toMap val distinctAggFilterAttrLookup = distinctAggFilters.zip(maxConds.map(_.toAttribute)).toMap val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map { case ((group, expressions), i) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index c074a207afe78..b0ae820ae81e2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -1432,6 +1432,16 @@ class DataFrameAggregateSuite extends QueryTest val df = Seq(1).toDF("id").groupBy(Stream($"id" + 1, $"id" + 2): _*).sum("id") checkAnswer(df, Row(2, 3, 1)) } + + test("SPARK-41035: Reuse of literal in distinct aggregations should work") { + val res = sql( + """select a, count(distinct 100), count(distinct b, 100) + |from values (1, 2), (4, 5), (4, 6) as data(a, b) + |group by a; + |""".stripMargin + ) + checkAnswer(res, Row(1, 1, 1) :: Row(4, 1, 2) :: Nil) + } } case class B(c: Option[Double])