Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FOLLOWUP: remove optimize() #4619

Merged
merged 1 commit into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ impl OptimizerRule for MyRule {
"my_rule"
}

fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
let plan = utils::optimize_children(self, plan, _config)?;

Expand All @@ -86,12 +86,12 @@ impl OptimizerRule for MyRule {
let mut expr_rewriter = MyExprRewriter {};
let predicate = filter.predicate().clone();
let predicate = predicate.rewrite(&mut expr_rewriter)?;
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input().clone(),
)?))
)?)))
}
_ => Ok(plan.clone()),
_ => Ok(Some(plan.clone())),
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ impl QueryPlanner for TopKQueryPlanner {
struct TopKOptimizerRule {}
impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
Expand All @@ -304,20 +304,22 @@ impl OptimizerRule for TopKOptimizerRule {
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a a TopK
return Ok(LogicalPlan::Extension(Extension {
return Ok(Some(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self.optimize(input.as_ref(), optimizer_config)?,
input: self
.try_optimize(input.as_ref(), optimizer_config)?
.unwrap_or_else(|| input.as_ref().clone()),
expr: expr[0].clone(),
}),
}));
})));
}
}
}

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
optimize_children(self, plan, optimizer_config)
Ok(Some(optimize_children(self, plan, optimizer_config)?))
}

fn name(&self) -> &str {
Expand Down
18 changes: 6 additions & 12 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ impl CommonSubexprEliminate {
}

impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -595,7 +585,8 @@ mod test {
fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(expected, formatted_plan);
Expand Down Expand Up @@ -839,7 +830,10 @@ mod test {
.build()
.unwrap();
let rule = CommonSubexprEliminate {};
let optimized_plan = rule.optimize(&plan, &mut OptimizerConfig::new()).unwrap();
let optimized_plan = rule
.try_optimize(&plan, &mut OptimizerConfig::new())
.unwrap()
.unwrap();

let schema = optimized_plan.schema();
let fields_with_datatypes: Vec<_> = schema
Expand Down
10 changes: 0 additions & 10 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ impl DecorrelateWhereExists {
}

impl OptimizerRule for DecorrelateWhereExists {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down
10 changes: 0 additions & 10 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ impl DecorrelateWhereIn {
}

impl OptimizerRule for DecorrelateWhereIn {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down
21 changes: 6 additions & 15 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,10 @@ impl EliminateCrossJoin {
/// This fix helps to improve the performance of TPCH Q19. issue#78
///
impl OptimizerRule for EliminateCrossJoin {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
Expand All @@ -91,7 +81,7 @@ impl OptimizerRule for EliminateCrossJoin {
return Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?));
}
}
Expand All @@ -112,7 +102,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}

left = utils::optimize_children(self, &left, _optimizer_config)?;
left = utils::optimize_children(self, &left, optimizer_config)?;

if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
Expand Down Expand Up @@ -141,7 +131,7 @@ impl OptimizerRule for EliminateCrossJoin {
_ => Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?)),
}
}
Expand Down Expand Up @@ -399,7 +389,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted = optimized_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down
19 changes: 5 additions & 14 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,10 @@ impl EliminateFilter {
}

impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expand All @@ -63,7 +53,7 @@ impl OptimizerRule for EliminateFilter {
};

match predicate_and_input {
Some((true, input)) => self.try_optimize(input, _optimizer_config),
Some((true, input)) => self.try_optimize(input, optimizer_config),
Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
Expand All @@ -73,7 +63,7 @@ impl OptimizerRule for EliminateFilter {
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?))
}
}
Expand All @@ -93,7 +83,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
19 changes: 6 additions & 13 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,6 @@ impl EliminateLimit {
}

impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -100,7 +90,8 @@ mod tests {

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand All @@ -113,10 +104,12 @@ mod tests {
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.optimize(&optimized_plan, &mut OptimizerConfig::new())
.try_optimize(&optimized_plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
13 changes: 2 additions & 11 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,6 @@ impl EliminateOuterJoin {

/// Attempt to eliminate outer joins.
impl OptimizerRule for EliminateOuterJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -330,7 +320,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let rule = EliminateOuterJoin::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
13 changes: 2 additions & 11 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ use std::sync::Arc;
pub struct FilterNullJoinKeys {}

impl OptimizerRule for FilterNullJoinKeys {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -165,7 +155,8 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterNullJoinKeys::default();
rule.optimize(plan, &mut OptimizerConfig::new())
rule.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan")
}

Expand Down
19 changes: 5 additions & 14 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,10 @@ impl InlineTableScan {
}

impl OptimizerRule for InlineTableScan {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
// Match only on scans without filter / projection / fetch
Expand All @@ -63,7 +53,7 @@ impl OptimizerRule for InlineTableScan {
if let Some(sub_plan) = source.get_logical_plan() {
// Recursively apply optimization
let plan =
utils::optimize_children(self, sub_plan, _optimizer_config)?;
utils::optimize_children(self, sub_plan, optimizer_config)?;
let plan = LogicalPlanBuilder::from(plan)
.project(vec![Expr::Wildcard])?
.alias(table_name)?;
Expand All @@ -80,7 +70,7 @@ impl OptimizerRule for InlineTableScan {
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?))
}
}
Expand Down Expand Up @@ -168,7 +158,8 @@ mod tests {
let plan = scan.filter(col("x.a").eq(lit(1))).unwrap().build().unwrap();

let optimized_plan = rule
.optimize(&plan, &mut OptimizerConfig::new())
.try_optimize(&plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
let expected = "Filter: x.a = Int32(1)\
Expand Down
Loading