Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the config datafusion.execution.coalesce_target_batch_size and use datafusion.execution.batch_size instead #4757

Merged
merged 5 commits into from
Dec 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";
/// Configuration option "datafusion.execution.coalesce_batches"
pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.collect_statistics"
pub const OPT_COLLECT_STATISTICS: &str = "datafusion.execution.collect_statistics";

Expand Down Expand Up @@ -317,19 +313,13 @@ impl BuiltInConfigs {
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{OPT_COALESCE_TARGET_BATCH_SIZE}'."),
'{}'.", OPT_BATCH_SIZE),
true,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{OPT_COALESCE_BATCHES}'."),
4096,
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone, \
then extract the hour.",
Some("+00:00".into()),
),
Expand Down
128 changes: 52 additions & 76 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
Expand Down Expand Up @@ -1113,19 +1113,6 @@ impl QueryPlanner for DefaultQueryPlanner {
}
}

/// Session Configuration entry name for 'TARGET_PARTITIONS'
pub const TARGET_PARTITIONS: &str = "target_partitions";
/// Session Configuration entry name for 'REPARTITION_JOINS'
pub const REPARTITION_JOINS: &str = "repartition_joins";
/// Session Configuration entry name for 'REPARTITION_AGGREGATIONS'
pub const REPARTITION_AGGREGATIONS: &str = "repartition_aggregations";
/// Session Configuration entry name for 'REPARTITION_WINDOWS'
pub const REPARTITION_WINDOWS: &str = "repartition_windows";
/// Session Configuration entry name for 'PARQUET_PRUNING'
pub const PARQUET_PRUNING: &str = "parquet_pruning";
/// Session Configuration entry name for 'COLLECT_STATISTICS'
pub const COLLECT_STATISTICS: &str = "collect_statistics";

/// Map that holds opaque objects indexed by their type.
///
/// Data is wrapped into an [`Arc`] to enable [`Clone`] while still being [object safe].
Expand Down Expand Up @@ -1365,45 +1352,33 @@ impl SessionConfig {
.unwrap()
}

/// Convert configuration options to name-value pairs with values
/// converted to strings.
///
/// Note that this method will eventually be deprecated and
/// replaced by [`config_options`].
///
/// [`config_options`]: SessionContext::config_option
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
map.insert(k.to_string(), format!("{v}"));
}
map.insert(
TARGET_PARTITIONS.to_owned(),
format!("{}", self.target_partitions()),
);
map.insert(
REPARTITION_JOINS.to_owned(),
format!("{}", self.repartition_joins()),
);
map.insert(
REPARTITION_AGGREGATIONS.to_owned(),
format!("{}", self.repartition_aggregations()),
);
map.insert(
REPARTITION_WINDOWS.to_owned(),
format!("{}", self.repartition_window_functions()),
);
map.insert(
PARQUET_PRUNING.to_owned(),
format!("{}", self.parquet_pruning()),
);
map.insert(
COLLECT_STATISTICS.to_owned(),
format!("{}", self.collect_statistics()),
);
/// Enables or disables the coalescence of small batches into larger batches
pub fn with_coalesce_batches(mut self, enabled: bool) -> Self {
self.config_options.set_bool(OPT_COALESCE_BATCHES, enabled);
self
}

/// Returns true if record batches will be examined between each operator
/// and small batches will be coalesced into larger batches.
pub fn coalesce_batches(&self) -> bool {
self.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
}

map
/// Enables or disables the round robin repartition for increasing parallelism
pub fn with_round_robin_repartition(mut self, enabled: bool) -> Self {
self.config_options
.set_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION, enabled);
self
}

/// Returns true if the physical plan optimizer will try to
/// add round robin repartition to increase parallelism to leverage more CPU cores.
pub fn round_robin_repartition(&self) -> bool {
self.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
}

