diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 25b5eac3866..cdb46f48692 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -2273,6 +2273,16 @@ trait DeltaSQLConfBase {
     .checkValue(v => v >= 1, "Must be at least 1.")
     .createWithDefault(100)
 
+  val DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION =
+    buildConf("collectStats.fallbackToInterpretedProjection")
+      .internal()
+      .doc("When enabled, the updateStats expression will use the standard code path" +
+        " that falls back to an interpreted expression if codegen fails. This should" +
+        " always be true. The config only exists to force the old behavior, which was" +
+        " to always use codegen.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_CONVERT_ICEBERG_STATS = buildConf("collectStats.convertIceberg")
     .internal()
     .doc("When enabled, attempts to convert Iceberg stats to Delta stats when cloning from " +
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
index 5e80c400177..407c6bd2fca 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.stats
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.expressions.JoinedProjection
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.spark.util.SerializableConfiguration
@@ -90,7 +92,19 @@ class DeltaTaskStatisticsTracker(
   private val updateStats: MutableProjection = {
     val aggs = aggregates.flatMap(_.updateExpressions)
     val expressions = JoinedProjection.bind(aggBufferAttrs, dataCols, aggs)
-    MutableProjection.create(expressions, Nil)
+    if (SQLConf.get.getConf(
+        DeltaSQLConf.DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION)) {
+      MutableProjection.create(
+        exprs = expressions,
+        inputSchema = Nil
+      )
+    } else {
+      GenerateMutableProjection.generate(
+        expressions = expressions,
+        inputSchema = Nil,
+        useSubexprElimination = true
+      )
+    }
   }
 
   // This executes the whole statsColExpr in order to compute the final stats value for the file.
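
For reviewers who want to try both code paths, a minimal sketch of toggling the new flag from a Spark session. It assumes the `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends to the key, and a session variable named `spark`:

```scala
// Default: stats collection builds its projection via MutableProjection.create,
// which falls back to an interpreted projection if codegen fails.
spark.conf.set(
  "spark.databricks.delta.collectStats.fallbackToInterpretedProjection", "true")

// Escape hatch back to the old behavior: codegen only, so a codegen
// failure fails stats collection for the write task.
spark.conf.set(
  "spark.databricks.delta.collectStats.fallbackToInterpretedProjection", "false")
```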
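
The behavioral difference between the two constructors is easiest to see in isolation. Below is a standalone sketch using only stock Catalyst APIs (nothing Delta-specific; assumes spark-catalyst on the classpath, and the object and value names are illustrative):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, MutableProjection}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection

object ProjectionPathsDemo extends App {
  val exprs = Seq(Literal(1), Literal(2L))

  // New default path: MutableProjection.create routes through Spark's
  // CodeGeneratorWithInterpretedFallback, so if compiling the generated code
  // fails (e.g. a method exceeding JVM bytecode limits on a very wide schema),
  // it degrades to an InterpretedMutableProjection instead of failing the task.
  val withFallback: MutableProjection = MutableProjection.create(exprs, Nil)

  // Old path: direct codegen; any compilation error propagates.
  val codegenOnly: MutableProjection =
    GenerateMutableProjection.generate(exprs, Nil, useSubexprElimination = true)

  // When codegen succeeds, both produce identical results.
  println(withFallback(InternalRow.empty)) // e.g. [1,2]
  println(codegenOnly(InternalRow.empty))  // e.g. [1,2]
}
```

This is why the config doc says the flag "should always be true": the fallback path is strictly safer, and the config exists only to force the previous codegen-only behavior.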