Skip to content

Commit

Permalink
Implement user defined planner for create_struct & `create_named_st…
Browse files Browse the repository at this point in the history
…ruct` (#11273)

* add UserDefinedSQLPlanner for create struct

* fix linting

* add create name struct user defined sql planner

* simplify

* refactor

* refactor

* remove named_struct from functions

* formatting

* revert 953ad31

* update docs
  • Loading branch information
dharanad authored Jul 8, 2024
1 parent 940efd3 commit 1e39a85
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 56 deletions.
15 changes: 15 additions & 0 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,21 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}

/// Plans a struct `struct(expression1[, ..., expression_n])`
/// literal based on the given input expressions.
/// This function takes a vector of expressions and a boolean flag indicating whether
/// the struct uses the optional name
///
/// Returns a `PlannerResult` containing either the planned struct expressions or the original
/// input expressions if planning is not possible.
fn plan_struct_literal(
&self,
args: Vec<Expr>,
_is_named_struct: bool,
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
}

/// An operator with two arguments to plan
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
nvl(),
nvl2(),
arrow_typeof(),
r#struct(),
named_struct(),
get_field(),
coalesce(),
Expand Down
19 changes: 19 additions & 0 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::planner::{PlannerResult, RawDictionaryExpr, UserDefinedSQLPlanner};
use datafusion_expr::Expr;

use super::named_struct;

Expand All @@ -37,4 +39,21 @@ impl UserDefinedSQLPlanner for CoreFunctionPlanner {
}
Ok(PlannerResult::Planned(named_struct().call(args)))
}

fn plan_struct_literal(
&self,
args: Vec<Expr>,
is_named_struct: bool,
) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Planned(Expr::ScalarFunction(
ScalarFunction::new_udf(
if is_named_struct {
crate::core::named_struct()
} else {
crate::core::r#struct()
},
args,
),
)))
}
}
96 changes: 41 additions & 55 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use datafusion_expr::planner::PlannerResult;
use datafusion_expr::planner::RawDictionaryExpr;
use datafusion_expr::planner::RawFieldAccessExpr;
use sqlparser::ast::{
CastKind, DictionaryField, Expr as SQLExpr, Subscript, TrimWhereField, Value,
CastKind, DictionaryField, Expr as SQLExpr, StructField, Subscript, TrimWhereField,
Value,
};

use datafusion_common::{
Expand Down Expand Up @@ -597,7 +598,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

SQLExpr::Struct { values, fields } => {
self.parse_struct(values, fields, schema, planner_context)
self.parse_struct(schema, planner_context, values, fields)
}
SQLExpr::Position { expr, r#in } => {
self.sql_position_to_expr(*expr, *r#in, schema, planner_context)
Expand Down Expand Up @@ -629,6 +630,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

/// Parses a struct(..) expression and plans it creation
fn parse_struct(
&self,
schema: &DFSchema,
planner_context: &mut PlannerContext,
values: Vec<sqlparser::ast::Expr>,
fields: Vec<StructField>,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}
let is_named_struct = values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }));

let mut create_struct_args = if is_named_struct {
self.create_named_struct_expr(values, schema, planner_context)?
} else {
self.create_struct_expr(values, schema, planner_context)?
};

for planner in self.planners.iter() {
match planner.plan_struct_literal(create_struct_args, is_named_struct)? {
PlannerResult::Planned(expr) => return Ok(expr),
PlannerResult::Original(args) => create_struct_args = args,
}
}
not_impl_err!("Struct not supported by UserDefinedExtensionPlanners: {create_struct_args:?}")
}

fn sql_position_to_expr(
&self,
substr_expr: SQLExpr,
Expand Down Expand Up @@ -683,37 +714,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
not_impl_err!("Unsupported dictionary literal: {raw_expr:?}")
}

/// Parses a struct(..) expression
fn parse_struct(
&self,
values: Vec<SQLExpr>,
fields: Vec<sqlparser::ast::StructField>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
if !fields.is_empty() {
return not_impl_err!("Struct fields are not supported yet");
}

if values
.iter()
.any(|value| matches!(value, SQLExpr::Named { .. }))
{
self.create_named_struct(values, input_schema, planner_context)
} else {
self.create_struct(values, input_schema, planner_context)
}
}

// Handles a call to struct(...) where the arguments are named. For example
// `struct (v as foo, v2 as bar)` by creating a call to the `named_struct` function
fn create_named_struct(
fn create_named_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = values
) -> Result<Vec<Expr>> {
Ok(values
.into_iter()
.enumerate()
.map(|(i, value)| {
Expand Down Expand Up @@ -742,47 +751,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

let named_struct_func = self
.context_provider
.get_function_meta("named_struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'named_struct' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
named_struct_func,
args,
)))
.collect())
}

// Handles a call to struct(...) where the arguments are not named. For example
// `struct (v, v2)` by creating a call to the `struct` function
// which will create a struct with fields named `c0`, `c1`, etc.
fn create_struct(
fn create_struct_expr(
&self,
values: Vec<SQLExpr>,
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let args = values
) -> Result<Vec<Expr>> {
values
.into_iter()
.map(|value| {
self.sql_expr_to_logical_expr(value, input_schema, planner_context)
})
.collect::<Result<Vec<_>>>()?;
let struct_func = self
.context_provider
.get_function_meta("struct")
.ok_or_else(|| {
internal_datafusion_err!("Unable to find expected 'struct' function")
})?;

Ok(Expr::ScalarFunction(ScalarFunction::new_udf(
struct_func,
args,
)))
.collect::<Result<Vec<_>>>()
}

fn sql_in_list_to_expr(
Expand Down

0 comments on commit 1e39a85

Please sign in to comment.