Skip to content

Commit

Permalink
refactor: split expression pf planner into one part. (#4783)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener authored Jan 3, 2023
1 parent 00a22cb commit 6c81c10
Show file tree
Hide file tree
Showing 12 changed files with 1,547 additions and 1,286 deletions.
67 changes: 67 additions & 0 deletions datafusion/sql/src/expr/binary_op.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use sqlparser::ast::{BinaryOperator, Expr as SQLExpr};

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(crate) fn parse_sql_binary_op(
&self,
left: SQLExpr,
op: BinaryOperator,
right: SQLExpr,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let operator = match op {
BinaryOperator::Gt => Ok(Operator::Gt),
BinaryOperator::GtEq => Ok(Operator::GtEq),
BinaryOperator::Lt => Ok(Operator::Lt),
BinaryOperator::LtEq => Ok(Operator::LtEq),
BinaryOperator::Eq => Ok(Operator::Eq),
BinaryOperator::NotEq => Ok(Operator::NotEq),
BinaryOperator::Plus => Ok(Operator::Plus),
BinaryOperator::Minus => Ok(Operator::Minus),
BinaryOperator::Multiply => Ok(Operator::Multiply),
BinaryOperator::Divide => Ok(Operator::Divide),
BinaryOperator::Modulo => Ok(Operator::Modulo),
BinaryOperator::And => Ok(Operator::And),
BinaryOperator::Or => Ok(Operator::Or),
BinaryOperator::PGRegexMatch => Ok(Operator::RegexMatch),
BinaryOperator::PGRegexIMatch => Ok(Operator::RegexIMatch),
BinaryOperator::PGRegexNotMatch => Ok(Operator::RegexNotMatch),
BinaryOperator::PGRegexNotIMatch => Ok(Operator::RegexNotIMatch),
BinaryOperator::BitwiseAnd => Ok(Operator::BitwiseAnd),
BinaryOperator::BitwiseOr => Ok(Operator::BitwiseOr),
BinaryOperator::BitwiseXor => Ok(Operator::BitwiseXor),
BinaryOperator::PGBitwiseShiftRight => Ok(Operator::BitwiseShiftRight),
BinaryOperator::PGBitwiseShiftLeft => Ok(Operator::BitwiseShiftLeft),
BinaryOperator::StringConcat => Ok(Operator::StringConcat),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported SQL binary operator {op:?}"
))),
}?;

Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.sql_expr_to_logical_expr(left, schema, planner_context)?),
operator,
Box::new(self.sql_expr_to_logical_expr(right, schema, planner_context)?),
)))
}
}
225 changes: 225 additions & 0 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::normalize_ident;
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{
expr, window_function, AggregateFunction, BuiltinScalarFunction, Expr, WindowFrame,
WindowFrameUnits, WindowFunction,
};
use sqlparser::ast::{
Expr as SQLExpr, Function as SQLFunction, FunctionArg, FunctionArgExpr,
};
use std::str::FromStr;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_function_to_expr(
&self,
mut function: SQLFunction,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let name = if function.name.0.len() > 1 {
// DF doesn't handle compound identifiers
// (e.g. "foo.bar") for function names yet
function.name.to_string()
} else {
normalize_ident(function.name.0[0].clone())
};

// next, scalar built-in
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args = self.function_args_to_expr(function.args, schema)?;
return Ok(Expr::ScalarFunction { fun, args });
};

// then, window function
if let Some(window) = function.over.take() {
let partition_by = window
.partition_by
.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect::<Result<Vec<_>>>()?;
let order_by = window
.order_by
.into_iter()
.map(|e| self.order_by_to_sort_expr(e, schema))
.collect::<Result<Vec<_>>>()?;
let window_frame = window
.window_frame
.as_ref()
.map(|window_frame| {
let window_frame: WindowFrame = window_frame.clone().try_into()?;
if WindowFrameUnits::Range == window_frame.units
&& order_by.len() != 1
{
Err(DataFusionError::Plan(format!(
"With window frame of type RANGE, the order by expression must be of length 1, got {}", order_by.len())))
} else {
Ok(window_frame)
}
})
.transpose()?;
let window_frame = if let Some(window_frame) = window_frame {
window_frame
} else {
WindowFrame::new(!order_by.is_empty())
};
let fun = self.find_window_func(&name)?;
let expr = match fun {
WindowFunction::AggregateFunction(aggregate_fun) => {
let (aggregate_fun, args) =
self.aggregate_fn_to_expr(aggregate_fun, function.args, schema)?;

Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::AggregateFunction(aggregate_fun),
args,
partition_by,
order_by,
window_frame,
))
}
_ => Expr::WindowFunction(expr::WindowFunction::new(
fun,
self.function_args_to_expr(function.args, schema)?,
partition_by,
order_by,
window_frame,
)),
};
return Ok(expr);
}

