Skip to content

Commit

Permalink
remove optimize()
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Dec 14, 2022
1 parent 0bdaabb commit 7dab4c3
Show file tree
Hide file tree
Showing 24 changed files with 116 additions and 278 deletions.
10 changes: 5 additions & 5 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ impl OptimizerRule for MyRule {
"my_rule"
}

fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
let plan = utils::optimize_children(self, plan, _config)?;

Expand All @@ -86,12 +86,12 @@ impl OptimizerRule for MyRule {
let mut expr_rewriter = MyExprRewriter {};
let predicate = filter.predicate().clone();
let predicate = predicate.rewrite(&mut expr_rewriter)?;
Ok(LogicalPlan::Filter(Filter::try_new(
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input().clone(),
)?))
)?)))
}
_ => Ok(plan.clone()),
_ => Ok(Some(plan.clone())),
}
}
}
Expand Down
14 changes: 8 additions & 6 deletions datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ impl QueryPlanner for TopKQueryPlanner {
struct TopKOptimizerRule {}
impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
fn optimize(
fn try_optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
) -> Result<Option<LogicalPlan>> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it by a TopK node. It does not handle many
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
Expand All @@ -304,20 +304,22 @@ impl OptimizerRule for TopKOptimizerRule {
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace with a a TopK
return Ok(LogicalPlan::Extension(Extension {
return Ok(Some(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self.optimize(input.as_ref(), optimizer_config)?,
input: self
.try_optimize(input.as_ref(), optimizer_config)?
.unwrap_or_else(|| input.as_ref().clone()),
expr: expr[0].clone(),
}),
}));
})));
}
}
}

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
optimize_children(self, plan, optimizer_config)
Ok(Some(optimize_children(self, plan, optimizer_config)?))
}

fn name(&self) -> &str {
Expand Down
18 changes: 6 additions & 12 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ impl CommonSubexprEliminate {
}

impl OptimizerRule for CommonSubexprEliminate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -595,7 +585,8 @@ mod test {
fn assert_optimized_plan_eq(expected: &str, plan: &LogicalPlan) {
let optimizer = CommonSubexprEliminate {};
let optimized_plan = optimizer
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(expected, formatted_plan);
Expand Down Expand Up @@ -839,7 +830,10 @@ mod test {
.build()
.unwrap();
let rule = CommonSubexprEliminate {};
let optimized_plan = rule.optimize(&plan, &mut OptimizerConfig::new()).unwrap();
let optimized_plan = rule
.try_optimize(&plan, &mut OptimizerConfig::new())
.unwrap()
.unwrap();

let schema = optimized_plan.schema();
let fields_with_datatypes: Vec<_> = schema
Expand Down
10 changes: 0 additions & 10 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ impl DecorrelateWhereExists {
}

impl OptimizerRule for DecorrelateWhereExists {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down
10 changes: 0 additions & 10 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,6 @@ impl DecorrelateWhereIn {
}

impl OptimizerRule for DecorrelateWhereIn {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down
21 changes: 6 additions & 15 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,10 @@ impl EliminateCrossJoin {
/// This fix helps to improve the performance of TPCH Q19. issue#78
///
impl OptimizerRule for EliminateCrossJoin {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
Expand All @@ -91,7 +81,7 @@ impl OptimizerRule for EliminateCrossJoin {
return Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?));
}
}
Expand All @@ -112,7 +102,7 @@ impl OptimizerRule for EliminateCrossJoin {
)?;
}

left = utils::optimize_children(self, &left, _optimizer_config)?;
left = utils::optimize_children(self, &left, optimizer_config)?;

if plan.schema() != left.schema() {
left = LogicalPlan::Projection(Projection::new_from_schema(
Expand Down Expand Up @@ -141,7 +131,7 @@ impl OptimizerRule for EliminateCrossJoin {
_ => Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?)),
}
}
Expand Down Expand Up @@ -399,7 +389,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: Vec<&str>) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted = optimized_plan.display_indent_schema().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
Expand Down
19 changes: 5 additions & 14 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,10 @@ impl EliminateFilter {
}

impl OptimizerRule for EliminateFilter {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
let predicate_and_input = match plan {
LogicalPlan::Filter(filter) => match filter.predicate() {
Expand All @@ -63,7 +53,7 @@ impl OptimizerRule for EliminateFilter {
};

match predicate_and_input {
Some((true, input)) => self.try_optimize(input, _optimizer_config),
Some((true, input)) => self.try_optimize(input, optimizer_config),
Some((false, input)) => Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: input.schema().clone(),
Expand All @@ -73,7 +63,7 @@ impl OptimizerRule for EliminateFilter {
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?))
}
}
Expand All @@ -93,7 +83,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
let rule = EliminateFilter::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
25 changes: 9 additions & 16 deletions datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,10 @@ impl EliminateLimit {
}

impl OptimizerRule for EliminateLimit {
fn optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, _optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
_optimizer_config: &mut OptimizerConfig,
optimizer_config: &mut OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
if let LogicalPlan::Limit(limit) = plan {
match limit.fetch {
Expand All @@ -68,7 +58,7 @@ impl OptimizerRule for EliminateLimit {
return Ok(Some(utils::optimize_children(
self,
input,
_optimizer_config,
optimizer_config,
)?));
}
}
Expand All @@ -77,7 +67,7 @@ impl OptimizerRule for EliminateLimit {
Ok(Some(utils::optimize_children(
self,
plan,
_optimizer_config,
optimizer_config,
)?))
}

Expand All @@ -100,7 +90,8 @@ mod tests {

fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let optimized_plan = EliminateLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand All @@ -113,10 +104,12 @@ mod tests {
expected: &str,
) -> Result<()> {
let optimized_plan = PushDownLimit::new()
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let optimized_plan = EliminateLimit::new()
.optimize(&optimized_plan, &mut OptimizerConfig::new())
.try_optimize(&optimized_plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
13 changes: 2 additions & 11 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,6 @@ impl EliminateOuterJoin {

/// Attempt to eliminate outer joins.
impl OptimizerRule for EliminateOuterJoin {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -330,7 +320,8 @@ mod tests {
fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
let rule = EliminateOuterJoin::new();
let optimized_plan = rule
.optimize(plan, &mut OptimizerConfig::new())
.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan");
let formatted_plan = format!("{:?}", optimized_plan);
assert_eq!(formatted_plan, expected);
Expand Down
13 changes: 2 additions & 11 deletions datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ use std::sync::Arc;
pub struct FilterNullJoinKeys {}

impl OptimizerRule for FilterNullJoinKeys {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<LogicalPlan> {
Ok(self
.try_optimize(plan, optimizer_config)?
.unwrap_or_else(|| plan.clone()))
}

fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -165,7 +155,8 @@ mod tests {

fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
let rule = FilterNullJoinKeys::default();
rule.optimize(plan, &mut OptimizerConfig::new())
rule.try_optimize(plan, &mut OptimizerConfig::new())
.unwrap()
.expect("failed to optimize plan")
}

Expand Down
Loading

0 comments on commit 7dab4c3

Please sign in to comment.