Skip to content

Commit

Permalink
adds flag to force old behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasRupprecht committed Feb 23, 2025
1 parent d520650 commit 64df5f4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,16 @@ trait DeltaSQLConfBase {
.checkValue(v => v >= 1, "Must be at least 1.")
.createWithDefault(100)

val DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION =
buildConf("collectStats.fallbackToInterpretedProjection")
.internal()
.doc("When enabled, the updateStats expression will use the standard code path" +
" that falls back to an interpreted expression if codegen fails. This should" +
" always be true. The config only exists to force the old behavior, which was" +
" to always use codegen.")
.booleanConf
.createWithDefault(true)

val DELTA_CONVERT_ICEBERG_STATS = buildConf("collectStats.convertIceberg")
.internal()
.doc("When enabled, attempts to convert Iceberg stats to Delta stats when cloning from " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.stats
import scala.collection.mutable

import org.apache.spark.sql.delta.expressions.JoinedProjection
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

Expand All @@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

Expand Down Expand Up @@ -90,7 +92,19 @@ class DeltaTaskStatisticsTracker(
private val updateStats: MutableProjection = {
val aggs = aggregates.flatMap(_.updateExpressions)
val expressions = JoinedProjection.bind(aggBufferAttrs, dataCols, aggs)
MutableProjection.create(expressions, Nil)
if (SQLConf.get.getConf(
DeltaSQLConf.DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION)) {
MutableProjection.create(
exprs = expressions,
inputSchema = Nil
)
} else {
GenerateMutableProjection.generate(
expressions = expressions,
inputSchema = Nil,
useSubexprElimination = true
)
}
}

// This executes the whole statsColExpr in order to compute the final stats value for the file.
Expand Down

0 comments on commit 64df5f4

Please sign in to comment.