Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors on TreeNode Implementations #8395

Merged
merged 5 commits into from
Dec 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ fn hash_join_convert_symmetric_subrule(
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = &input.children_unbounded;
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded && right_unbounded {
Expand Down Expand Up @@ -511,7 +511,7 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
let ub_flags = &input.children_unbounded;
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
input.unbounded = left_unbounded || right_unbounded;
let result = if left_unbounded
Expand Down Expand Up @@ -577,7 +577,7 @@ fn apply_subrules(
}
let is_unbounded = input
.plan
.unbounded_output(&input.children_unbounded)
.unbounded_output(&input.children_unbounded())
// Treat the case where an operator cannot run on unbounded data as
// if it can and it outputs unbounded data. Do not raise an error yet.
// Such operators may be fixed, adjusted or replaced later on during
Expand Down Expand Up @@ -1253,6 +1253,7 @@ mod hash_join_tests {
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::DataPtr;
use datafusion_common::JoinType;
use datafusion_physical_plan::empty::EmptyExec;
use std::sync::Arc;

struct TestCase {
Expand Down Expand Up @@ -1620,10 +1621,22 @@ mod hash_join_tests {
false,
)?;

let children = vec![
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
unbounded: left_unbounded,
children: vec![],
},
PipelineStatePropagator {
plan: Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))),
unbounded: right_unbounded,
children: vec![],
},
];
let initial_hash_join_state = PipelineStatePropagator {
plan: Arc::new(join),
unbounded: false,
children_unbounded: vec![left_unbounded, right_unbounded],
children,
};

