From d3ca9b05b90368fb5a8696a366d727f2b7358b4e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 28 Dec 2022 11:58:50 +0000 Subject: [PATCH] Decouple physical optimizer from SessionConfig (#3887) (#4749) * Decouple physical optimizer from SessionConfig (#3887) * Logical conflicts --- .../aggregate_statistics.rs | 13 +++++----- .../physical_optimizer/coalesce_batches.rs | 3 ++- .../src/physical_optimizer/enforcement.rs | 16 +++++++------ .../global_sort_selection.rs | 4 ++-- .../src/physical_optimizer/join_selection.rs | 20 +++++++--------- .../src/physical_optimizer/optimize_sorts.rs | 24 +++++++++---------- .../core/src/physical_optimizer/optimizer.rs | 7 +++--- .../src/physical_optimizer/repartition.rs | 14 +++++++---- .../core/src/physical_optimizer/utils.rs | 6 ++--- datafusion/core/src/physical_plan/planner.rs | 2 +- 10 files changed, 57 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs index ff4c2190fcac..3014f5c54298 100644 --- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs +++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs @@ -18,9 +18,9 @@ //! Utilizing exact statistics from sources to avoid scanning data use std::sync::Arc; +use crate::config::ConfigOptions; use datafusion_expr::utils::COUNT_STAR_EXPANSION; -use crate::execution::context::SessionConfig; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; use crate::physical_plan::empty::EmptyExec; use crate::physical_plan::projection::ProjectionExec; @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> Result> { if let Some(partial_agg_exec) = take_optimizable(&*plan) { let partial_agg_exec = partial_agg_exec @@ -307,9 +307,10 @@ mod tests { agg: TestAggregate, ) -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let plan = Arc::new(plan) as _; - let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?; + let optimized = AggregateStatistics::new() + .optimize(Arc::clone(&plan), state.config_options())?; // A ProjectionExec is a sign that the count optimization was applied assert!(optimized.as_any().is::()); @@ -548,7 +549,7 @@ mod tests { Arc::clone(&schema), )?; - let conf = SessionConfig::new(); + let conf = ConfigOptions::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; @@ -591,7 +592,7 @@ mod tests { Arc::clone(&schema), )?; - let conf = SessionConfig::new(); + let conf = ConfigOptions::new(); let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?; diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs index e0d20be16646..40de861ecc38 100644 --- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs +++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs @@ -18,6 +18,7 @@ //! CoalesceBatches optimizer that groups batches together rows //! in bigger batches to avoid overhead with small batches +use crate::config::ConfigOptions; use crate::{ error::Result, physical_optimizer::PhysicalOptimizerRule, @@ -46,7 +47,7 @@ impl PhysicalOptimizerRule for CoalesceBatches { fn optimize( &self, plan: Arc, - _config: &crate::execution::context::SessionConfig, + _config: &ConfigOptions, ) -> Result> { let target_batch_size = self.target_batch_size; plan.transform_up(&|plan| { diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index 30a796191953..59e7a0190d63 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -18,7 +18,9 @@ //! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering //! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]]. //! -use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING; +use crate::config::{ + ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING, +}; use crate::error::Result; use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy}; use crate::physical_optimizer::PhysicalOptimizerRule; @@ -34,7 +36,6 @@ use crate::physical_plan::sorts::sort::SortOptions; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::Partitioning; use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan}; -use crate::prelude::SessionConfig; use arrow::datatypes::SchemaRef; use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::equivalence::EquivalenceProperties; @@ -69,11 +70,10 @@ impl PhysicalOptimizerRule for BasicEnforcement { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> Result> { - let target_partitions = config.target_partitions(); + let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap(); let top_down_join_key_reordering = config - .config_options() .get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING) .unwrap_or_default(); let new_plan = if top_down_join_key_reordering { @@ -1135,10 +1135,12 @@ mod tests { ($EXPECTED_LINES: expr, $PLAN: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); + let mut config = ConfigOptions::new(); + config.set_usize(OPT_TARGET_PARTITIONS, 10); + // run optimizer let optimizer = BasicEnforcement {}; - let optimized = optimizer - .optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?; + let optimized = optimizer.optimize($PLAN, &config)?; // Now format correctly let plan = displayable(optimized.as_ref()).indent().to_string(); diff --git a/datafusion/core/src/physical_optimizer/global_sort_selection.rs b/datafusion/core/src/physical_optimizer/global_sort_selection.rs index a6bb8229c05b..81b4b59e3a14 100644 --- a/datafusion/core/src/physical_optimizer/global_sort_selection.rs +++ b/datafusion/core/src/physical_optimizer/global_sort_selection.rs @@ -19,13 +19,13 @@ use std::sync::Arc; +use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::ExecutionPlan; -use crate::prelude::SessionConfig; /// Currently for a sort operator, if /// - there are more than one input partitions @@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection { fn optimize( &self, plan: Arc, - _config: &SessionConfig, + _config: &ConfigOptions, ) -> Result> { plan.transform_up(&|plan| { Ok(plan diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs index 63e7937fe531..69e9e0f4d701 100644 --- a/datafusion/core/src/physical_optimizer/join_selection.rs +++ b/datafusion/core/src/physical_optimizer/join_selection.rs @@ -20,8 +20,7 @@ use std::sync::Arc; use arrow::datatypes::Schema; -use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD; -use crate::execution::context::SessionConfig; +use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD}; use crate::logical_expr::JoinType; use crate::physical_plan::expressions::Column; use crate::physical_plan::joins::{ @@ -211,10 +210,9 @@ impl PhysicalOptimizerRule for JoinSelection { fn optimize( &self, plan: Arc, - session_config: &SessionConfig, + config: &ConfigOptions, ) -> Result> { - let collect_left_threshold: usize = session_config - .config_options() + let collect_left_threshold: usize = config .get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD) .unwrap_or_default() .try_into() @@ -508,7 +506,7 @@ mod tests { .unwrap(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &SessionConfig::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); let swapping_projection = optimized_join @@ -556,7 +554,7 @@ mod tests { .unwrap(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &SessionConfig::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); let swapping_projection = optimized_join @@ -609,7 +607,7 @@ mod tests { let original_schema = join.schema(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &SessionConfig::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); let swapped_join = optimized_join @@ -638,7 +636,7 @@ mod tests { $EXPECTED_LINES.iter().map(|s| *s).collect::>(); let optimized = JoinSelection::new() - .optimize(Arc::new($PLAN), &SessionConfig::new()) + .optimize(Arc::new($PLAN), &ConfigOptions::new()) .unwrap(); let plan = displayable(optimized.as_ref()).indent().to_string(); @@ -725,7 +723,7 @@ mod tests { .unwrap(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &SessionConfig::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); let swapped_join = optimized_join @@ -950,7 +948,7 @@ mod tests { .unwrap(); let optimized_join = JoinSelection::new() - .optimize(Arc::new(join), &SessionConfig::new()) + .optimize(Arc::new(join), &ConfigOptions::new()) .unwrap(); if !is_swapped { diff --git a/datafusion/core/src/physical_optimizer/optimize_sorts.rs b/datafusion/core/src/physical_optimizer/optimize_sorts.rs index cb421b7b82fd..0a3be1d5b88e 100644 --- a/datafusion/core/src/physical_optimizer/optimize_sorts.rs +++ b/datafusion/core/src/physical_optimizer/optimize_sorts.rs @@ -25,6 +25,7 @@ //! " SortExec: [non_nullable_col@1 ASC]", //! in the physical plan. The first sort is unnecessary since its result is overwritten //! by another SortExec. Therefore, this rule removes it from the physical plan. +use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::{ add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete, @@ -34,7 +35,6 @@ use crate::physical_plan::rewrite::TreeNodeRewritable; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use crate::prelude::SessionConfig; use arrow::datatypes::SchemaRef; use datafusion_common::{reverse_sort_options, DataFusionError}; use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; @@ -122,7 +122,7 @@ impl PhysicalOptimizerRule for OptimizeSorts { fn optimize( &self, plan: Arc, - _config: &SessionConfig, + _config: &ConfigOptions, ) -> Result> { // Execute a post-order traversal to adjust input key ordering: let plan_requirements = PlanWithCorrespondingSort::new(plan); @@ -557,7 +557,7 @@ mod tests { #[tokio::test] async fn test_remove_unnecessary_sort() -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let schema = create_test_schema()?; let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as Arc; @@ -589,7 +589,7 @@ mod tests { expected, actual ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, &conf)?; + OptimizeSorts::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -608,7 +608,7 @@ mod tests { #[tokio::test] async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let schema = create_test_schema()?; let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as Arc; @@ -690,7 +690,7 @@ mod tests { expected, actual ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, &conf)?; + OptimizeSorts::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -715,7 +715,7 @@ mod tests { #[tokio::test] async fn test_add_required_sort() -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let schema = create_test_schema()?; let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as Arc; @@ -736,7 +736,7 @@ mod tests { expected, actual ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, &conf)?; + OptimizeSorts::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -760,7 +760,7 @@ mod tests { #[tokio::test] async fn test_remove_unnecessary_sort1() -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let schema = create_test_schema()?; let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as Arc; @@ -803,7 +803,7 @@ mod tests { expected, actual ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, &conf)?; + OptimizeSorts::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); @@ -827,7 +827,7 @@ mod tests { #[tokio::test] async fn test_change_wrong_sorting() -> Result<()> { let session_ctx = SessionContext::new(); - let conf = session_ctx.copied_config(); + let state = session_ctx.state(); let schema = create_test_schema()?; let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as Arc; @@ -865,7 +865,7 @@ mod tests { expected, actual ); let optimized_physical_plan = - OptimizeSorts::new().optimize(physical_plan, &conf)?; + OptimizeSorts::new().optimize(physical_plan, state.config_options())?; let formatted = displayable(optimized_physical_plan.as_ref()) .indent() .to_string(); diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 18cbc139d3e1..26ec137e2b7b 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -19,9 +19,8 @@ use std::sync::Arc; -use crate::{ - error::Result, execution::context::SessionConfig, physical_plan::ExecutionPlan, -}; +use crate::config::ConfigOptions; +use crate::{error::Result, physical_plan::ExecutionPlan}; /// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which /// computes the same results, but in a potentially more efficient @@ -31,7 +30,7 @@ pub trait PhysicalOptimizerRule { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> Result>; /// A human readable name for this optimizer rule diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 2d3f7a0e1767..66359ebf6220 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -19,11 +19,12 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; +use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS}; +use crate::error::Result; use crate::physical_plan::Partitioning::*; use crate::physical_plan::{ repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan, }; -use crate::{error::Result, execution::context::SessionConfig}; /// Optimizer that introduces repartition to introduce more /// parallelism in the plan @@ -207,14 +208,15 @@ impl PhysicalOptimizerRule for Repartition { fn optimize( &self, plan: Arc, - config: &SessionConfig, + config: &ConfigOptions, ) -> Result> { + let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap(); // Don't run optimizer if target_partitions == 1 - if config.target_partitions() == 1 { + if target_partitions == 1 { Ok(plan) } else { optimize_partitions( - config.target_partitions(), + target_partitions, plan.clone(), plan.output_ordering().is_none(), false, @@ -360,8 +362,10 @@ mod tests { ($EXPECTED_LINES: expr, $PLAN: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); + let mut config = ConfigOptions::new(); + config.set_usize(OPT_TARGET_PARTITIONS, 10); + // run optimizer - let config = SessionConfig::new().with_target_partitions(10); let optimizers: Vec> = vec![ Arc::new(Repartition::new()), // The `BasicEnforcement` is an essential rule to be applied. diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 8f1fe2d08213..ae1b58815fda 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -18,8 +18,8 @@ //! Collection of utility functions that are leveraged by the query optimizer rules use super::optimizer::PhysicalOptimizerRule; -use crate::execution::context::SessionConfig; +use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; @@ -36,12 +36,12 @@ use std::sync::Arc; pub fn optimize_children( optimizer: &impl PhysicalOptimizerRule, plan: Arc, - session_config: &SessionConfig, + config: &ConfigOptions, ) -> Result> { let children = plan .children() .iter() - .map(|child| optimizer.optimize(Arc::clone(child), session_config)) + .map(|child| optimizer.optimize(Arc::clone(child), config)) .collect::>>()?; if children.is_empty() { diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 768c42978936..6aa3f627df18 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1783,7 +1783,7 @@ impl DefaultPhysicalPlanner { let mut new_plan = plan; for optimizer in optimizers { let before_schema = new_plan.schema(); - new_plan = optimizer.optimize(new_plan, &session_state.config)?; + new_plan = optimizer.optimize(new_plan, session_state.config_options())?; if optimizer.schema_check() && new_plan.schema() != before_schema { return Err(DataFusionError::Internal(format!( "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",