Skip to content

Commit

Permalink
Enable PhysicalOptimizerRule lazily (#4806) (#4807)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Jan 3, 2023
1 parent 34a8b86 commit 68dc644
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 62 deletions.
101 changes: 48 additions & 53 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1449,59 +1449,54 @@ impl SessionState {
}

// We need to take care of the rule ordering. They may influence each other.
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
vec![Arc::new(AggregateStatistics::new())];

// - In order to increase the parallelism, it will change the output partitioning
// of some operators in the plan tree, which will influence other rules.
// Therefore, it should be run as soon as possible.
// - The reason to make it optional is
// - it's not used for the distributed engine, Ballista.
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config.options.optimizer.enable_round_robin_repartition {
physical_optimizers.push(Arc::new(Repartition::new()));
}
//- Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, it should be run
// after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
// before JoinSelection and BasicEnforcement, which may depend on that.
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
// Statistics-base join selection will change the Auto mode to real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
physical_optimizers.push(Arc::new(JoinSelection::new()));
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
physical_optimizers.push(Arc::new(PipelineFixer::new()));
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// Please make sure that the whole plan tree is determined.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config.options.execution.coalesce_batches {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config.options.execution.batch_size,
)));
}
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
// diagnostic error message when this happens. It makes no changes to the
// given query plan; i.e. it only acts as a final gatekeeping rule.
physical_optimizers.push(Arc::new(PipelineChecker::new()));
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
// - In order to increase the parallelism, it will change the output partitioning
// of some operators in the plan tree, which will influence other rules.
// Therefore, it should be run as soon as possible.
// - The reason to make it optional is
// - it's not used for the distributed engine, Ballista.
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
Arc::new(Repartition::new()),
//- Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, it should be run
// after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
// before JoinSelection and BasicEnforcement, which may depend on that.
Arc::new(GlobalSortSelection::new()),
// Statistics-base join selection will change the Auto mode to real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
Arc::new(PipelineFixer::new()),
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// Please make sure that the whole plan tree is determined.
Arc::new(BasicEnforcement::new()),
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
Arc::new(OptimizeSorts::new()),
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
Arc::new(CoalesceBatches::new()),
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
// diagnostic error message when this happens. It makes no changes to the
// given query plan; i.e. it only acts as a final gatekeeping rule.
Arc::new(PipelineChecker::new()),
];

SessionState {
session_id,
optimizer: Optimizer::new(),
Expand Down
17 changes: 9 additions & 8 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ use std::sync::Arc;
/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
#[derive(Default)]
pub struct CoalesceBatches {
/// Target batch size
target_batch_size: usize,
}
pub struct CoalesceBatches {}

impl CoalesceBatches {
#[allow(missing_docs)]
pub fn new(target_batch_size: usize) -> Self {
Self { target_batch_size }
pub fn new() -> Self {
Self::default()
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let target_batch_size = self.target_batch_size;
if !config.execution.coalesce_batches {
return Ok(plan);
}

let target_batch_size = config.execution.batch_size;
plan.transform_up(&|plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small batches and only
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ impl PhysicalOptimizerRule for Repartition {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.execution.target_partitions;
let enabled = config.optimizer.enable_round_robin_repartition;
// Don't run optimizer if target_partitions == 1
if target_partitions == 1 {
if !enabled || target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(
Expand Down

0 comments on commit 68dc644

Please sign in to comment.