From bac49cfabe58ab2afce1fc6ce2d99bfe1db60712 Mon Sep 17 00:00:00 2001 From: Xin Li Date: Wed, 3 Jul 2024 14:11:27 +0800 Subject: [PATCH] Implement user defined planner for position --- .../core/src/execution/session_state.rs | 11 ++++-- datafusion/expr/src/planner.rs | 9 +++++ datafusion/functions/src/unicode/mod.rs | 1 + datafusion/functions/src/unicode/planner.rs | 36 +++++++++++++++++++ datafusion/sql/src/expr/mod.rs | 35 ++++++++---------- 5 files changed, 69 insertions(+), 23 deletions(-) create mode 100644 datafusion/functions/src/unicode/planner.rs diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index aa81d77cf682..0b3980dc5221 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -967,14 +967,19 @@ impl SessionState { let field_access_planner = Arc::new(functions_array::planner::FieldAccessPlanner) as _; - query + query = query .with_user_defined_planner(array_planner) .with_user_defined_planner(field_access_planner) } - #[cfg(not(feature = "array_expressions"))] + #[cfg(feature = "unicode_expressions")] { - query + let position_planner = + Arc::new(functions::unicode::planner::PositionPlanner::default()) as _; + + query = query.with_user_defined_planner(position_planner); } + + query } } diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index c928ab39194d..d0f6bd8b8ae0 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -110,6 +110,15 @@ pub trait UserDefinedSQLPlanner: Send + Sync { ) -> Result>> { Ok(PlannerResult::Original(exprs)) } + + // Plan the POSITION expression, e.g., POSITION( in ) + // returns origin expression arguments if not possible + fn plan_position( + &self, + args: Vec + ) -> Result>> { + Ok(PlannerResult::Original(args)) + } } /// An operator with two arguments to plan diff --git a/datafusion/functions/src/unicode/mod.rs b/datafusion/functions/src/unicode/mod.rs index 9e8c07cd36ed..705637fd9b81 100644 --- a/datafusion/functions/src/unicode/mod.rs +++ b/datafusion/functions/src/unicode/mod.rs @@ -32,6 +32,7 @@ pub mod strpos; pub mod substr; pub mod substrindex; pub mod translate; +pub mod planner; // create UDFs make_udf_function!( diff --git a/datafusion/functions/src/unicode/planner.rs b/datafusion/functions/src/unicode/planner.rs new file mode 100644 index 000000000000..00a129eb472a --- /dev/null +++ b/datafusion/functions/src/unicode/planner.rs @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SQL planning extensions like [`PositionPlanner`] + +use datafusion_common::Result; +use datafusion_expr::{ + expr::ScalarFunction, + planner::{PlannerResult, UserDefinedSQLPlanner}, + Expr, +}; + +#[derive(Default)] +pub struct PositionPlanner {} + +impl UserDefinedSQLPlanner for PositionPlanner { + fn plan_position(&self, args: Vec) -> Result>> { + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf(crate::unicode::strpos(), args), + ))) + } +} \ No newline at end of file diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 786ea288fa0e..612c34e25ab8 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -597,7 +597,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { self.parse_struct(values, fields, schema, planner_context) } SQLExpr::Position { expr, r#in } => { - self.sql_position_to_expr(*expr, *r#in, schema, planner_context) + let substr = + self.sql_expr_to_logical_expr(*expr, schema, planner_context)?; + let fullstr = self.sql_expr_to_logical_expr(*r#in, schema, planner_context)?; + let mut extract_args = vec![fullstr, substr]; + + for planner in self.planners.iter() { + match planner.plan_position(extract_args)? { + PlannerResult::Planned(expr) => return Ok(expr), + PlannerResult::Original(args) => { + extract_args = args; + } + } + } + + not_impl_err!("Position not supported by UserDefinedExtensionPlanners: {extract_args:?}") } SQLExpr::AtTimeZone { timestamp, @@ -889,25 +903,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args))) } - fn sql_position_to_expr( - &self, - substr_expr: SQLExpr, - str_expr: SQLExpr, - schema: &DFSchema, - planner_context: &mut PlannerContext, - ) -> Result { - let fun = self - .context_provider - .get_function_meta("strpos") - .ok_or_else(|| { - internal_datafusion_err!("Unable to find expected 'strpos' function") - })?; - let substr = - self.sql_expr_to_logical_expr(substr_expr, schema, planner_context)?; - let fullstr = self.sql_expr_to_logical_expr(str_expr, schema, planner_context)?; - let args = vec![fullstr, substr]; - Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args))) - } } #[cfg(test)]