Skip to content

Commit

Permalink
Decouple physical optimizer from SessionConfig (#3887) (#4749)
Browse files Browse the repository at this point in the history
* Decouple physical optimizer from SessionConfig (#3887)

* Logical conflicts
  • Loading branch information
tustvold authored Dec 28, 2022
1 parent b686b68 commit d3ca9b0
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 52 deletions.
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;

use crate::config::ConfigOptions;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;

use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
let partial_agg_exec = partial_agg_exec
Expand Down Expand Up @@ -307,9 +307,10 @@ mod tests {
agg: TestAggregate,
) -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let plan = Arc::new(plan) as _;
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
let optimized = AggregateStatistics::new()
.optimize(Arc::clone(&plan), state.config_options())?;

// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
Expand Down Expand Up @@ -548,7 +549,7 @@ mod tests {
Arc::clone(&schema),
)?;

let conf = SessionConfig::new();
let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

Expand Down Expand Up @@ -591,7 +592,7 @@ mod tests {
Arc::clone(&schema),
)?;

let conf = SessionConfig::new();
let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches
use crate::config::ConfigOptions;
use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
Expand Down Expand Up @@ -46,7 +47,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
_config: &crate::execution::context::SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let target_batch_size = self.target_batch_size;
plan.transform_up(&|plan| {
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
use crate::config::{
ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING,
};
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
Expand All @@ -34,7 +36,6 @@ use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
Expand Down Expand Up @@ -69,11 +70,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.target_partitions();
let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
let top_down_join_key_reordering = config
.config_options()
.get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
.unwrap_or_default();
let new_plan = if top_down_join_key_reordering {
Expand Down Expand Up @@ -1135,10 +1135,12 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.set_usize(OPT_TARGET_PARTITIONS, 10);

// run optimizer
let optimizer = BasicEnforcement {};
let optimized = optimizer
.optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
let optimized = optimizer.optimize($PLAN, &config)?;

// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
use std::sync::Arc;

use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::SessionConfig;

/// Currently for a sort operator, if
/// - there are more than one input partitions
Expand All @@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
Expand Down
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use std::sync::Arc;

use arrow::datatypes::Schema;

use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
use crate::execution::context::SessionConfig;
use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::{
Expand Down Expand Up @@ -211,10 +210,9 @@ impl PhysicalOptimizerRule for JoinSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
session_config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let collect_left_threshold: usize = session_config
.config_options()
let collect_left_threshold: usize = config
.get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
.unwrap_or_default()
.try_into()
Expand Down Expand Up @@ -508,7 +506,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -556,7 +554,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -609,7 +607,7 @@ mod tests {
let original_schema = join.schema();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -638,7 +636,7 @@ mod tests {
$EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();

let optimized = JoinSelection::new()
.optimize(Arc::new($PLAN), &SessionConfig::new())
.optimize(Arc::new($PLAN), &ConfigOptions::new())
.unwrap();

let plan = displayable(optimized.as_ref()).indent().to_string();
Expand Down Expand Up @@ -725,7 +723,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -950,7 +948,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

if !is_swapped {
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_optimizer/optimize_sorts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
//! " SortExec: [non_nullable_col@1 ASC]",
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
Expand All @@ -34,7 +35,6 @@ use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
Expand Down Expand Up @@ -122,7 +122,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Execute a post-order traversal to adjust input key ordering:
let plan_requirements = PlanWithCorrespondingSort::new(plan);
Expand Down Expand Up @@ -557,7 +557,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -589,7 +589,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -608,7 +608,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -690,7 +690,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -715,7 +715,7 @@ mod tests {
#[tokio::test]
async fn test_add_required_sort() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand All @@ -736,7 +736,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -760,7 +760,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort1() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -803,7 +803,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -827,7 +827,7 @@ mod tests {
#[tokio::test]
async fn test_change_wrong_sorting() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -865,7 +865,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
use std::sync::Arc;

use crate::{
error::Result, execution::context::SessionConfig, physical_plan::ExecutionPlan,
};
use crate::config::ConfigOptions;
use crate::{error::Result, physical_plan::ExecutionPlan};

/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
/// computes the same results, but in a potentially more efficient
Expand All @@ -31,7 +30,7 @@ pub trait PhysicalOptimizerRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>>;

/// A human readable name for this optimizer rule
Expand Down
14 changes: 9 additions & 5 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;
use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS};
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
};
use crate::{error::Result, execution::context::SessionConfig};

/// Optimizer that introduces repartition to introduce more
/// parallelism in the plan
Expand Down Expand Up @@ -207,14 +208,15 @@ impl PhysicalOptimizerRule for Repartition {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
// Don't run optimizer if target_partitions == 1
if config.target_partitions() == 1 {
if target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(
config.target_partitions(),
target_partitions,
plan.clone(),
plan.output_ordering().is_none(),
false,
Expand Down Expand Up @@ -360,8 +362,10 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.set_usize(OPT_TARGET_PARTITIONS, 10);

// run optimizer
let config = SessionConfig::new().with_target_partitions(10);
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
// The `BasicEnforcement` is an essential rule to be applied.
Expand Down
Loading

0 comments on commit d3ca9b0

Please sign in to comment.