Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Jan 28, 2025
1 parent 2ffdbbf commit 8f25e39
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,22 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
limitOpt: Option[Int],
filters: Seq[Expression],
delta: LogicalRelation): DeltaScan = {
+ // Remove non-deterministic filters (e.g., rand() < 0.25) to prevent incorrect file pruning.
+ val deterministicFilters = filters.filter(_.deterministic)
withStatusCode("DELTA", "Filtering files for query") {
if (limitOpt.nonEmpty) {
// If we trigger limit push down, the filters must be partition filters. Since
// there are no data filters, we don't need to apply Generated Columns
// optimization. See `DeltaTableScan` for more details.
- return scanGenerator.filesForScan(limitOpt.get, filters)
+ return scanGenerator.filesForScan(limitOpt.get, deterministicFilters)
}
val filtersForScan =
if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
- filters
+ deterministicFilters
} else {
val generatedPartitionFilters = GeneratedColumn.generatePartitionFilters(
- spark, scanGenerator.snapshotToScan, filters, delta)
- filters ++ generatedPartitionFilters
+ spark, scanGenerator.snapshotToScan, deterministicFilters, delta)
+ deterministicFilters ++ generatedPartitionFilters
}
}
scanGenerator.filesForScan(filtersForScan)
}
Expand Down

0 comments on commit 8f25e39

Please sign in to comment.