From f7477dc5ddadd27eac7ff186be9362e10a119319 Mon Sep 17 00:00:00 2001 From: yahoNanJing <90197956+yahoNanJing@users.noreply.github.com> Date: Fri, 30 Dec 2022 12:39:59 +0800 Subject: [PATCH] Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead (#4757) * Remove deprecated config related code in context.rs * Provide coalesce_batches() and round_robin_repartition() methods for SessionConfig * Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead * Fix config readme Co-authored-by: yangzhong --- datafusion/core/src/config.rs | 14 +- datafusion/core/src/execution/context.rs | 128 +++++++----------- .../src/physical_plan/coalesce_batches.rs | 6 +- datafusion/core/src/physical_plan/display.rs | 2 +- datafusion/core/src/physical_plan/mod.rs | 2 +- datafusion/core/tests/config_from_env.rs | 13 +- datafusion/core/tests/sql/explain_analyze.rs | 14 +- datafusion/core/tests/sql/mod.rs | 12 +- datafusion/core/tests/sql/window.rs | 7 +- .../test_files/information_schema.slt | 1 - docs/source/user-guide/configs.md | 60 ++++---- 11 files changed, 117 insertions(+), 142 deletions(-) diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs index d6504c3d0f0a..508e4be50ec8 100644 --- a/datafusion/core/src/config.rs +++ b/datafusion/core/src/config.rs @@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size"; /// Configuration option "datafusion.execution.coalesce_batches" pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches"; -/// Configuration option "datafusion.execution.coalesce_target_batch_size" -pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str = - "datafusion.execution.coalesce_target_batch_size"; - /// Configuration option "datafusion.execution.collect_statistics" pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics"; @@ -317,19 +313,13 @@ impl BuiltInConfigs { small batches will be coalesced into larger batches. This is helpful when there \ are highly selective filters or joins that could produce tiny output batches. The \ target batch size is determined by the configuration setting \ - '{OPT_COALESCE_TARGET_BATCH_SIZE}'."), + '{}'.", OPT_BATCH_SIZE), true, - ), - ConfigDefinition::new_u64( - OPT_COALESCE_TARGET_BATCH_SIZE, - format!("Target batch size when coalescing batches. Uses in conjunction with the \ - configuration setting '{OPT_COALESCE_BATCHES}'."), - 4096, ), ConfigDefinition::new_string( OPT_TIME_ZONE, "The session time zone which some function require \ - e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, + e.g. 
EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, \ then extract the hour.", Some("+00:00".into()), ), diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index f5814778b943..4a49e8ec7000 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::repartition::Repartition; use crate::config::{ - ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, + ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; @@ -1113,19 +1113,6 @@ impl QueryPlanner for DefaultQueryPlanner { } } -/// Session Configuration entry name for 'TARGET_PARTITIONS' -pub const TARGET_PARTITIONS: &str = "target_partitions"; -/// Session Configuration entry name for 'REPARTITION_JOINS' -pub const REPARTITION_JOINS: &str = "repartition_joins"; -/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS' -pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations"; -/// Session Configuration entry name for 'REPARTITION_WINDOWS' -pub const REPARTITION_WINDOWS: &str = "repartition_windows"; -/// Session Configuration entry name for 'PARQUET_PRUNING' -pub const PARQUET_PRUNING: &str = "parquet_pruning"; -/// Session Configuration entry name for 'COLLECT_STATISTICS' -pub const COLLECT_STATISTICS: &str = "collect_statistics"; - /// Map that holds opaque objects indexed by their type. /// /// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe]. @@ -1365,45 +1352,33 @@ impl SessionConfig { .unwrap() } - /// Convert configuration options to name-value pairs with values - /// converted to strings. - /// - /// Note that this method will eventually be deprecated and - /// replaced by [`config_options`]. - /// - /// [`config_options`]: SessionContext::config_option - pub fn to_props(&self) -> HashMap { - let mut map = HashMap::new(); - // copy configs from config_options - for (k, v) in self.config_options.options() { - map.insert(k.to_string(), format!("{v}")); - } - map.insert( - TARGET_PARTITIONS.to_owned(), - format!("{}", self.target_partitions()), - ); - map.insert( - REPARTITION_JOINS.to_owned(), - format!("{}", self.repartition_joins()), - ); - map.insert( - REPARTITION_AGGREGATIONS.to_owned(), - format!("{}", self.repartition_aggregations()), - ); - map.insert( - REPARTITION_WINDOWS.to_owned(), - format!("{}", self.repartition_window_functions()), - ); - map.insert( - PARQUET_PRUNING.to_owned(), - format!("{}", self.parquet_pruning()), - ); - map.insert( - COLLECT_STATISTICS.to_owned(), - format!("{}", self.collect_statistics()), - ); + /// Enables or disables the coalescence of small batches into larger batches + pub fn with_coalesce_batches(mut self, enabled: bool) -> Self { + self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled); + self + } + + /// Returns true if record batches will be examined between each operator + /// and small batches will be coalesced into larger batches. 
+ pub fn coalesce_batches(&self) -> bool { + self.config_options + .get_bool(OPT_COALESCE_BATCHES) + .unwrap_or_default() + } - map + /// Enables or disables the round robin repartition for increasing parallelism + pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self { + self.config_options + .set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled); + self + } + + /// Returns true if the physical plan optimizer will try to + /// add round robin repartition to increase parallelism to leverage more CPU cores. + pub fn round_robin_repartition(&self) -> bool { + self.config_options + .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) + .unwrap_or_default() } /// Return a handle to the configuration options. @@ -1563,11 +1538,7 @@ impl SessionState { // - it's conflicted with some parts of the BasicEnforcement, since it will // introduce additional repartitioning while the BasicEnforcement aims at // reducing unnecessary repartitioning. - if config - .config_options - .get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION) - .unwrap_or_default() - { + if config.round_robin_repartition() { physical_optimizers.push(Arc::new(Repartition::new())); } //- Currently it will depend on the partition number to decide whether to change the @@ -1599,19 +1570,8 @@ impl SessionState { physical_optimizers.push(Arc::new(OptimizeSorts::new())); // It will not influence the distribution and ordering of the whole plan tree. // Therefore, to avoid influencing other rules, it should be run at last. - if config - .config_options - .get_bool(OPT_COALESCE_BATCHES) - .unwrap_or_default() - { - physical_optimizers.push(Arc::new(CoalesceBatches::new( - config - .config_options - .get_u64(OPT_COALESCE_TARGET_BATCH_SIZE) - .unwrap_or_default() - .try_into() - .unwrap(), - ))); + if config.coalesce_batches() { + physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size()))); } // The PipelineChecker rule will reject non-runnable query plans that use // pipeline-breaking operators on infinite input(s). 
The rule generates a @@ -1969,30 +1929,46 @@ impl TaskContext { SessionConfig::new() .with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap()) .with_target_partitions( - task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(), + task_props + .get(OPT_TARGET_PARTITIONS) + .unwrap() + .parse() + .unwrap(), ) .with_repartition_joins( - task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(), + task_props + .get(OPT_REPARTITION_JOINS) + .unwrap() + .parse() + .unwrap(), ) .with_repartition_aggregations( task_props - .get(REPARTITION_AGGREGATIONS) + .get(OPT_REPARTITION_AGGREGATIONS) .unwrap() .parse() .unwrap(), ) .with_repartition_windows( task_props - .get(REPARTITION_WINDOWS) + .get(OPT_REPARTITION_WINDOWS) .unwrap() .parse() .unwrap(), ) .with_parquet_pruning( - task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(), + task_props + .get(OPT_PARQUET_ENABLE_PRUNING) + .unwrap() + .parse() + .unwrap(), ) .with_collect_statistics( - task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(), + task_props + .get(OPT_COLLECT_STATISTICS) + .unwrap() + .parse() + .unwrap(), ) }; diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index 582edc103358..b229f3aa2c0f 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -308,7 +308,7 @@ pub fn concat_batches( #[cfg(test)] mod tests { use super::*; - use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE}; + use crate::config::OPT_COALESCE_BATCHES; use crate::datasource::MemTable; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::projection::ProjectionExec; @@ -319,9 +319,7 @@ mod tests { #[tokio::test] async fn test_custom_batch_size() -> Result<()> { - let ctx = SessionContext::with_config( - SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234), - ); + let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234)); let plan = create_physical_plan(ctx).await?; let projection = plan.as_any().downcast_ref::().unwrap(); let coalesce = projection diff --git a/datafusion/core/src/physical_plan/display.rs b/datafusion/core/src/physical_plan/display.rs index 24613484fd02..e3ebc6d40334 100644 --- a/datafusion/core/src/physical_plan/display.rs +++ b/datafusion/core/src/physical_plan/display.rs @@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> { /// /// ```text /// ProjectionExec: expr=[a] - /// CoalesceBatchesExec: target_batch_size=4096 + /// CoalesceBatchesExec: target_batch_size=8192 /// FilterExec: a < 5 /// RepartitionExec: partitioning=RoundRobinBatch(16) /// CsvExec: source=...", diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 3d6663fbd68c..a30b650ec350 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -336,7 +336,7 @@ pub fn with_new_children_if_necessary( /// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR"); /// /// assert_eq!("ProjectionExec: expr=[a@0 as a]\ -/// \n CoalesceBatchesExec: target_batch_size=4096\ +/// \n CoalesceBatchesExec: target_batch_size=8192\ /// \n FilterExec: a@0 < 5\ /// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ /// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]", diff --git a/datafusion/core/tests/config_from_env.rs b/datafusion/core/tests/config_from_env.rs index 
20d153d8f83b..3f1fcbd601d2 100644 --- a/datafusion/core/tests/config_from_env.rs +++ b/datafusion/core/tests/config_from_env.rs @@ -32,18 +32,17 @@ fn get_config_bool_from_env() { fn get_config_int_from_env() { let config_key = "datafusion.execution.batch_size"; let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE"; + + // for valid testing env::set_var(env_key, "4096"); let config = ConfigOptions::from_env(); - env::remove_var(env_key); assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); -} -#[test] -fn get_config_int_from_env_invalid() { - let config_key = "datafusion.execution.coalesce_target_batch_size"; - let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE"; + // for invalid testing env::set_var(env_key, "abc"); let config = ConfigOptions::from_env(); + assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value + + // To avoid influence other testing, we need to clear this environment variable env::remove_var(env_key); - assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value } diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 8574aa9b9585..e950791c8e21 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -25,7 +25,9 @@ use datafusion::{ async fn explain_analyze_baseline_metrics() { // This test uses the execute function to run an actual plan under EXPLAIN ANALYZE // and then validate the presence of baseline metrics for supported operators - let config = SessionConfig::new().with_target_partitions(3); + let config = SessionConfig::new() + .with_target_partitions(3) + .with_batch_size(4096); let ctx = SessionContext::with_config(config); register_aggregate_csv_by_sql(&ctx).await; // a query with as many operators as we have metrics for @@ -655,7 +657,9 @@ order by #[tokio::test] async fn test_physical_plan_display_indent() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(9000); + let config = SessionConfig::new() + .with_target_partitions(9000) + .with_batch_size(4096); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await.unwrap(); let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \ @@ -697,7 +701,9 @@ async fn test_physical_plan_display_indent() { #[tokio::test] async fn test_physical_plan_display_indent_multi_children() { // Hard code target_partitions as it appears in the RepartitionExec output - let config = SessionConfig::new().with_target_partitions(9000); + let config = SessionConfig::new() + .with_target_partitions(9000) + .with_batch_size(4096); let ctx = SessionContext::with_config(config); // ensure indenting works for nodes with multiple children register_aggregate_csv(&ctx).await.unwrap(); @@ -745,7 +751,7 @@ async fn test_physical_plan_display_indent_multi_children() { async fn csv_explain() { // This test uses the execute function that create full plan cycle: logical, optimized logical, and physical, // then execute the physical plan and return the final explain results - let ctx = SessionContext::new(); + let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096)); register_aggregate_csv_by_sql(&ctx).await; let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)"; let actual = execute(&ctx, sql).await; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 
19c9838efa9d..eaf4603dd823 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -187,7 +187,8 @@ fn create_join_context( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Arc::new(Schema::new(vec![ @@ -241,7 +242,8 @@ fn create_left_semi_anti_join_context_with_null_ids( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Arc::new(Schema::new(vec![ @@ -313,7 +315,8 @@ fn create_right_semi_anti_join_context_with_null_ids( let ctx = SessionContext::with_config( SessionConfig::new() .with_repartition_joins(repartition_joins) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Arc::new(Schema::new(vec![ @@ -618,7 +621,8 @@ fn create_sort_merge_join_datatype_context() -> Result { let ctx = SessionContext::with_config( SessionConfig::new() .set_bool(OPT_PREFER_HASH_JOIN, false) - .with_target_partitions(2), + .with_target_partitions(2) + .with_batch_size(4096), ); let t1_schema = Schema::new(vec![ diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index ccccba6c52b1..d3a2043f1e8a 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -1738,7 +1738,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()> #[tokio::test] async fn test_window_partition_by_order_by() -> Result<()> { - let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2)); + let ctx = SessionContext::with_config( + SessionConfig::new() + .with_target_partitions(2) + .with_batch_size(4096), + ); register_aggregate_csv(&ctx).await?; let sql = "SELECT \ @@ -2239,6 +2243,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> { async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> { let config = SessionConfig::new() .with_target_partitions(8) + .with_batch_size(4096) .with_repartition_windows(true); let ctx = SessionContext::with_config(config); register_aggregate_csv(&ctx).await?; diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index da2db8de64de..8eaa7dd439b8 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -108,7 +108,6 @@ datafusion.catalog.location NULL datafusion.catalog.type NULL datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true -datafusion.execution.coalesce_target_batch_size 4096 datafusion.execution.collect_statistics false datafusion.execution.parquet.enable_page_index false datafusion.execution.parquet.metadata_size_hint NULL diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1c5f08656388..79245ca1a0c5 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,34 +35,32 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. 
Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | type | default | description | -| --------------------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | Boolean | true | Whether the default catalog and schema should be created automatically. | -| datafusion.catalog.information_schema | Boolean | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema, defaults to None | -| datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema. Defaults to None | -| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would results in too much metadata memory consumption. | -| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. | -| datafusion.execution.coalesce_target_batch_size | UInt64 | 4096 | Target batch size when coalescing batches. Uses in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'. | -| datafusion.execution.collect_statistics | Boolean | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. | -| datafusion.execution.parquet.metadata_size_hint | UInt64 | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two read are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. | -| datafusion.execution.parquet.pruning | Boolean | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file. | -| datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. | -| datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding opearation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. | -| datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. 
This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. | -| datafusion.execution.target_partitions | UInt64 | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of cpu cores on the system. | -| datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, | -| then extract the hour. | -| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. | -| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. | -| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.prefer_hash_join | Boolean | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficientlythan SortMergeJoin but consumes more memory. Defaults to true | -| datafusion.optimizer.repartition_aggregations | Boolean | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_joins | Boolean | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_windows | Boolean | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. | -| datafusion.optimizer.top_down_join_key_reordering | Boolean | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. 
Defaults to true |
+| key | type | default | description |
+| --------------------------------------------------------- | ------- | ------- | ----------- |
+| datafusion.catalog.create_default_catalog_and_schema | Boolean | true | Whether the default catalog and schema should be created automatically. |
+| datafusion.catalog.information_schema | Boolean | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information |
+| datafusion.catalog.location | Utf8 | NULL | Location scanned to load tables for `default` schema, defaults to None |
+| datafusion.catalog.type | Utf8 | NULL | Type of `TableProvider` to use when loading `default` schema. Defaults to None |
+| datafusion.execution.batch_size | UInt64 | 8192 | Default batch size while creating new batches; it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption. |
+| datafusion.execution.coalesce_batches | Boolean | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.batch_size'. |
+| datafusion.execution.collect_statistics | Boolean | false | Should DataFusion collect statistics after listing files |
+| datafusion.execution.parquet.enable_page_index | Boolean | false | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded. |
+| datafusion.execution.parquet.metadata_size_hint | UInt64 | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: one read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer. |
+| datafusion.execution.parquet.pruning | Boolean | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file. |
+| datafusion.execution.parquet.pushdown_filters | Boolean | false | If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. |
+| datafusion.execution.parquet.reorder_filters | Boolean | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query. |
+| datafusion.execution.parquet.skip_metadata | Boolean | true | If true, the parquet reader skips the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata. |
+| datafusion.execution.target_partitions | UInt64 | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system. |
+| datafusion.execution.time_zone | Utf8 | +00:00 | The session time zone which some functions require, e.g. EXTRACT(HOUR from SOME_TIME) shifts the underlying datetime according to the time zone and then extracts the hour. |
+| datafusion.explain.logical_plan_only | Boolean | false | When set to true, the explain statement will only print logical plans. |
+| datafusion.explain.physical_plan_only | Boolean | false | When set to true, the explain statement will only print physical plans. |
+| datafusion.optimizer.enable_round_robin_repartition | Boolean | true | When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores |
+| datafusion.optimizer.filter_null_join_keys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
+| datafusion.optimizer.hash_join_single_partition_threshold | UInt64 | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
+| datafusion.optimizer.max_passes | UInt64 | 3 | Number of times that the optimizer will attempt to optimize the plan |
+| datafusion.optimizer.prefer_hash_join | Boolean | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory. Defaults to true |
+| datafusion.optimizer.repartition_aggregations | Boolean | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_joins | Boolean | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_windows | Boolean | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.skip_failed_rules | Boolean | true | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail. |
+| datafusion.optimizer.top_down_join_key_reordering | Boolean | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys. Defaults to true |
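Example of the new `SessionConfig` surface after this change (illustrative sketch only; the builder chain and the `SELECT 1` query are not taken from the test suite, but the methods used are the ones added or kept by this patch — `with_batch_size`, `with_coalesce_batches`, `with_round_robin_repartition`, and their getters). The single `batch_size` value now also drives the `CoalesceBatchesExec` target that previously came from `coalesce_target_batch_size`:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Illustrative only: batch_size now also sets the CoalesceBatchesExec target.
    let config = SessionConfig::new()
        .with_batch_size(4096)              // replaces the removed coalesce_target_batch_size
        .with_coalesce_batches(true)        // new setter added by this PR
        .with_round_robin_repartition(true) // new setter added by this PR
        .with_target_partitions(8);

    // The matching getters are used by the physical optimizer rules.
    assert!(config.coalesce_batches());
    assert!(config.round_robin_repartition());
    assert_eq!(config.batch_size(), 4096);

    // Plans built from this context will show CoalesceBatchesExec: target_batch_size=4096
    // whenever coalesce_batches is enabled.
    let ctx = SessionContext::with_config(config);
    let df = ctx.sql("SELECT 1").await?;
    df.show().await?;
    Ok(())
}
```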
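Environment-variable overrides keep working for `datafusion.execution.batch_size`; the consolidated `get_config_int_from_env` test above covers both the valid and the invalid case. A standalone sketch of that behaviour (assuming the `datafusion` crate is a dependency; the 8192 fallback is the built-in default for `OPT_BATCH_SIZE`):

```rust
use datafusion::config::ConfigOptions;
use std::env;

fn main() {
    // A valid value is parsed and applied.
    env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "4096");
    let config = ConfigOptions::from_env();
    assert_eq!(
        config
            .get_u64("datafusion.execution.batch_size")
            .unwrap_or_default(),
        4096
    );

    // An unparsable value falls back to the default (8192) and a warning is emitted.
    env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "abc");
    let config = ConfigOptions::from_env();
    assert_eq!(
        config
            .get_u64("datafusion.execution.batch_size")
            .unwrap_or_default(),
        8192
    );

    // Clear the variable so other code is not affected by the override.
    env::remove_var("DATAFUSION_EXECUTION_BATCH_SIZE");
}
```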