Skip to content

Commit

Permalink
Move window analysis to the window method (#7672)
Browse files Browse the repository at this point in the history
* Move window analysis to the method

* Preserve physical partition keys, during recreation of the window

* Final review

* Avoid cloning if not necessary

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Sep 28, 2023
1 parent b5ec4f1 commit 26a3602
Show file tree
Hide file tree
Showing 9 changed files with 873 additions and 784 deletions.
104 changes: 104 additions & 0 deletions datafusion/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -429,6 +430,64 @@ pub mod datafusion_strsim {
}
}

/// Merges collections `first` and `second`, removes duplicates and sorts the
/// result, returning it as a [`Vec`].
pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
first: impl IntoIterator<Item = T>,
second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
let mut result: Vec<_> = first
.into_iter()
.map(|e| *e.borrow())
.chain(second.into_iter().map(|e| *e.borrow()))
.collect::<HashSet<_>>()
.into_iter()
.collect();
result.sort();
result
}

/// Calculates the set difference between sequences `first` and `second`,
/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
first: impl IntoIterator<Item = T>,
second: impl IntoIterator<Item = S>,
) -> Vec<usize> {
let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
first
.into_iter()
.map(|e| *e.borrow())
.filter(|e| !set.contains(e))
.collect()
}

/// Checks whether the given index sequence is monotonically non-decreasing.
pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> bool {
// TODO: Remove this function when `is_sorted` graduates from Rust nightly.
let mut previous = 0;
for item in sequence.into_iter() {
let current = *item.borrow();
if current < previous {
return false;
}
previous = current;
}
true
}

/// Find indices of each element in `targets` inside `items`. If one of the
/// elements is absent in `items`, returns an error.
pub fn find_indices<T: PartialEq, S: Borrow<T>>(
items: &[T],
targets: impl IntoIterator<Item = S>,
) -> Result<Vec<usize>> {
targets
.into_iter()
.map(|target| items.iter().position(|e| target.borrow().eq(e)))
.collect::<Option<_>>()
.ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
}

#[cfg(test)]
mod tests {
use crate::ScalarValue;
Expand Down Expand Up @@ -747,4 +806,49 @@ mod tests {
"cloned `Arc` should point to same data as the original"
);
}

#[test]
fn test_merge_and_order_indices() {
assert_eq!(
merge_and_order_indices([0, 3, 4], [1, 3, 5]),
vec![0, 1, 3, 4, 5]
);
// Result should be ordered, even if inputs are not
assert_eq!(
merge_and_order_indices([3, 0, 4], [5, 1, 3]),
vec![0, 1, 3, 4, 5]
);
}

#[test]
fn test_set_difference() {
assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
// return value should have same ordering with the in1
assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
}

#[test]
fn test_is_sorted() {
assert!(is_sorted::<usize>([]));
assert!(is_sorted([0]));
assert!(is_sorted([0, 3, 4]));
assert!(is_sorted([0, 1, 2]));
assert!(is_sorted([0, 1, 4]));
assert!(is_sorted([0usize; 0]));
assert!(is_sorted([1, 2]));
assert!(!is_sorted([3, 2]));
}

#[test]
fn test_find_indices() -> Result<()> {
assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{CsvExec, ParquetExec};
use crate::error::{DataFusionError, Result};
use crate::physical_optimizer::utils::{
add_sort_above, get_plan_string, unbounded_output, ExecTree,
};
use crate::physical_optimizer::utils::{add_sort_above, get_plan_string, ExecTree};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -46,6 +44,7 @@ use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};

use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
Expand All @@ -56,8 +55,8 @@ use datafusion_physical_expr::utils::{
use datafusion_physical_expr::{
expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
};
use datafusion_physical_plan::unbounded_output;

use datafusion_common::internal_err;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
Expand Down
Loading

0 comments on commit 26a3602

Please sign in to comment.