From b9c70f51497c7e629914e6ce2d057fcfc4c0cfe0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 3 Dec 2022 06:35:13 -0500 Subject: [PATCH] Unify most `SessionConfig` settings into `ConfigOptions` --- benchmarks/src/bin/tpch.rs | 2 +- .../core/benches/sort_limit_query_sql.rs | 5 +- datafusion/core/src/config.rs | 77 +++++++- datafusion/core/src/dataframe.rs | 10 +- .../core/src/datasource/listing/table.rs | 2 +- .../src/datasource/listing_table_factory.rs | 4 +- datafusion/core/src/execution/context.rs | 176 ++++++++++++------ datafusion/core/src/execution/options.rs | 2 +- .../src/physical_optimizer/enforcement.rs | 2 +- .../src/physical_optimizer/repartition.rs | 4 +- datafusion/core/src/physical_plan/planner.rs | 22 +-- .../test_files/information_schema.slt | 7 + docs/source/user-guide/configs.md | 7 + 13 files changed, 231 insertions(+), 89 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 4bc37f0ae984..3f0342b928c8 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -410,7 +410,7 @@ async fn get_table( let options = ListingOptions::new(format) .with_file_extension(extension) .with_target_partitions(target_partitions) - .with_collect_stat(ctx.config.collect_statistics); + .with_collect_stat(ctx.config.collect_statistics()); let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); diff --git a/datafusion/core/benches/sort_limit_query_sql.rs b/datafusion/core/benches/sort_limit_query_sql.rs index e7aa33bd70bd..efee5de13274 100644 --- a/datafusion/core/benches/sort_limit_query_sql.rs +++ b/datafusion/core/benches/sort_limit_query_sql.rs @@ -23,6 +23,7 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; +use datafusion::prelude::SessionConfig; use parking_lot::Mutex; use std::sync::Arc; @@ -85,8 +86,8 @@ fn create_context() -> Arc> { rt.block_on(async { // create local session context - let ctx = SessionContext::new(); - ctx.state.write().config.target_partitions = 1; + let ctx = + SessionContext::with_config(SessionConfig::new().with_target_partitions(1)); let table_provider = Arc::new(csv.await); let mem_table = MemTable::load(table_provider, Some(partitions), &ctx.state()) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index c738bc0cf27c..cb8d74e4de78 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -27,6 +27,28 @@ use std::env; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +/// Configuration option "datafusion.execution.target_partitions" +pub const OPT_TARGET_PARTITIONS: &str = "datafusion.execution.target_partitions"; + +/// Configuration option "datafusion.catalog.create_default_catalog_and_schema" +pub const OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA: &str = + "datafusion.catalog.create_default_catalog_and_schema"; +/// Configuration option "datafusion.catalog.information_schema" +pub const OPT_INFORMATION_SCHEMA: &str = "datafusion.catalog.information_schema"; + +/// Configuration option "datafusion.optimizer.repartition_joins" +pub const OPT_REPARTITION_JOINS: &str = "datafusion.optimizer.repartition_joins"; + +/// Configuration option "datafusion.optimizer.repartition_aggregations" +pub const OPT_REPARTITION_AGGREGATIONS: &str = + "datafusion.optimizer.repartition_aggregations"; + +/// Configuration option "datafusion.optimizer.repartition_windows" +pub const OPT_REPARTITION_WINDOWS: &str = 
"datafusion.optimizer.repartition_windows"; + +/// Configuration option "datafusion.execuction_collect_statistics" +pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execuction_collect_statistics"; + /// Configuration option "datafusion.optimizer.filter_null_join_keys" pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys"; @@ -199,7 +221,54 @@ impl BuiltInConfigs { /// configuration options pub fn new() -> Self { Self { - config_definitions: vec![ConfigDefinition::new_bool( + config_definitions: vec![ConfigDefinition::new_u64( + OPT_TARGET_PARTITIONS, + "Number of partitions for query execution. Increasing partitions can increase \ + concurrency.", + num_cpus::get() as u64, + ), + + ConfigDefinition::new_bool( + OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, + "Whether the default catalog and schema should be created automatically.", + true + ), + + ConfigDefinition::new_bool( + OPT_INFORMATION_SCHEMA, + "Should DataFusion provide access to `information_schema` \ + virtual tables for displaying schema information", + false + ), + + ConfigDefinition::new_bool( + OPT_REPARTITION_JOINS, + "Should DataFusion repartition data using the join keys to execute joins in parallel \ + using the provided `target_partitions` level", + true + ), + + ConfigDefinition::new_bool( + OPT_REPARTITION_AGGREGATIONS, + "Should DataFusion repartition data using the aggregate keys to execute aggregates \ + in parallel using the provided `target_partitions` level", + true + ), + + ConfigDefinition::new_bool( + OPT_REPARTITION_WINDOWS, + "Should DataFusion collect statistics after listing files", + true + ), + + ConfigDefinition::new_bool( + OPT_COLLECT_STATISTICS, + "Should DataFusion repartition data using the partitions keys to execute window \ + functions in parallel using the provided `target_partitions` level", + false + ), + + ConfigDefinition::new_bool( OPT_FILTER_NULL_JOIN_KEYS, "When set to true, the optimizer will insert filters before a join between \ a nullable and non-nullable column to filter out nulls on the nullable side. This \ @@ -437,6 +506,12 @@ impl ConfigOptions { self.set(key, ScalarValue::UInt64(Some(value))) } + /// set a `usize` configuration option + pub fn set_usize(&mut self, key: &str, value: usize) { + let value: u64 = value.try_into().expect("convert u64 to usize"); + self.set(key, ScalarValue::UInt64(Some(value))) + } + /// set a `String` configuration option pub fn set_string(&mut self, key: &str, value: impl Into) { self.set(key, ScalarValue::Utf8(Some(value.into()))) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index d7b7ccc942c6..10152d99dae9 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -839,6 +839,7 @@ mod tests { use std::vec; use super::*; + use crate::execution::context::SessionConfig; use crate::execution::options::{CsvReadOptions, ParquetReadOptions}; use crate::physical_plan::ColumnarValue; use crate::physical_plan::Partitioning; @@ -1541,8 +1542,7 @@ mod tests { assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::()); let physical_plan = union.create_physical_plan().await?; - let default_partition_count = - SessionContext::new().copied_config().target_partitions; + let default_partition_count = SessionConfig::new().target_partitions(); // For partition aware union, the output partition count should not be changed. 
assert_eq!( @@ -1597,8 +1597,7 @@ mod tests { assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::()); let physical_plan = union.create_physical_plan().await?; - let default_partition_count = - SessionContext::new().copied_config().target_partitions; + let default_partition_count = SessionConfig::new().target_partitions(); // For non-partition aware union, the output partitioning count should be the combination of all output partitions count assert!(matches!( @@ -1627,8 +1626,7 @@ mod tests { JoinType::RightAnti, ]; - let default_partition_count = - SessionContext::new().copied_config().target_partitions; + let default_partition_count = SessionConfig::new().target_partitions(); for join_type in all_join_types { let join = left.join( diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 4ca044f0bf5b..22fcb9216bd5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -169,7 +169,7 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(format) .with_file_extension(file_extension) - .with_target_partitions(state.config.target_partitions); + .with_target_partitions(state.config.target_partitions()); Ok(Self { table_paths: self.table_paths, diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 537288646707..ad6b820baac4 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -125,9 +125,9 @@ impl TableProviderFactory for ListingTableFactory { }; let options = ListingOptions::new(file_format) - .with_collect_stat(state.config.collect_statistics) + .with_collect_stat(state.config.collect_statistics()) .with_file_extension(file_extension) - .with_target_partitions(state.config.target_partitions) + .with_target_partitions(state.config.target_partitions()) .with_table_partition_cols(table_partition_cols) .with_file_sort_order(None); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 6b6a4fb1f6e6..8cbaf7ea3fd4 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -21,7 +21,11 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, information_schema::CatalogWithInformationSchema, }, - config::OPT_PARQUET_ENABLE_PRUNING, + config::{ + OPT_COLLECT_STATISTICS, OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, + OPT_INFORMATION_SCHEMA, OPT_PARQUET_ENABLE_PRUNING, OPT_REPARTITION_AGGREGATIONS, + OPT_REPARTITION_JOINS, OPT_REPARTITION_WINDOWS, OPT_TARGET_PARTITIONS, + }, datasource::listing::{ListingOptions, ListingTable}, datasource::{MemTable, ViewTable}, logical_expr::{PlanType, ToStringifiedPlan}, @@ -612,7 +616,7 @@ impl SessionContext { options: AvroReadOptions<'_>, ) -> Result> { let table_path = ListingTableUrl::parse(table_path)?; - let target_partitions = self.copied_config().target_partitions; + let target_partitions = self.copied_config().target_partitions(); let listing_options = options.to_listing_options(target_partitions); @@ -639,7 +643,7 @@ impl SessionContext { options: NdJsonReadOptions<'_>, ) -> Result> { let table_path = ListingTableUrl::parse(table_path)?; - let target_partitions = self.copied_config().target_partitions; + let target_partitions = self.copied_config().target_partitions(); let listing_options = options.to_listing_options(target_partitions); @@ -674,7 +678,7 @@ impl 
SessionContext { options: CsvReadOptions<'_>, ) -> Result> { let table_path = ListingTableUrl::parse(table_path)?; - let target_partitions = self.copied_config().target_partitions; + let target_partitions = self.copied_config().target_partitions(); let listing_options = options.to_listing_options(target_partitions); let resolved_schema = match options.schema { Some(s) => Arc::new(s.to_owned()), @@ -774,7 +778,7 @@ impl SessionContext { options: CsvReadOptions<'_>, ) -> Result<()> { let listing_options = - options.to_listing_options(self.copied_config().target_partitions); + options.to_listing_options(self.copied_config().target_partitions()); self.register_listing_table( name, @@ -797,7 +801,7 @@ impl SessionContext { options: NdJsonReadOptions<'_>, ) -> Result<()> { let listing_options = - options.to_listing_options(self.copied_config().target_partitions); + options.to_listing_options(self.copied_config().target_partitions()); self.register_listing_table( name, @@ -834,7 +838,7 @@ impl SessionContext { options: AvroReadOptions<'_>, ) -> Result<()> { let listing_options = - options.to_listing_options(self.copied_config().target_partitions); + options.to_listing_options(self.copied_config().target_partitions()); self.register_listing_table( name, @@ -859,7 +863,7 @@ impl SessionContext { catalog: Arc, ) -> Option> { let name = name.into(); - let information_schema = self.copied_config().information_schema; + let information_schema = self.copied_config().information_schema(); let state = self.state.read(); let catalog = if information_schema { Arc::new(CatalogWithInformationSchema::new( @@ -1140,28 +1144,11 @@ impl Hasher for IdHasher { /// Configuration options for session context #[derive(Clone)] pub struct SessionConfig { - /// Number of partitions for query execution. Increasing partitions can increase concurrency. - pub target_partitions: usize, /// Default catalog name for table resolution default_catalog: String, - /// Default schema name for table resolution + /// Default schema name for table resolution (not in ConfigOptions + /// due to `resolve_table_ref` which passes back references) default_schema: String, - /// Whether the default catalog and schema should be created automatically - create_default_catalog_and_schema: bool, - /// Should DataFusion provide access to `information_schema` - /// virtual tables for displaying schema information - information_schema: bool, - /// Should DataFusion repartition data using the join keys to execute joins in parallel - /// using the provided `target_partitions` level - pub repartition_joins: bool, - /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel - /// using the provided `target_partitions` level - pub repartition_aggregations: bool, - /// Should DataFusion repartition data using the partition keys to execute window functions in - /// parallel using the provided `target_partitions` level - pub repartition_windows: bool, - /// Should DataFusion collect statistics after listing files - pub collect_statistics: bool, /// Configuration options pub config_options: Arc>, /// Opaque extensions. 
@@ -1171,15 +1158,8 @@ pub struct SessionConfig { impl Default for SessionConfig { fn default() -> Self { Self { - target_partitions: num_cpus::get(), default_catalog: DEFAULT_CATALOG.to_owned(), default_schema: DEFAULT_SCHEMA.to_owned(), - create_default_catalog_and_schema: true, - information_schema: false, - repartition_joins: true, - repartition_aggregations: true, - repartition_windows: true, - collect_statistics: false, config_options: Arc::new(RwLock::new(ConfigOptions::new())), // Assume no extensions by default. extensions: HashMap::with_capacity_and_hasher( @@ -1220,6 +1200,12 @@ impl SessionConfig { self.set(key, ScalarValue::UInt64(Some(value))) } + /// Set a generic `usize` configuration option + pub fn set_usize(self, key: &str, value: usize) -> Self { + let value: u64 = value.try_into().expect("convert usize to u64"); + self.set(key, ScalarValue::UInt64(Some(value))) + } + /// Set a generic `str` configuration option pub fn set_str(self, key: &str, value: &str) -> Self { self.set(key, ScalarValue::Utf8(Some(value.to_string()))) @@ -1232,12 +1218,67 @@ impl SessionConfig { self.set_u64(OPT_BATCH_SIZE, n.try_into().unwrap()) } - /// Customize target_partitions - pub fn with_target_partitions(mut self, n: usize) -> Self { + /// Customize [`OPT_TARGET_PARTITIONS`] + pub fn with_target_partitions(self, n: usize) -> Self { // partition count must be greater than zero assert!(n > 0); - self.target_partitions = n; - self + self.set_usize(OPT_TARGET_PARTITIONS, n) + } + + /// get target_partitions + pub fn target_partitions(&self) -> usize { + self.config_options + .read() + .get_usize(OPT_TARGET_PARTITIONS) + .expect("target partitions must be set") + } + + /// Is the information schema enabled? + pub fn information_schema(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_INFORMATION_SCHEMA) + .unwrap_or_default() + } + + /// Should the context create the default catalog and schema? + pub fn create_default_catalog_and_schema(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA) + .unwrap_or_default() + } + + /// Are joins repartitioned during execution? + pub fn repartition_joins(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_REPARTITION_JOINS) + .unwrap_or_default() + } + + /// Are aggregates repartitioned during execution? + pub fn repartition_aggregations(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_REPARTITION_AGGREGATIONS) + .unwrap_or_default() + } + + /// Are window functions repartitioned during execution? + pub fn repartition_window_functions(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_REPARTITION_WINDOWS) + .unwrap_or_default() + } + + /// Are statistics collected during execution? 
+ pub fn collect_statistics(&self) -> bool { + self.config_options + .read() + .get_bool(OPT_COLLECT_STATISTICS) + .unwrap_or_default() } /// Selects a name for the default catalog and schema @@ -1252,32 +1293,42 @@ impl SessionConfig { } /// Controls whether the default catalog and schema will be automatically created - pub fn create_default_catalog_and_schema(mut self, create: bool) -> Self { - self.create_default_catalog_and_schema = create; + pub fn with_create_default_catalog_and_schema(self, create: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA, create); self } /// Enables or disables the inclusion of `information_schema` virtual tables - pub fn with_information_schema(mut self, enabled: bool) -> Self { - self.information_schema = enabled; + pub fn with_information_schema(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_INFORMATION_SCHEMA, enabled); self } /// Enables or disables the use of repartitioning for joins to improve parallelism - pub fn with_repartition_joins(mut self, enabled: bool) -> Self { - self.repartition_joins = enabled; + pub fn with_repartition_joins(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_REPARTITION_JOINS, enabled); self } /// Enables or disables the use of repartitioning for aggregations to improve parallelism - pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self { - self.repartition_aggregations = enabled; + pub fn with_repartition_aggregations(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_REPARTITION_AGGREGATIONS, enabled); self } /// Enables or disables the use of repartitioning for window functions to improve parallelism - pub fn with_repartition_windows(mut self, enabled: bool) -> Self { - self.repartition_windows = enabled; + pub fn with_repartition_windows(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_REPARTITION_WINDOWS, enabled); self } @@ -1298,8 +1349,10 @@ impl SessionConfig { } /// Enables or disables the collection of statistics after listing files - pub fn with_collect_statistics(mut self, enabled: bool) -> Self { - self.collect_statistics = enabled; + pub fn with_collect_statistics(self, enabled: bool) -> Self { + self.config_options + .write() + .set_bool(OPT_COLLECT_STATISTICS, enabled); self } @@ -1328,19 +1381,19 @@ impl SessionConfig { } map.insert( TARGET_PARTITIONS.to_owned(), - format!("{}", self.target_partitions), + format!("{}", self.target_partitions()), ); map.insert( REPARTITION_JOINS.to_owned(), - format!("{}", self.repartition_joins), + format!("{}", self.repartition_joins()), ); map.insert( REPARTITION_AGGREGATIONS.to_owned(), - format!("{}", self.repartition_aggregations), + format!("{}", self.repartition_aggregations()), ); map.insert( REPARTITION_WINDOWS.to_owned(), - format!("{}", self.repartition_windows), + format!("{}", self.repartition_window_functions()), ); map.insert( PARQUET_PRUNING.to_owned(), @@ -1348,7 +1401,7 @@ impl SessionConfig { ); map.insert( COLLECT_STATISTICS.to_owned(), - format!("{}", self.collect_statistics), + format!("{}", self.collect_statistics()), ); map @@ -1475,7 +1528,7 @@ impl SessionState { let session_id = Uuid::new_v4().to_string(); let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; - if config.create_default_catalog_and_schema { + if config.create_default_catalog_and_schema() { let default_catalog = MemoryCatalogProvider::new(); default_catalog @@ -1487,7 +1540,8 @@ impl 
SessionState { Self::register_default_schema(&config, &runtime, &default_catalog); - let default_catalog: Arc = if config.information_schema { + let default_catalog: Arc = if config.information_schema() + { Arc::new(CatalogWithInformationSchema::new( Arc::downgrade(&catalog_list), Arc::downgrade(&config.config_options), @@ -2272,7 +2326,7 @@ mod tests { #[tokio::test] async fn disabled_default_catalog_and_schema() -> Result<()> { let ctx = SessionContext::with_config( - SessionConfig::new().create_default_catalog_and_schema(false), + SessionConfig::new().with_create_default_catalog_and_schema(false), ); assert!(matches!( @@ -2291,7 +2345,7 @@ mod tests { #[tokio::test] async fn custom_catalog_and_schema() { let config = SessionConfig::new() - .create_default_catalog_and_schema(true) + .with_create_default_catalog_and_schema(true) .with_default_catalog_and_schema("my_catalog", "my_schema"); catalog_and_schema_test(config).await; } @@ -2299,7 +2353,7 @@ mod tests { #[tokio::test] async fn custom_catalog_and_schema_no_default() { let config = SessionConfig::new() - .create_default_catalog_and_schema(false) + .with_create_default_catalog_and_schema(false) .with_default_catalog_and_schema("my_catalog", "my_schema"); catalog_and_schema_test(config).await; } @@ -2307,7 +2361,7 @@ mod tests { #[tokio::test] async fn custom_catalog_and_schema_and_information_schema() { let config = SessionConfig::new() - .create_default_catalog_and_schema(true) + .with_create_default_catalog_and_schema(true) .with_information_schema(true) .with_default_catalog_and_schema("my_catalog", "my_schema"); catalog_and_schema_test(config).await; diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index 430d10862456..21a82227cef0 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -223,7 +223,7 @@ impl<'a> ParquetReadOptions<'a> { ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) - .with_target_partitions(config.target_partitions) + .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) } } diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs index cc9070ccb261..3110061c4f2c 100644 --- a/datafusion/core/src/physical_optimizer/enforcement.rs +++ b/datafusion/core/src/physical_optimizer/enforcement.rs @@ -72,7 +72,7 @@ impl PhysicalOptimizerRule for BasicEnforcement { plan: Arc, config: &SessionConfig, ) -> Result> { - let target_partitions = config.target_partitions; + let target_partitions = config.target_partitions(); let top_down_join_key_reordering = config .config_options() .read() diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 1b4f5394695d..42c5e0c3f743 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -223,10 +223,10 @@ impl PhysicalOptimizerRule for Repartition { config: &SessionConfig, ) -> Result> { // Don't run optimizer if target_partitions == 1 - if config.target_partitions == 1 { + if config.target_partitions() == 1 { Ok(plan) } else { - optimize_partitions(config.target_partitions, plan, false, false) + optimize_partitions(config.target_partitions(), plan, false, false) } } diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 
d11b27670bf8..b5b6ab8be726 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -522,8 +522,8 @@ impl DefaultPhysicalPlanner { let partition_keys = window_expr_common_partition_keys(window_expr)?; let can_repartition = !partition_keys.is_empty() - && session_state.config.target_partitions > 1 - && session_state.config.repartition_windows; + && session_state.config.target_partitions() > 1 + && session_state.config.repartition_window_functions(); let physical_partition_keys = if can_repartition { @@ -661,8 +661,8 @@ impl DefaultPhysicalPlanner { let final_group: Vec> = initial_aggr.output_group_expr(); let can_repartition = !groups.is_empty() - && session_state.config.target_partitions > 1 - && session_state.config.repartition_aggregations; + && session_state.config.target_partitions() > 1 + && session_state.config.repartition_aggregations(); let (initial_aggr, next_partition_mode): ( Arc, @@ -836,7 +836,7 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; // If we have a `LIMIT` can run sort/limts in parallel (similar to TopK) - Ok(if fetch.is_some() && session_state.config.target_partitions > 1 { + Ok(if fetch.is_some() && session_state.config.target_partitions() > 1 { let sort = SortExec::new_with_partitioning( sort_expr, physical_input, @@ -937,8 +937,8 @@ impl DefaultPhysicalPlanner { .read() .get_bool(OPT_PREFER_HASH_JOIN) .unwrap_or_default(); - if session_state.config.target_partitions > 1 - && session_state.config.repartition_joins + if session_state.config.target_partitions() > 1 + && session_state.config.repartition_joins() && !prefer_hash_join { // Use SortMergeJoin if hash join is not preferred @@ -957,11 +957,11 @@ impl DefaultPhysicalPlanner { *null_equals_null, )?)) } - } else if session_state.config.target_partitions > 1 - && session_state.config.repartition_joins + } else if session_state.config.target_partitions() > 1 + && session_state.config.repartition_joins() && prefer_hash_join { let partition_mode = { - if session_state.config.collect_statistics { + if session_state.config.collect_statistics() { PartitionMode::Auto } else { PartitionMode::Partitioned @@ -1772,7 +1772,7 @@ mod tests { async fn plan(logical_plan: &LogicalPlan) -> Result> { let mut session_state = make_session_state(); - session_state.config.target_partitions = 4; + session_state.config = session_state.config.with_target_partitions(4); // optimize the logical plan let logical_plan = session_state.optimize(logical_plan)?; let planner = DefaultPhysicalPlanner::default(); diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 5ec28b162e98..605b71d19933 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -19,8 +19,11 @@ query R SHOW ALL ---- +datafusion.catalog.create_default_catalog_and_schema true +datafusion.catalog.information_schema true datafusion.catalog.location NULL datafusion.catalog.type NULL +datafusion.execuction_collect_statistics false datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.coalesce_target_batch_size 4096 @@ -30,6 +33,7 @@ datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true +datafusion.execution.target_partitions 16 
 datafusion.execution.time_zone +00:00
 datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
@@ -37,6 +41,9 @@
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
 datafusion.optimizer.prefer_hash_join true
+datafusion.optimizer.repartition_aggregations true
+datafusion.optimizer.repartition_joins true
+datafusion.optimizer.repartition_windows true
 datafusion.optimizer.skip_failed_rules true
 datafusion.optimizer.top_down_join_key_reordering true
 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 8bb31643337f..841c279b2dc3 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -37,8 +37,11 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | key | type | default | description |
 | --------------------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
+| datafusion.catalog.create_default_catalog_and_schema | Boolean | true | Whether the default catalog and schema should be created automatically. |
+| datafusion.catalog.information_schema | Boolean | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information |
 | datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema, defaults to None |
 | datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema. Defaults to None |
+| datafusion.execuction_collect_statistics | Boolean | false | Should DataFusion collect statistics after listing files |
 | datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. |
 | datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
 | datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. |
@@ -48,6 +51,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. |
 | datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. |
 | datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. |
+| datafusion.execution.target_partitions | UInt64 | 16 | Number of partitions for query execution. Increasing partitions can increase concurrency. |
 | datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, |
 | then extract the hour. |
 | datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
 | datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
@@ -56,5 +60,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
 | datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan |
 | datafusion.optimizer.prefer_hash_join | Boolean | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but consumes more memory. Defaults to true |
+| datafusion.optimizer.repartition_aggregations | Boolean | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_joins | Boolean | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_windows | Boolean | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level |
 | datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |
 | datafusion.optimizer.top_down_join_key_reordering | Boolean | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true |
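
Usage sketch (illustrative, not part of the diff): the snippet below shows how the unified API reads once this patch is applied, assuming a `datafusion` build that includes it. The builder and getter names are the ones added to `SessionConfig` above; the `main` wrapper exists only for the example.

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Settings that used to be public fields on `SessionConfig` now live in
        // `ConfigOptions` and are set through builder methods...
        let config = SessionConfig::new()
            .with_target_partitions(4)
            .with_information_schema(true)
            .with_repartition_joins(false);

        // ...and are read back through getters instead of direct field access.
        assert_eq!(config.target_partitions(), 4);
        assert!(config.information_schema());
        assert!(!config.repartition_joins());

        let _ctx = SessionContext::with_config(config);
    }

Callers that previously wrote `ctx.state.write().config.target_partitions` directly (as the sort_limit_query_sql benchmark did) now go through `SessionConfig::with_target_partitions`, so the value is stored in `ConfigOptions` alongside the other `datafusion.*` settings and appears in `SHOW ALL` and the `information_schema` output.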