Skip to content

Commit

Permalink
Remove the config datafusion.execution.coalesce_target_batch_size and…
Browse files Browse the repository at this point in the history
… use datafusion.execution.batch_size instead (#4757)

* Remove deprecated config related code in context.rs

* Provide coalesce_batches() and round_robin_repartition() methods for SessionConfig

* Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead

* Fix config readme

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Dec 30, 2022
1 parent ac876db commit f7477dc
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 142 deletions.
14 changes: 2 additions & 12 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
/// Configuration option "datafusion.execution.coalesce_batches"
pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.collect_statistics"
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";

Expand Down Expand Up @@ -317,19 +313,13 @@ impl BuiltInConfigs {
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{OPT_COALESCE_TARGET_BATCH_SIZE}'."),
'{}'.", OPT_BATCH_SIZE),
true,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{OPT_COALESCE_BATCHES}'."),
4096,
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, \
then extract the hour.",
Some("+00:00".into()),
),
Expand Down
128 changes: 52 additions & 76 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
Expand Down Expand Up @@ -1113,19 +1113,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Session Configuration entry name for 'TARGET_PARTITIONS'
pub const TARGET_PARTITIONS: &str = "target_partitions";
/// Session Configuration entry name for 'REPARTITION_JOINS'
pub const REPARTITION_JOINS: &str = "repartition_joins";
/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS'
pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations";
/// Session Configuration entry name for 'REPARTITION_WINDOWS'
pub const REPARTITION_WINDOWS: &str = "repartition_windows";
/// Session Configuration entry name for 'PARQUET_PRUNING'
pub const PARQUET_PRUNING: &str = "parquet_pruning";
/// Session Configuration entry name for 'COLLECT_STATISTICS'
pub const COLLECT_STATISTICS: &str = "collect_statistics";

/// Map that holds opaque objects indexed by their type.
///
/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
Expand Down Expand Up @@ -1365,45 +1352,33 @@ impl SessionConfig {
.unwrap()
}

/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
/// Note that this method will eventually be deprecated and
/// replaced by [`config_options`].
///
/// [`config_options`]: SessionContext::config_option
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
map.insert(k.to_string(), format!("{v}"));
}
map.insert(
TARGET_PARTITIONS.to_owned(),
format!("{}", self.target_partitions()),
);
map.insert(
REPARTITION_JOINS.to_owned(),
format!("{}", self.repartition_joins()),
);
map.insert(
REPARTITION_AGGREGATIONS.to_owned(),
format!("{}", self.repartition_aggregations()),
);
map.insert(
REPARTITION_WINDOWS.to_owned(),
format!("{}", self.repartition_window_functions()),
);
map.insert(
PARQUET_PRUNING.to_owned(),
format!("{}", self.parquet_pruning()),
);
map.insert(
COLLECT_STATISTICS.to_owned(),
format!("{}", self.collect_statistics()),
);
/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled);
self
}

/// Returns true if record batches will be examined between each operator
/// and small batches will be coalesced into larger batches.
pub fn coalesce_batches(&self) -> bool {
self.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
}

map
/// Enables or disables the round robin repartition for increasing parallelism
pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
self.config_options
.set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled);
self
}

/// Returns true if the physical plan optimizer will try to
/// add round robin repartition to increase parallelism to leverage more CPU cores.
pub fn round_robin_repartition(&self) -> bool {
self.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
}