/// Return a handle to the configuration options.
Expand Down Expand Up @@ -1563,11 +1538,7 @@ impl SessionState {
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config
.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
if config.round_robin_repartition() {
physical_optimizers.push(Arc::new(Repartition::new()));
}
//- Currently it will depend on the partition number to decide whether to change the
Expand Down Expand Up @@ -1599,19 +1570,8 @@ impl SessionState {
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.unwrap_or_default()
.try_into()
.unwrap(),
)));
if config.coalesce_batches() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That reads much more nicely 👍

physical_optimizers.push(Arc::new(CoalesceBatches::new(config.batch_size())));
}
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
Expand Down Expand Up @@ -1969,30 +1929,46 @@ impl TaskContext {
SessionConfig::new()
.with_batch_size(task_props.get(OPT_BATCH_SIZE).unwrap().parse().unwrap())
.with_target_partitions(
task_props.get(TARGET_PARTITIONS).unwrap().parse().unwrap(),
task_props
.get(OPT_TARGET_PARTITIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_joins(
task_props.get(REPARTITION_JOINS).unwrap().parse().unwrap(),
task_props
.get(OPT_REPARTITION_JOINS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_aggregations(
task_props
.get(REPARTITION_AGGREGATIONS)
.get(OPT_REPARTITION_AGGREGATIONS)
.unwrap()
.parse()
.unwrap(),
)
.with_repartition_windows(
task_props
.get(REPARTITION_WINDOWS)
.get(OPT_REPARTITION_WINDOWS)
.unwrap()
.parse()
.unwrap(),
)
.with_parquet_pruning(
task_props.get(PARQUET_PRUNING).unwrap().parse().unwrap(),
task_props
.get(OPT_PARQUET_ENABLE_PRUNING)
.unwrap()
.parse()
.unwrap(),
)
.with_collect_statistics(
task_props.get(COLLECT_STATISTICS).unwrap().parse().unwrap(),
task_props
.get(OPT_COLLECT_STATISTICS)
.unwrap()
.parse()
.unwrap(),
)
};

Expand Down
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE};
use crate::config::OPT_COALESCE_BATCHES;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::projection::ProjectionExec;
Expand All @@ -319,9 +319,7 @@ mod tests {

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let ctx = SessionContext::with_config(
SessionConfig::new().set_u64(OPT_COALESCE_TARGET_BATCH_SIZE, 1234),
);
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(1234));
let plan = create_physical_plan(ctx).await?;
let projection = plan.as_any().downcast_ref::<ProjectionExec>().unwrap();
let coalesce = projection
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
/// CoalesceBatchesExec: target_batch_size=4096
/// CoalesceBatchesExec: target_batch_size=8192
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ pub fn with_new_children_if_necessary(
/// let plan_string = plan_string.replace(normalized.as_ref(), "WORKING_DIR");
///
/// assert_eq!("ProjectionExec: expr=[a@0 as a]\
/// \n CoalesceBatchesExec: target_batch_size=4096\
/// \n CoalesceBatchesExec: target_batch_size=8192\
/// \n FilterExec: a@0 < 5\
/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\
/// \n CsvExec: files={1 group: [[WORKING_DIR/tests/example.csv]]}, has_header=true, limit=None, projection=[a]",
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/tests/config_from_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,17 @@ fn get_config_bool_from_env() {
fn get_config_int_from_env() {
let config_key = "datafusion.execution.batch_size";
let env_key = "DATAFUSION_EXECUTION_BATCH_SIZE";

// for valid testing
env::set_var(env_key, "4096");
let config = ConfigOptions::from_env();
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096);
}

#[test]
fn get_config_int_from_env_invalid() {
let config_key = "datafusion.execution.coalesce_target_batch_size";
let env_key = "DATAFUSION_EXECUTION_COALESCE_TARGET_BATCH_SIZE";
// for invalid testing
env::set_var(env_key, "abc");
let config = ConfigOptions::from_env();
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 8192); // set to its default value

// To avoid influence other testing, we need to clear this environment variable
env::remove_var(env_key);
assert_eq!(config.get_u64(config_key).unwrap_or_default(), 4096); // set to its default value
}
14 changes: 10 additions & 4 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use datafusion::{
async fn explain_analyze_baseline_metrics() {
// This test uses the execute function to run an actual plan under EXPLAIN ANALYZE
// and then validate the presence of baseline metrics for supported operators
let config = SessionConfig::new().with_target_partitions(3);
let config = SessionConfig::new()
.with_target_partitions(3)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv_by_sql(&ctx).await;
// a query with as many operators as we have metrics for
Expand Down Expand Up @@ -655,7 +657,9 @@ order by
#[tokio::test]
async fn test_physical_plan_display_indent() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await.unwrap();
let sql = "SELECT c1, MAX(c12), MIN(c12) as the_min \
Expand Down Expand Up @@ -697,7 +701,9 @@ async fn test_physical_plan_display_indent() {
#[tokio::test]
async fn test_physical_plan_display_indent_multi_children() {
// Hard code target_partitions as it appears in the RepartitionExec output
let config = SessionConfig::new().with_target_partitions(9000);
let config = SessionConfig::new()
.with_target_partitions(9000)
.with_batch_size(4096);
let ctx = SessionContext::with_config(config);
// ensure indenting works for nodes with multiple children
register_aggregate_csv(&ctx).await.unwrap();
Expand Down Expand Up @@ -745,7 +751,7 @@ async fn test_physical_plan_display_indent_multi_children() {
async fn csv_explain() {
// This test uses the execute function that create full plan cycle: logical, optimized logical, and physical,
// then execute the physical plan and return the final explain results
let ctx = SessionContext::new();
let ctx = SessionContext::with_config(SessionConfig::new().with_batch_size(4096));
register_aggregate_csv_by_sql(&ctx).await;
let sql = "EXPLAIN SELECT c1 FROM aggregate_test_100 where c2 > cast(10 as int)";
let actual = execute(&ctx, sql).await;
Expand Down
12 changes: 8 additions & 4 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ fn create_join_context(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -241,7 +242,8 @@ fn create_left_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -313,7 +315,8 @@ fn create_right_semi_anti_join_context_with_null_ids(
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_repartition_joins(repartition_joins)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Arc::new(Schema::new(vec![
Expand Down Expand Up @@ -618,7 +621,8 @@ fn create_sort_merge_join_datatype_context() -> Result<SessionContext> {
let ctx = SessionContext::with_config(
SessionConfig::new()
.set_bool(OPT_PREFER_HASH_JOIN, false)
.with_target_partitions(2),
.with_target_partitions(2)
.with_batch_size(4096),
);

let t1_schema = Schema::new(vec![
Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/tests/sql/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1738,7 +1738,11 @@ async fn over_order_by_sort_keys_sorting_global_order_compacting() -> Result<()>

#[tokio::test]
async fn test_window_partition_by_order_by() -> Result<()> {
let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
let ctx = SessionContext::with_config(
SessionConfig::new()
.with_target_partitions(2)
.with_batch_size(4096),
);
register_aggregate_csv(&ctx).await?;

let sql = "SELECT \
Expand Down Expand Up @@ -2239,6 +2243,7 @@ async fn test_window_agg_sort_orderby_reversed_binary_expr() -> Result<()> {
async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
let config = SessionConfig::new()
.with_target_partitions(8)
.with_batch_size(4096)
.with_repartition_windows(true);
let ctx = SessionContext::with_config(config);
register_aggregate_csv(&ctx).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ datafusion.catalog.location NULL
datafusion.catalog.type NULL
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
datafusion.execution.coalesce_target_batch_size 4096
datafusion.execution.collect_statistics false
datafusion.execution.parquet.enable_page_index false
datafusion.execution.parquet.metadata_size_hint NULL
Expand Down
Loading