Skip to content

Commit

Permalink
Unify most of SessionConfig settings into ConfigOptions (#4492)
Browse files Browse the repository at this point in the history
* Unify most `SessionConfig` settings into `ConfigOptions`

* Update set target_partitions in show_variable test

* Normalize setting in docs

* fix clippy
  • Loading branch information
alamb authored Dec 5, 2022
1 parent 2a754f8 commit 740a4fa
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 93 deletions.
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ async fn get_table(
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
.with_collect_stat(ctx.config.collect_statistics);
.with_collect_stat(ctx.config.collect_statistics());

let table_path = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(table_path).with_listing_options(options);
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};

use datafusion::prelude::SessionConfig;
use parking_lot::Mutex;
use std::sync::Arc;

Expand Down Expand Up @@ -85,8 +86,8 @@ fn create_context() -> Arc<Mutex<SessionContext>> {

rt.block_on(async {
// create local session context
let ctx = SessionContext::new();
ctx.state.write().config.target_partitions = 1;
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(1));

let table_provider = Arc::new(csv.await);
let mem_table = MemTable::load(table_provider, Some(partitions), &ctx.state())
Expand Down
98 changes: 93 additions & 5 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ use std::env;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/// Configuration option "datafusion.execution.target_partitions"
pub const OPT_TARGET_PARTITIONS: &str = "datafusion.execution.target_partitions";

/// Configuration option "datafusion.catalog.create_default_catalog_and_schema"
pub const OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA: &str =
"datafusion.catalog.create_default_catalog_and_schema";
/// Configuration option "datafusion.catalog.information_schema"
pub const OPT_INFORMATION_SCHEMA: &str = "datafusion.catalog.information_schema";

/// Configuration option "datafusion.optimizer.repartition_joins"
pub const OPT_REPARTITION_JOINS: &str = "datafusion.optimizer.repartition_joins";

/// Configuration option "datafusion.optimizer.repartition_aggregations"
pub const OPT_REPARTITION_AGGREGATIONS: &str =
"datafusion.optimizer.repartition_aggregations";

/// Configuration option "datafusion.optimizer.repartition_windows"
pub const OPT_REPARTITION_WINDOWS: &str = "datafusion.optimizer.repartition_windows";

/// Configuration option "datafusion.execuction_collect_statistics"
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execuction_collect_statistics";

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";

Expand Down Expand Up @@ -199,7 +221,54 @@ impl BuiltInConfigs {
/// configuration options
pub fn new() -> Self {
Self {
config_definitions: vec![ConfigDefinition::new_bool(
config_definitions: vec![ConfigDefinition::new_u64(
OPT_TARGET_PARTITIONS,
"Number of partitions for query execution. Increasing partitions can increase \
concurrency. Defaults to the number of cpu cores on the system.",
num_cpus::get() as u64,
),

ConfigDefinition::new_bool(
OPT_CREATE_DEFAULT_CATALOG_AND_SCHEMA,
"Whether the default catalog and schema should be created automatically.",
true
),

ConfigDefinition::new_bool(
OPT_INFORMATION_SCHEMA,
"Should DataFusion provide access to `information_schema` \
virtual tables for displaying schema information",
false
),

ConfigDefinition::new_bool(
OPT_REPARTITION_JOINS,
"Should DataFusion repartition data using the join keys to execute joins in parallel \
using the provided `target_partitions` level",
true
),

ConfigDefinition::new_bool(
OPT_REPARTITION_AGGREGATIONS,
"Should DataFusion repartition data using the aggregate keys to execute aggregates \
in parallel using the provided `target_partitions` level",
true
),

ConfigDefinition::new_bool(
OPT_REPARTITION_WINDOWS,
"Should DataFusion collect statistics after listing files",
true
),

ConfigDefinition::new_bool(
OPT_COLLECT_STATISTICS,
"Should DataFusion repartition data using the partitions keys to execute window \
functions in parallel using the provided `target_partitions` level",
false
),

ConfigDefinition::new_bool(
OPT_FILTER_NULL_JOIN_KEYS,
"When set to true, the optimizer will insert filters before a join between \
a nullable and non-nullable column to filter out nulls on the nullable side. This \
Expand Down Expand Up @@ -336,11 +405,14 @@ impl BuiltInConfigs {
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
for config in configs

let config_definitions: Vec<_> = configs
.config_definitions
.iter()
.sorted_by_key(|c| c.key.as_str())
{
.into_iter()
.map(normalize_for_display)
.collect();

for config in config_definitions.iter().sorted_by_key(|c| c.key.as_str()) {
let _ = writeln!(
&mut docs,
"| {} | {} | {} | {} |",
Expand All @@ -351,6 +423,16 @@ impl BuiltInConfigs {
}
}

/// Normalizes a config definition prior to markdown display
fn normalize_for_display(mut v: ConfigDefinition) -> ConfigDefinition {
// Since the default value of target_partitions depends on the number of cores,
// set the default value to 0 in the docs.
if v.key == OPT_TARGET_PARTITIONS {
v.default_value = ScalarValue::UInt64(Some(0))
}
v
}

/// Configuration options struct. This can contain values for built-in and custom options
#[derive(Clone)]
pub struct ConfigOptions {
Expand Down Expand Up @@ -437,6 +519,12 @@ impl ConfigOptions {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// set a `usize` configuration option
pub fn set_usize(&mut self, key: &str, value: usize) {
let value: u64 = value.try_into().expect("convert u64 to usize");
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// set a `String` configuration option
pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
self.set(key, ScalarValue::Utf8(Some(value.into())))
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ mod tests {
use std::vec;

use super::*;
use crate::execution::context::SessionConfig;
use crate::execution::options::{CsvReadOptions, ParquetReadOptions};
use crate::physical_plan::ColumnarValue;
use crate::physical_plan::Partitioning;
Expand Down Expand Up @@ -1538,8 +1539,7 @@ mod tests {
assert_eq!(4016, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

// For partition aware union, the output partition count should not be changed.
assert_eq!(
Expand Down Expand Up @@ -1594,8 +1594,7 @@ mod tests {
assert_eq!(916, union_rows.iter().map(|x| x.num_rows()).sum::<usize>());

let physical_plan = union.create_physical_plan().await?;
let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

// For non-partition aware union, the output partitioning count should be the combination of all output partitions count
assert!(matches!(
Expand Down Expand Up @@ -1624,8 +1623,7 @@ mod tests {
JoinType::RightAnti,
];

let default_partition_count =
SessionContext::new().copied_config().target_partitions;
let default_partition_count = SessionConfig::new().target_partitions();

for join_type in all_join_types {
let join = left.join(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ListingTableConfig {

let listing_options = ListingOptions::new(format)
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions);
.with_target_partitions(state.config.target_partitions());

Ok(Self {
table_paths: self.table_paths,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl TableProviderFactory for ListingTableFactory {
};

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config.collect_statistics)
.with_collect_stat(state.config.collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions)
.with_target_partitions(state.config.target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);

Expand Down
Loading

0 comments on commit 740a4fa

Please sign in to comment.