Commit 048b5cd

Merge remote-tracking branch 'apache/main' into alamb/consolidated_streaming
alamb committed Jul 19, 2023
2 parents 4814da4 + 07ffebb commit 048b5cd
Showing 12 changed files with 1,822 additions and 1,448 deletions.
5 changes: 4 additions & 1 deletion datafusion/core/src/physical_optimizer/pipeline_fixer.rs
@@ -27,7 +27,9 @@ use crate::error::Result;
 use crate::physical_optimizer::join_selection::swap_hash_join;
 use crate::physical_optimizer::pipeline_checker::PipelineStatePropagator;
 use crate::physical_optimizer::PhysicalOptimizerRule;
-use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SymmetricHashJoinExec};
+use crate::physical_plan::joins::{
+    HashJoinExec, PartitionMode, StreamJoinPartitionMode, SymmetricHashJoinExec,
+};
 use crate::physical_plan::ExecutionPlan;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::DataFusionError;
@@ -101,6 +103,7 @@ fn hash_join_convert_symmetric_subrule(
             hash_join.filter().cloned(),
             hash_join.join_type(),
             hash_join.null_equals_null(),
+            StreamJoinPartitionMode::Partitioned,
         )
         .map(|exec| {
             input.plan = Arc::new(exec) as _;
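For reference, the StreamJoinPartitionMode threaded through above is a new, explicit choice of how the symmetric hash join partitions its inputs; this subrule opts into hash-partitioned execution. Below is a minimal sketch of the resulting call site, reconstructed from the hunk — the left()/right()/on() accessors, the Result alias (assumed datafusion_common::Result), and the exact try_new signature are assumptions, not code quoted from this commit:

    // Sketch only: approximates hash_join_convert_symmetric_subrule's call.
    // Only the last four arguments appear verbatim in the diff above.
    fn convert_to_symmetric_hash_join(
        hash_join: &HashJoinExec,
    ) -> Result<SymmetricHashJoinExec> {
        SymmetricHashJoinExec::try_new(
            hash_join.left().clone(),     // assumed accessor
            hash_join.right().clone(),    // assumed accessor
            hash_join.on().to_vec(),      // assumed accessor
            hash_join.filter().cloned(),
            hash_join.join_type(),
            hash_join.null_equals_null(),
            // New parameter in this commit: the caller now selects the
            // stream-join partitioning strategy explicitly.
            StreamJoinPartitionMode::Partitioned,
        )
    }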
230 changes: 8 additions & 222 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -972,22 +972,15 @@ pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-    use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
-    use crate::physical_plan::aggregates::PhysicalGroupBy;
-    use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
-    use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
-    use crate::physical_plan::filter::FilterExec;
-    use crate::physical_plan::joins::utils::{JoinFilter, JoinOn};
-    use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
-    use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_optimizer::test_utils::{
+        aggregate_exec, bounded_window_exec, coalesce_batches_exec,
+        coalesce_partitions_exec, filter_exec, get_plan_string, global_limit_exec,
+        hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
+        parquet_exec_sorted, repartition_exec, sort_exec, sort_expr, sort_expr_options,
+        sort_merge_join_exec, sort_preserving_merge_exec, union_exec,
+    };
-    use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-    use crate::physical_plan::union::UnionExec;
     use crate::physical_plan::windows::create_window_expr;
     use crate::physical_plan::windows::PartitionSearchMode::{
         Linear, PartiallySorted, Sorted,
     };
@@ -996,9 +989,8 @@ mod tests {
     use crate::test::csv_exec_sorted;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::{Result, Statistics};
+    use datafusion_common::Result;
     use datafusion_expr::JoinType;
-    use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::expressions::{col, NotExpr};
     use datafusion_physical_expr::PhysicalSortExpr;
@@ -1030,13 +1022,6 @@ mod tests {
         Ok(schema)
     }

-    // Util function to get string representation of a physical plan
-    fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
-        let formatted = displayable(plan.as_ref()).indent(true).to_string();
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        actual.iter().map(|elem| elem.to_string()).collect()
-    }
-
     #[tokio::test]
     async fn test_is_column_aligned_nullable() -> Result<()> {
         let schema = create_test_schema()?;
Expand Down Expand Up @@ -2828,203 +2813,4 @@ mod tests {
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
}

/// PhysicalSortExpr with specified options
fn sort_expr_options(
name: &str,
schema: &Schema,
options: SortOptions,
) -> PhysicalSortExpr {
PhysicalSortExpr {
expr: col(name, schema).unwrap(),
options,
}
}

fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
Arc::new(MemoryExec::try_new(&[], schema.clone(), None).unwrap())
}

fn sort_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortExec::new(sort_exprs, input))
}

fn hash_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
filter: Option<JoinFilter>,
join_type: &JoinType,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(HashJoinExec::try_new(
left,
right,
on,
filter,
join_type,
PartitionMode::Partitioned,
true,
)?))
}

fn sort_preserving_merge_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}

fn filter_exec(
predicate: Arc<dyn PhysicalExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(FilterExec::try_new(predicate, input).unwrap())
}

fn bounded_window_exec(
col_name: &str,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs: Vec<_> = sort_exprs.into_iter().collect();
let schema = input.schema();

Arc::new(
BoundedWindowAggExec::try_new(
vec![create_window_expr(
&WindowFunction::AggregateFunction(AggregateFunction::Count),
"count".to_owned(),
&[col(col_name, &schema).unwrap()],
&[],
&sort_exprs,
Arc::new(WindowFrame::new(true)),
schema.as_ref(),
)
.unwrap()],
input.clone(),
input.schema(),
vec![],
Sorted,
)
.unwrap(),
)
}

/// Create a non sorted parquet exec
fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
None,
))
}

// Created a sorted parquet exec
fn parquet_exec_sorted(
schema: &SchemaRef,
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();

Arc::new(ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
file_schema: schema.clone(),
file_groups: vec![vec![PartitionedFile::new("x".to_string(), 100)]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![sort_exprs],
infinite_source: false,
},
None,
None,
))
}

fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
Arc::new(UnionExec::new(input))
}

fn limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
global_limit_exec(local_limit_exec(input))
}

fn local_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(LocalLimitExec::new(input, 100))
}

fn global_limit_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(GlobalLimitExec::new(input, 0, Some(100)))
}

fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10)).unwrap(),
)
}

fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalescePartitionsExec::new(input))
}

fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
Arc::new(
AggregateExec::try_new(
AggregateMode::Final,
PhysicalGroupBy::default(),
vec![],
vec![],
vec![],
input,
schema,
)
.unwrap(),
)
}

fn sort_merge_join_exec(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
join_on: &JoinOn,
join_type: &JoinType,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
SortMergeJoinExec::try_new(
left,
right,
join_on.clone(),
*join_type,
vec![SortOptions::default(); join_on.len()],
false,
)
.unwrap(),
)
}

fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(CoalesceBatchesExec::new(input, 128))
}
}
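The roughly 200 lines deleted above are not gone: per the import hunk at the head of this file's diff, the same helpers (sort_expr, memory_exec, parquet_exec_sorted, and the rest, plus get_plan_string) are now shared via crate::physical_optimizer::test_utils. Below is a minimal sketch of how a test in this module composes them after the consolidation; it uses only helpers whose bodies appear in the deleted code above, and the column name nullable_col is an assumption about this module's existing create_test_schema fixture:

    use crate::physical_optimizer::test_utils::{
        get_plan_string, memory_exec, sort_exec, sort_expr, sort_preserving_merge_exec,
    };

    #[tokio::test]
    async fn sketch_sort_preserving_merge_plan() -> Result<()> {
        let schema = create_test_schema()?; // existing fixture in this module
        // Unsorted in-memory source, sorted on one column with default options.
        let sort_exprs = vec![sort_expr("nullable_col", &schema)];
        let sorted = sort_exec(sort_exprs.clone(), memory_exec(&schema));
        // Merge sorted partitions back together without destroying the order.
        let plan = sort_preserving_merge_exec(sort_exprs, sorted);
        // Render the plan one node per line, as the relocated get_plan_string does.
        for line in get_plan_string(&plan) {
            println!("{line}");
        }
        Ok(())
    }

Moving these into test_utils lets the other physical-optimizer test modules build plans the same way instead of each keeping a private copy, which is the consolidation this merge brings into the alamb/consolidated_streaming branch.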