Merge remote-tracking branch 'origin/main' into refactor-treenode-apply
# Conflicts:
#	datafusion/expr/src/expr.rs
#	datafusion/expr/src/utils.rs
peter-toth committed Dec 20, 2023
2 parents 8882285 + 8d72196 commit aa333d1
Showing 13 changed files with 1,172 additions and 494 deletions.
16 changes: 16 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -347,6 +347,22 @@ impl DFSchema {
.collect()
}

/// Find the indices of all fields with the given qualifier
pub fn fields_indices_with_qualified(
&self,
qualifier: &TableReference,
) -> Vec<usize> {
self.fields
.iter()
.enumerate()
.filter_map(|(idx, field)| {
field
.qualifier()
.and_then(|q| q.eq(qualifier).then_some(idx))
})
.collect()
}

/// Find all fields that match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
self.fields
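Aside (not part of the commit): a minimal sketch of how the new `fields_indices_with_qualified` helper might be used. It assumes the existing `DFSchema::try_from_qualified_schema` and `TableReference::bare` constructors from `datafusion_common`; the function and field names below are illustrative only.

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result, TableReference};

fn qualified_field_indices() -> Result<()> {
    // Build an Arrow schema and qualify every field with the table reference "t1".
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, true),
    ]);
    let df_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;

    // Both fields carry the "t1" qualifier, so both indices come back;
    // a qualifier that matches nothing yields an empty Vec.
    let indices = df_schema.fields_indices_with_qualified(&TableReference::bare("t1"));
    assert_eq!(indices, vec![0, 1]);
    assert!(df_schema
        .fields_indices_with_qualified(&TableReference::bare("t2"))
        .is_empty());
    Ok(())
}
```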
7 changes: 5 additions & 2 deletions datafusion/common/src/tree_node.rs
@@ -167,8 +167,11 @@ pub trait TreeNode: Sized {
// Run the recursive `transform` on each children.
let mut payload_up = vec![];
let tnr = self.transform_children(&mut |c| {
let (tnr, p) =
c.transform_with_payload(f_down, new_payload_down_iter.next().unwrap(), f_up)?;
let (tnr, p) = c.transform_with_payload(
f_down,
new_payload_down_iter.next().unwrap(),
f_up,
)?;
p.into_iter().for_each(|p| payload_up.push(p));
Ok(tnr)
})?;
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/stream.rs
@@ -64,7 +64,8 @@ impl TableProviderFactory for StreamTableFactory {
.with_encoding(encoding)
.with_order(cmd.order_exprs.clone())
.with_header(cmd.has_header)
.with_batch_size(state.config().batch_size());
.with_batch_size(state.config().batch_size())
.with_constraints(cmd.constraints.clone());

Ok(Arc::new(StreamTable(Arc::new(config))))
}
92 changes: 61 additions & 31 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -62,8 +62,8 @@ use datafusion_common::tree_node::{
};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;

use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -842,14 +842,16 @@ mod tests {
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, get_plan_string, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{csv_exec_sorted, stream_exec_ordered};
use crate::test::{csv_exec_ordered, csv_exec_sorted, stream_exec_ordered};

use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

use rstest::rstest;

fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32, true);
let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -2213,12 +2215,19 @@ mod tests {
Ok(())
}

#[rstest]
#[tokio::test]
async fn test_with_lost_ordering_unbounded() -> Result<()> {
async fn test_with_lost_ordering_unbounded_bounded(
#[values(false, true)] source_unbounded: bool,
) -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
// create either bounded or unbounded source
let source = if source_unbounded {
stream_exec_ordered(&schema, sort_exprs)
} else {
csv_exec_ordered(&schema, sort_exprs)
};
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
@@ -2227,50 +2236,71 @@ mod tests {
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = [
// Expected inputs unbounded and bounded
let expected_input_unbounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
let expected_optimized = [
let expected_input_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];

// Expected unbounded result (same for with and without flag)
let expected_optimized_unbounded = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}

#[tokio::test]
async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
let schema = create_test_schema3()?;
let sort_exprs = vec![sort_expr("a", &schema)];
// create an unbounded source
let source = stream_exec_ordered(&schema, sort_exprs);
let repartition_rr = repartition_exec(source);
let repartition_hash = Arc::new(RepartitionExec::try_new(
repartition_rr,
Partitioning::Hash(vec![col("c", &schema).unwrap()], 10),
)?) as _;
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = ["SortExec: expr=[a@0 ASC]",
// Expected bounded results with and without flag
let expected_optimized_bounded = vec![
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
let expected_optimized = [
let expected_optimized_bounded_parallelize_sort = vec![
"SortPreservingMergeExec: [a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" StreamingTableExec: partition_sizes=1, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], has_header=true",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, false);
let (expected_input, expected_optimized, expected_optimized_sort_parallelize) =
if source_unbounded {
(
expected_input_unbounded,
expected_optimized_unbounded.clone(),
expected_optimized_unbounded,
)
} else {
(
expected_input_bounded,
expected_optimized_bounded,
expected_optimized_bounded_parallelize_sort,
)
};
assert_optimized!(
expected_input,
expected_optimized,
physical_plan.clone(),
false
);
assert_optimized!(
expected_input,
expected_optimized_sort_parallelize,
physical_plan,
true
);
Ok(())
}

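Aside (not part of the commit): the two earlier tests are merged above into a single parameterized test using rstest's `#[values]` attribute stacked with `#[tokio::test]`. Below is a minimal sketch of that pattern; the test name and body are illustrative, and it assumes `rstest` and `tokio` (with the `macros` feature) are available as dev-dependencies.

```rust
use rstest::rstest;

// The test runs once per value of `source_unbounded`: first false, then true.
// As in the diff above, `#[rstest]` is placed before `#[tokio::test]` so rstest
// expands the cases and re-emits the async test attribute for each of them.
#[rstest]
#[tokio::test]
async fn runs_for_bounded_and_unbounded(
    #[values(false, true)] source_unbounded: bool,
) {
    // Pick a different fixture per case, mirroring the stream/CSV choice above.
    let source = if source_unbounded { "unbounded stream" } else { "bounded csv" };
    assert!(!source.is_empty());
}
```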
(Diffs for the remaining 9 changed files are not shown.)
