Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
Lordworms committed Oct 2, 2024
1 parent 3892499 commit 08df939
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 27 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

[workspace]
# datafusion-cli is excluded because of its Cargo.lock. See datafusion-cli/README.md.
exclude = ["datafusion-cli", "dev/depcheck"]
exclude = ["dev/depcheck"]
members = [
"datafusion-cli",
"datafusion/common",
"datafusion/common-runtime",
"datafusion/catalog",
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,11 @@ config_namespace! {
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false

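/// When set to true, joins may carry dynamic filter pushdown information
/// so that filters derived from the join can be passed down to the scan.
/// (NOTE(review): wording inferred from the `Join::filter_pushdown_info` docs — confirm.)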
pub dynamic_join_pushdown: bool, default = true
}
}

Expand Down
76 changes: 56 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use datafusion_expr::{
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::joins::utils::PhysicalDynamicFiltersInfo;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::utils::window_expr_common_partition_keys;
Expand Down Expand Up @@ -858,6 +859,7 @@ impl DefaultPhysicalPlanner {
join_type,
null_equals_null,
schema: join_schema,
filter_pushdown_info,
..
}) => {
let null_equals_null = *null_equals_null;
Expand Down Expand Up @@ -1051,6 +1053,34 @@ impl DefaultPhysicalPlanner {
_ => None,
};

let physical_dynamic_filter_info = if let Some(filter_pushdown_info) =
filter_pushdown_info
{
let aggregates = filter_pushdown_info
.min_max_aggregates
.iter()
.map(|aggr| {
self.create_physical_expr(aggr, &join_schema, session_state)?
})
.collect::<Vec<_>>();

let columns = filter_pushdown_info
.filters
.iter()
.map(|filter| {
self.create_physical_expr(
filter.column,
&join_schema,
session_state,
)?
})
.collect::<Vec<_>>();

Some(PhysicalDynamicFiltersInfo::new(aggregates, columns))
} else {
None
};

let prefer_hash_join =
session_state.config_options().optimizer.prefer_hash_join;

Expand Down Expand Up @@ -1091,27 +1121,33 @@ impl DefaultPhysicalPlanner {
PartitionMode::Partitioned
}
};
Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
partition_mode,
null_equals_null,
)?)
Arc::new(
HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
partition_mode,
null_equals_null,
)?
.with_dynamic_filter_info(physical_dynamic_filter_info),
)
} else {
Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
PartitionMode::CollectLeft,
null_equals_null,
)?)
Arc::new(
HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
None,
PartitionMode::CollectLeft,
null_equals_null,
)?
.with_dynamic_filter_info(physical_dynamic_filter_info),
)
};

// If plan was mutated previously then need to create the ExecutionPlan
Expand Down
3 changes: 3 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null,
filter_pushdown_info: None,
})))
}

Expand Down Expand Up @@ -941,6 +942,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::Using,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
filter_pushdown_info: None,
})))
}
}
Expand Down Expand Up @@ -1164,6 +1166,7 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
filter_pushdown_info: None,
})))
}

Expand Down
17 changes: 16 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::logical_plan::{DmlStatement, Statement};
use crate::utils::{
enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
split_conjunction,
split_conjunction, DynamicJoinFilterPushdownInfo,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
Expand Down Expand Up @@ -669,6 +669,7 @@ impl LogicalPlan {
on,
schema: _,
null_equals_null,
filter_pushdown_info,
}) => {
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;
Expand All @@ -690,6 +691,7 @@ impl LogicalPlan {
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
filter_pushdown_info,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -954,6 +956,7 @@ impl LogicalPlan {
filter: filter_expr,
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
filter_pushdown_info: None,
}))
}
LogicalPlan::CrossJoin(_) => {
Expand Down Expand Up @@ -3173,6 +3176,9 @@ pub struct Join {
pub schema: DFSchemaRef,
/// If null_equals_null is true, null == null else null != null
pub null_equals_null: bool,
/// Stores the filter information that should be passed to the scan when
/// dynamic filter pushdown is enabled; `None` if there is nothing to push down.
pub filter_pushdown_info: Option<Arc<DynamicJoinFilterPushdownInfo>>,
}

impl Join {
Expand Down Expand Up @@ -3206,8 +3212,17 @@ impl Join {
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equals_null: original_join.null_equals_null,
filter_pushdown_info: None,
})
}
/// assign filter pushdown struct
pub fn with_filter_pushdown_info(
mut self,
filter_pushdown: Arc<DynamicJoinFilterPushdownInfo>,
) -> Self {
self.filter_pushdown_info = Some(filter_pushdown);
self
}
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => map_until_stop_and_collect!(
rewrite_arc(left, &mut f),
right,
Expand All @@ -157,6 +158,7 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})
}),
LogicalPlan::CrossJoin(CrossJoin {
Expand Down Expand Up @@ -639,6 +641,7 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => map_until_stop_and_collect!(
on.into_iter().map_until_stop_and_collect(
|on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1))
Expand All @@ -658,6 +661,7 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})
}),
LogicalPlan::Sort(Sort { expr, input, fetch }) => {
Expand Down
61 changes: 61 additions & 0 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,67 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{name}[{state_name}]")
}

/// Dynamic filter pushdown information attached to a logical join.
///
/// Bundles an optional [`DynamicTableFilters`] value, the join columns
/// registered for pushdown, and the aggregate expressions collected for
/// those columns.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicJoinFilterPushdownInfo {
/// Optional table-level dynamic filters; `None` when built via `new()`.
pub dynamic_filters: Option<DynamicTableFilters>,
/// Columns registered for pushdown (appended with `push_filter`).
pub filters: Vec<DynamicJoinFilterPushdownColumn>,
/// Aggregate expressions collected for the pushdown columns
/// (presumably `min`/`max` bounds, judging by the name — TODO confirm).
pub min_max_aggregates: Vec<Expr>,
}

impl DynamicJoinFilterPushdownInfo {
    /// Creates an instance with every field populated.
    ///
    /// `dynamic_filters` is stored as `Some(..)`; `filters` and
    /// `min_max_aggregates` are stored as given.
    pub fn new_with_all(
        dynamic_filters: DynamicTableFilters,
        filters: Vec<DynamicJoinFilterPushdownColumn>,
        min_max_aggregates: Vec<Expr>,
    ) -> Self {
        Self {
            dynamic_filters: Some(dynamic_filters),
            filters,
            min_max_aggregates,
        }
    }

    /// Creates an instance holding only `dynamic_filters`, with no columns
    /// and no aggregate expressions registered yet.
    pub fn new_with_dynamic_filter(dynamic_filters: DynamicTableFilters) -> Self {
        Self::new_with_all(dynamic_filters, Vec::new(), Vec::new())
    }

    /// Creates an empty instance: no dynamic filters, columns or aggregates.
    pub fn new() -> Self {
        Self {
            dynamic_filters: None,
            filters: Vec::new(),
            min_max_aggregates: Vec::new(),
        }
    }

    /// Appends one pushdown column to `filters`.
    pub fn push_filter(&mut self, filter: DynamicJoinFilterPushdownColumn) {
        self.filters.push(filter);
    }

    /// Appends aggregate expressions to `min_max_aggregates`.
    ///
    /// The parameter is `Vec<Expr>` so that it matches the element type of
    /// the `min_max_aggregates` field; the previous `Vec<Aggregate>` could
    /// not be passed to `Vec<Expr>::extend`.
    pub fn push_aggregates(&mut self, aggs: Vec<Expr>) {
        self.min_max_aggregates.extend(aggs);
    }
}

// An argument-less `new()` should be mirrored by `Default`
// (clippy::new_without_default).
impl Default for DynamicJoinFilterPushdownInfo {
    fn default() -> Self {
        Self::new()
    }
}

/// Table-level dynamic filter state.
///
/// Currently an empty struct; in this file it only appears as the optional
/// `dynamic_filters` field of [`DynamicJoinFilterPushdownInfo`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicTableFilters {}

/// A single column registered for dynamic join filter pushdown.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DynamicJoinFilterPushdownColumn {
    /// Index of the condition this column belongs to
    /// (presumably a position in the join's `on` list — TODO confirm).
    pub condition_idx: usize,
    /// Shared handle to the column itself.
    pub column: Arc<Column>,
}

impl DynamicJoinFilterPushdownColumn {
    /// Builds a pushdown column entry from a condition index and a column.
    pub fn new(condition_idx: usize, column: Arc<Column>) -> Self {
        DynamicJoinFilterPushdownColumn {
            condition_idx,
            column,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ fn find_inner_join(
filter: None,
schema: join_schema,
null_equals_null: false,
filter_pushdown_info: None,
}));
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin {
filter: join.filter.clone(),
schema: Arc::clone(&join.schema),
null_equals_null: join.null_equals_null,
filter_pushdown_info: None,
}));
Filter::try_new(filter.predicate, new_join)
.map(|f| Transformed::yes(LogicalPlan::Filter(f)))
Expand Down
3 changes: 3 additions & 0 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
}) => {
let left_schema = left.schema();
let right_schema = right.schema();
Expand All @@ -93,6 +94,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})))
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
Expand All @@ -104,6 +106,7 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
filter_pushdown_info,
})))
}
}
Expand Down
Loading

0 comments on commit 08df939

Please sign in to comment.