/// Return a handle to the configuration options.
Expand Down Expand Up @@ -1563,11 +1538,7 @@ impl SessionState {
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config
.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
if config.round_robin_repartition() {
physical_optimizers.push(Arc::new(Repartition::new()));
}
//- Currently it will depend on the partition number to decide whether to change the
Expand Down Expand Up @@ -1599,19 +1570,8 @@ impl SessionState {
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.unwrap_or_default()
.try_into()
.unwrap(),
)));
if config.coalesce_batches() {
physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size())));
}
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
Expand Down Expand Up @@ -1969,30 +1929,46 @@ impl TaskContext {
SessionConfig::new()
.with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap())
.with_target_partitions(
task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
task_props
.get(OPT_TARGET_PARTITIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_joins(
task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
task_props
.get(OPT_REPARTITION_JOINS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_aggregations(
task_props
.get(REPARTITION_AGGREGATIONS)
.get(OPT_REPARTITION_AGGREGATIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_windows(
task_props
.get(REPARTITION_WINDOWS)
.get(OPT_REPARTITION_WINDOWS)
.unwrap()
.parse()
.unwrap(),
)
.with_parquet_pruning(
task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
task_props
.get(OPT_PARQUET_ENABLE_PRUNING)
.unwrap()
.parse()
.unwrap(),
)
.with_collect_statistics(
task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(),
task_props
.get(OPT_COLLECT_STATISTICS)
.unwrap()
.parse()
.unwrap(),
)
};

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
use crate::config::OPT_COALESCE_BATCHES;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
Expand All @@ -319,9 +319,7 @@ mod tests {

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
);
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234));
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=4096
/// CoalesceBatchesExec: target_batch_size=8192
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub fn with_new_children_if_necessary(
/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]",
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@ fn get_config_bool_from_env() {
fn get_config_int_from_env() {
let config_key = "datafusion.execution.batch_size";
let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

// for valid testing
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096);
}

#[test]
fn get_config_int_from_env_invalid() {
let config_key = "datafusion.execution.coalesce_target_batch_size";
let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE";
// for invalid testing
env::set_var(env_key, "abc");
let config = ConfigOptions::from_env();
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value

// To avoid influence other testing, we need to clear this environment variable
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value
}
14 changes: 10 additions & 4 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use datafusion::{
async fn explain_analyze_baseline_metrics() {
// This test uses the execute function to run an actual plan under EXPLAIN ANALYZE
// and then validate the presence of baseline metrics for supported operators
let config = SessionConfig::new().with_target_partitions(3);
let config = SessionConfig::new()
.with_target_partitions(3)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv_by_sql(&ctx).await;
// a query with as many operators as we have metrics for
Expand Down Expand Up @@ -655,7 +657,9 @@ order by
#[tokio::test]
async fn test_physical_plan_display_indent() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await.unwrap();
let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
Expand Down Expand Up @@ -697,7 +701,9 @@ async fn test_physical_plan_display_indent() {
#[tokio::test]
async fn test_physical_plan_display_indent_multi_children() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&ctx).await.unwrap();
Expand Down Expand Up @@ -745,7 +751,7 @@ async fn test_physical_plan_display_indent_multi_children() {
async fn csv_explain() {
// This test uses the execute function that create full plan cycle: logical, optimized logical, and physical,
// then execute the physical plan and return the final explain results
let ctx = SessionContext::new();
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096));
register_aggregate_csv_by_sql(&ctx).await;
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)";
let actual = execute(&ctx, sql).await;
Expand Down
12 changes: 8 additions & 4 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ fn create_join_context(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -241,7 +242,8 @@ fn create_left_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -313,7 +315,8 @@ fn create_right_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -618,7 +621,8 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
let ctx = SessionContext::with_config(
SessionConfig::new()
.set_bool(OPT_PREFER_HASH_JOIN, false)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Schema::new(vec![
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>

#[tokio::test]
async fn test_window_partition_by_order_by() -> Result<()> {
let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_target_partitions(2)
.with_batch_size(4096),
);
register_aggregate_csv(&ctx).await?;

let sql = "SELECT \
Expand Down Expand Up @@ -2239,6 +2243,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
let config = SessionConfig::new()
.with_target_partitions(8)
.with_batch_size(4096)
.with_repartition_windows(true);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ datafusion.catalog.location NULL
datafusion.catalog.type NULL
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.coalesce_target_batch_size 4096
datafusion.execution.collect_statistics false
datafusion.execution.parquet.enable_page_index false
datafusion.execution.parquet.metadata_size_hint NULL
Expand Down
Loading

0 comments on commit f7477dc

Please sign in to comment.