let optimized_hash_join =
Expand Down
40 changes: 20 additions & 20 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,36 @@ impl PhysicalOptimizerRule for PipelineChecker {
pub struct PipelineStatePropagator {
pub(crate) plan: Arc<dyn ExecutionPlan>,
pub(crate) unbounded: bool,
pub(crate) children_unbounded: Vec<bool>,
pub(crate) children: Vec<PipelineStatePropagator>,
}

impl PipelineStatePropagator {
/// Constructs a new, default pipelining state.
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
let children = plan.children();
PipelineStatePropagator {
plan,
unbounded: false,
children_unbounded: vec![false; length],
children: children.into_iter().map(Self::new).collect(),
}
}

/// Returns the `unbounded` flag of each child, in child order.
pub fn children_unbounded(&self) -> Vec<bool> {
self.children
.iter()
.map(|c| c.unbounded)
.collect::<Vec<_>>()
}
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = self.plan.children();
for child in children {
match op(&PipelineStatePropagator::new(child))? {
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -106,25 +113,18 @@ impl TreeNode for PipelineStatePropagator {
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.plan.children();
if !children.is_empty() {
let new_children = children
if !self.children.is_empty() {
let new_children = self
.children
.into_iter()
.map(PipelineStatePropagator::new)
.map(transform)
.collect::<Result<Vec<_>>>()?;
let children_unbounded = new_children
.iter()
.map(|c| c.unbounded)
.collect::<Vec<bool>>();
let children_plans = new_children
.into_iter()
.map(|child| child.plan)
.collect::<Vec<_>>();
let children_plans = new_children.iter().map(|c| c.plan.clone()).collect();

Ok(PipelineStatePropagator {
plan: with_new_children_if_necessary(self.plan, children_plans)?.into(),
unbounded: self.unbounded,
children_unbounded,
children: new_children,
})
} else {
Ok(self)
Expand All @@ -149,7 +149,7 @@ pub fn check_finiteness_requirements(
}
input
.plan
.unbounded_output(&input.children_unbounded)
.unbounded_output(&input.children_unbounded())
.map(|value| {
input.unbounded = value;
Transformed::Yes(input)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ fn update_ordering(
node.state = SortProperties::Ordered(options);
} else if !node.expr.children().is_empty() {
// We have an intermediate (non-leaf) node, account for its children:
node.state = node.expr.get_ordering(&node.children_states);
node.state = node.expr.get_ordering(&node.children_state());
} else if node.expr.as_any().is::<Literal>() {
// We have a Literal, which is the other possible leaf node type:
node.state = node.expr.get_ordering(&[]);
Expand Down
58 changes: 21 additions & 37 deletions datafusion/physical-expr/src/sort_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

use std::{ops::Neg, sync::Arc};

use crate::PhysicalExpr;
use arrow_schema::SortOptions;

use crate::PhysicalExpr;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::Result;

use itertools::Itertools;

/// To propagate [`SortOptions`] across the [`PhysicalExpr`], it is insufficient
/// to simply use `Option<SortOptions>`: There must be a differentiation between
/// unordered columns and literal values, since literals may not break the ordering
Expand All @@ -35,11 +34,12 @@ use itertools::Itertools;
/// sorted data; however the ((a_ordered + 999) + c_ordered) expression can. Therefore,
/// we need two different variants for literals and unordered columns as literals are
/// often more ordering-friendly under most mathematical operations.
#[derive(PartialEq, Debug, Clone, Copy)]
#[derive(PartialEq, Debug, Clone, Copy, Default)]
pub enum SortProperties {
/// Use the ordinary [`SortOptions`] struct to represent ordered data:
Ordered(SortOptions),
/// This alternative represents unordered data:
#[default]
Unordered,
/// Singleton is used for single-valued literal numbers:
Singleton,
Expand Down Expand Up @@ -151,34 +151,24 @@ impl Neg for SortProperties {
pub struct ExprOrdering {
pub expr: Arc<dyn PhysicalExpr>,
pub state: SortProperties,
pub children_states: Vec<SortProperties>,
pub children: Vec<ExprOrdering>,
}

impl ExprOrdering {
/// Creates a new [`ExprOrdering`] with [`SortProperties::Unordered`] states
/// for `expr` and its children.
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let size = expr.children().len();
let children = expr.children();
Self {
expr,
state: SortProperties::Unordered,
children_states: vec![SortProperties::Unordered; size],
state: Default::default(),
children: children.into_iter().map(Self::new).collect(),
}
}

/// Updates this [`ExprOrdering`]'s children states with the given states.
pub fn with_new_children(mut self, children_states: Vec<SortProperties>) -> Self {
self.children_states = children_states;
self
}

/// Creates new [`ExprOrdering`] objects for each child of the expression.
pub fn children_expr_orderings(&self) -> Vec<ExprOrdering> {
self.expr
.children()
.into_iter()
.map(ExprOrdering::new)
.collect()
/// Returns the state of each child, copied into a new vector in child order.
pub fn children_state(&self) -> Vec<SortProperties> {
self.children.iter().map(|c| c.state).collect()
}
}

Expand All @@ -187,8 +177,8 @@ impl TreeNode for ExprOrdering {
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_expr_orderings() {
match op(&child)? {
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -197,25 +187,19 @@ impl TreeNode for ExprOrdering {
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
if self.children_states.is_empty() {
if self.children.is_empty() {
Ok(self)
} else {
let child_expr_orderings = self.children_expr_orderings();
// After mapping over the children, the function `F` applies to the
// current object and updates its state.
Ok(self.with_new_children(
child_expr_orderings
.into_iter()
// Update children states after this transformation:
.map(transform)
// Extract the state (i.e. sort properties) information:
.map_ok(|c| c.state)
.collect::<Result<Vec<_>>>()?,
))
self.children = self
.children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
Ok(self)
}
}
}
15 changes: 6 additions & 9 deletions datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,20 @@ pub struct ExprTreeNode<T> {

impl<T> ExprTreeNode<T> {
pub fn new(expr: Arc<dyn PhysicalExpr>) -> Self {
let children = expr.children();
ExprTreeNode {
expr,
data: None,
child_nodes: vec![],
child_nodes: children.into_iter().map(Self::new).collect_vec(),
}
}

pub fn expression(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}

pub fn children(&self) -> Vec<ExprTreeNode<T>> {
self.expr
.children()
.into_iter()
.map(ExprTreeNode::new)
.collect()
pub fn children(&self) -> &[ExprTreeNode<T>] {
&self.child_nodes
}
}

Expand All @@ -155,7 +152,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
match op(&child)? {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
Expand All @@ -170,7 +167,7 @@ impl<T: Clone> TreeNode for ExprTreeNode<T> {
F: FnMut(Self) -> Result<Self>,
{
self.child_nodes = self
.children()
.child_nodes
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
Expand Down