// next, aggregate built-ins
if let Ok(fun) = AggregateFunction::from_str(&name) {
let distinct = function.distinct;
let (fun, args) = self.aggregate_fn_to_expr(fun, function.args, schema)?;
return Ok(Expr::AggregateFunction(expr::AggregateFunction::new(
fun, args, distinct, None,
)));
};

// finally, user-defined functions (UDF) and UDAF
match self.schema_provider.get_function_meta(&name) {
Some(fm) => {
let args = self.function_args_to_expr(function.args, schema)?;

Ok(Expr::ScalarUDF { fun: fm, args })
}
None => match self.schema_provider.get_aggregate_meta(&name) {
Some(fm) => {
let args = self.function_args_to_expr(function.args, schema)?;
Ok(Expr::AggregateUDF {
fun: fm,
args,
filter: None,
})
}
_ => Err(DataFusionError::Plan(format!("Invalid function '{name}'"))),
},
}
}

pub(super) fn sql_named_function_to_expr(
&self,
expr: SQLExpr,
fun: BuiltinScalarFunction,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
Ok(Expr::ScalarFunction { fun, args })
}

pub(super) fn find_window_func(&self, name: &str) -> Result<WindowFunction> {
window_function::find_df_window_func(name)
.or_else(|| {
self.schema_provider
.get_aggregate_meta(name)
.map(WindowFunction::AggregateUDF)
})
.ok_or_else(|| {
DataFusionError::Plan(format!("There is no window function named {name}"))
})
}

fn sql_fn_arg_to_logical_expr(
&self,
sql: FunctionArg,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
match sql {
FunctionArg::Named {
name: _,
arg: FunctionArgExpr::Expr(arg),
} => self.sql_expr_to_logical_expr(arg, schema, planner_context),
FunctionArg::Named {
name: _,
arg: FunctionArgExpr::Wildcard,
} => Ok(Expr::Wildcard),
FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
self.sql_expr_to_logical_expr(arg, schema, planner_context)
}
FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expr::Wildcard),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported qualified wildcard argument: {sql:?}"
))),
}
}

pub(super) fn function_args_to_expr(
&self,
args: Vec<FunctionArg>,
schema: &DFSchema,
) -> Result<Vec<Expr>> {
args.into_iter()
.map(|a| {
self.sql_fn_arg_to_logical_expr(a, schema, &mut PlannerContext::new())
})
.collect::<Result<Vec<Expr>>>()
}

pub(super) fn aggregate_fn_to_expr(
&self,
fun: AggregateFunction,
args: Vec<FunctionArg>,
schema: &DFSchema,
) -> Result<(AggregateFunction, Vec<Expr>)> {
let args = match fun {
// Special case rewrite COUNT(*) to COUNT(constant)
AggregateFunction::Count => args
.into_iter()
.map(|a| match a {
FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
Ok(Expr::Literal(COUNT_STAR_EXPANSION.clone()))
}
_ => self.sql_fn_arg_to_logical_expr(
a,
schema,
&mut PlannerContext::new(),
),
})
.collect::<Result<Vec<Expr>>>()?,
_ => self.function_args_to_expr(args, schema)?,
};

Ok((fun, args))
}
}
84 changes: 84 additions & 0 deletions datafusion/sql/src/expr/grouping_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use datafusion_common::{DFSchema, DataFusionError, Result};
use datafusion_expr::{Expr, GroupingSet};
use sqlparser::ast::Expr as SQLExpr;

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub(super) fn sql_grouping_sets_to_expr(
&self,
exprs: Vec<Vec<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args: Result<Vec<Vec<_>>> = exprs
.into_iter()
.map(|v| {
v.into_iter()
.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
.collect()
})
.collect();
Ok(Expr::GroupingSet(GroupingSet::GroupingSets(args?)))
}

pub(super) fn sql_rollup_to_expr(
&self,
exprs: Vec<Vec<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args: Result<Vec<_>> = exprs
.into_iter()
.map(|v| {
if v.len() != 1 {
Err(DataFusionError::Internal(
"Tuple expressions are not supported for Rollup expressions"
.to_string(),
))
} else {
self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
}
})
.collect();
Ok(Expr::GroupingSet(GroupingSet::Rollup(args?)))
}

pub(super) fn sql_cube_to_expr(
&self,
exprs: Vec<Vec<SQLExpr>>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args: Result<Vec<_>> = exprs
.into_iter()
.map(|v| {
if v.len() != 1 {
Err(DataFusionError::Internal(
"Tuple expressions not are supported for Cube expressions"
.to_string(),
))
} else {
self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
}
})
.collect();
Ok(Expr::GroupingSet(GroupingSet::Cube(args?)))
}
}
Loading

0 comments on commit 6c81c10

Please sign in to comment.