A new interface for Scalar Functions #7978
Conversation
/// `execute()` and `execute_raw()` are two possible alternative for function definition:
/// If returns `false`, `execute()` will be used for execution;
/// If returns `true`, `execute_raw()` will be called.
fn use_execute_raw_instead(&self) -> bool {
Rationale for this: built-in functions now have two kinds of implementation:
- `execute()` -- a more common and easy-to-implement interface for UDFs; it can be converted to the more general `execute_raw()` case using `make_scalar_function()`
- `execute_raw()` -- fewer existing functions are directly implemented using this interface
Though a single `execute_raw()` can cover all existing cases, this design can make the general case easier to implement for UDFs.
I think we should have a single `execute` method that takes `ColumnarValue`s, and put in the documentation how to go from `ColumnarValue` --> `ArrayRef` for a simple, initial implementation.
I agree we should make the initial implementation concise
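As a rough illustration of the `ColumnarValue` --> `ArrayRef` step discussed above, here is a minimal sketch. The types below are simplified, hypothetical stand-ins (an `i64`-only `ArrayRef` and a two-variant `ColumnarValue`), not the real arrow/DataFusion definitions, and `execute_with_arrays` and `add_kernel` are names invented for this example. The idea is that scalars are expanded to arrays so an array-only kernel can sit behind a `ColumnarValue` interface.

```rust
use std::sync::Arc;

// Hypothetical stand-in for arrow's `ArrayRef`, reduced to i64 data.
type ArrayRef = Arc<Vec<i64>>;

// Hypothetical stand-in for DataFusion's `ColumnarValue`.
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(ArrayRef),
    Scalar(i64),
}

impl ColumnarValue {
    /// Expand this value into an array with `num_rows` elements.
    fn into_array(self, num_rows: usize) -> ArrayRef {
        match self {
            ColumnarValue::Array(a) => a,
            ColumnarValue::Scalar(v) => Arc::new(vec![v; num_rows]),
        }
    }
}

/// Run an array-only kernel behind a `ColumnarValue` interface.
fn execute_with_arrays<F>(args: &[ColumnarValue], kernel: F) -> Result<ColumnarValue, String>
where
    F: Fn(&[ArrayRef]) -> Result<ArrayRef, String>,
{
    // Take the row count from the first array argument; all-scalar input yields one row.
    let num_rows = args
        .iter()
        .find_map(|a| match a {
            ColumnarValue::Array(arr) => Some(arr.len()),
            ColumnarValue::Scalar(_) => None,
        })
        .unwrap_or(1);
    let arrays: Vec<ArrayRef> = args
        .iter()
        .cloned()
        .map(|a| a.into_array(num_rows))
        .collect();
    kernel(&arrays).map(ColumnarValue::Array)
}

/// Example kernel (hypothetical): element-wise sum of two arrays.
fn add_kernel(args: &[ArrayRef]) -> Result<ArrayRef, String> {
    if args.len() != 2 {
        return Err(format!("add expected 2 arguments, got {}", args.len()));
    }
    let sum = args[0]
        .iter()
        .zip(args[1].iter())
        .map(|(a, b)| a + b)
        .collect::<Vec<i64>>();
    Ok(Arc::new(sum))
}
```

With this shape, a simple function author only writes the array kernel, while the single `execute`-style entry point still accepts mixed array and scalar arguments.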
}

/// Defines the return type behavior of a function.
pub enum FunctionReturnType {
Now like 99% of built-in functions are either `SameAsFirstArg` or `FixedType`; only very rare array functions can only be defined using a lambda. This approach can make the new interface a little bit easier to use.
(also possible to extend to address #7657)
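To make the enum-based idea concrete, here is a minimal sketch of how such a return type description could be resolved. Only the variant names `SameAsFirstArg` and `FixedType` come from the comment above; the `LambdaReturnType` variant name, the payload types, the `resolve_return_type` helper, and the reduced `DataType` enum are assumptions made for illustration.

```rust
use std::sync::Arc;

// Hypothetical stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
    List(Box<DataType>),
}

// Assumed shape of the enum; the real PR may define it differently.
enum FunctionReturnType {
    /// Return type equals the type of the first argument.
    SameAsFirstArg,
    /// Return type is always the same, regardless of arguments.
    FixedType(DataType),
    /// Escape hatch for the rare functions that need custom logic.
    LambdaReturnType(Arc<dyn Fn(&[DataType]) -> Result<DataType, String> + Send + Sync>),
}

/// Hypothetical helper: compute the concrete return type for the given argument types.
fn resolve_return_type(
    rt: &FunctionReturnType,
    arg_types: &[DataType],
) -> Result<DataType, String> {
    match rt {
        FunctionReturnType::SameAsFirstArg => arg_types
            .first()
            .cloned()
            .ok_or_else(|| "no arguments to take the return type from".to_string()),
        FunctionReturnType::FixedType(t) => Ok(t.clone()),
        FunctionReturnType::LambdaReturnType(f) => f(arg_types),
    }
}
```

Under this sketch, the two common cases need no closure at all, and only functions whose output type depends on the inputs in a non-trivial way fall back to the lambda variant.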
What do you think about a signature like this:
pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
    ...
    /// What type will this function return, given arguments of the specified input types?
    /// By default, returns the same type as the first argument
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
        // `first()` yields `Option<&DataType>`, so clone it to return an owned `DataType`
        arg_types.first().cloned().ok_or_else(|| {
            DataFusionError::Internal(format!(
                "Implementation of Function {:?} did not specify a return type, and there are no arguments",
                self.name()
            ))
        })
    }
    ...
}
Then I think most function implementations can be left as the default or as
impl ScalarFunctionDef for Foo {
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }
}
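The default-method proposal above can be tried out in isolation with a small self-contained sketch. Everything here is a simplified assumption: `DataType` is a two-variant stand-in, errors are plain `String`s, `name()` returns a single `&str`, and `Abs` and `AsText` are hypothetical functions invented for the example.

```rust
use std::fmt::Debug;

// Hypothetical stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Utf8,
}

// Reduced version of the proposed trait, with a default `return_type`.
trait ScalarFunctionDef: Debug {
    fn name(&self) -> &str;

    /// By default, returns the same type as the first argument.
    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType, String> {
        arg_types.first().cloned().ok_or_else(|| {
            format!(
                "function {} did not specify a return type, and there are no arguments",
                self.name()
            )
        })
    }
}

/// Keeps the default: the output type follows the input type.
#[derive(Debug)]
struct Abs;

impl ScalarFunctionDef for Abs {
    fn name(&self) -> &str {
        "abs"
    }
}

/// Overrides the default with a fixed return type.
#[derive(Debug)]
struct AsText;

impl ScalarFunctionDef for AsText {
    fn name(&self) -> &str {
        "as_text"
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, String> {
        Ok(DataType::Utf8)
    }
}
```

In this sketch the "same as first argument" case costs the implementer nothing, and the "fixed type" case is a one-line override.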
The advantage of returning the `FunctionReturnType` enum is that it can be extended to solve #7657; otherwise we have to extend the function interface to address that issue (though I'm not sure that requirement is common; should we consider that case?).
Its limitation is that it is harder to use when the return type is actually some complex lambda, but that applies to only very few array functions.
    self.monotonicity()
}

// execution functions are defined in `BuiltinScalarFunctionWrapper`
All execution code for `BuiltinScalarFunction` is in the `phys-expr` crate (which depends on this crate), so it is defined elsewhere.
pub(crate) struct BuiltinScalarFunctionWrapper {
    func: Arc<dyn ScalarFunctionDef>,
    // functions like `now()` requires per-execution properties
    execution_props: ExecutionProps,
A few functions like `now()` require the core to pass some information to them. When migrating those functions, we can extend the trait `ScalarFunctionDef` with an optional method `set_execution_props(exec_props: ExecutionProps)` as the mechanism to let the core pass data to functions defined outside the core.
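One way the optional `set_execution_props` hook could look is sketched below. This is only an illustration under assumptions: the `&mut self` receiver, the `query_start_nanos` field, the reduced `execute()` signature, and the `Now` and `One` types are all hypothetical, and the real `ExecutionProps` in DataFusion has different contents.

```rust
// Hypothetical stand-in for DataFusion's `ExecutionProps`.
#[derive(Debug, Clone, Default, PartialEq)]
struct ExecutionProps {
    query_start_nanos: i64,
}

// Reduced version of the trait with one optional and one mandatory method.
trait ScalarFunctionDef {
    /// Optional hook: most functions ignore per-execution properties,
    /// so the default implementation does nothing.
    fn set_execution_props(&mut self, _exec_props: ExecutionProps) {}

    /// Simplified execution entry point that returns a single value.
    fn execute(&self) -> i64;
}

/// A `now()`-like function that needs the query start time from the core.
struct Now {
    props: ExecutionProps,
}

impl ScalarFunctionDef for Now {
    fn set_execution_props(&mut self, exec_props: ExecutionProps) {
        self.props = exec_props;
    }

    fn execute(&self) -> i64 {
        self.props.query_start_nanos
    }
}

/// A function that does not care about execution properties.
struct One;

impl ScalarFunctionDef for One {
    fn execute(&self) -> i64 {
        1
    }
}
```

The core would call `set_execution_props` on every function before execution, and only the few functions that override it would observe the values.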
fn monotonicity(&self) -> Option<FuncMonotonicity>;

// ===============================
// OPTIONAL METHODS START BELOW
This trait consists of mandatory and optional methods, so it can get a bit lengthy...
An alternative implementation is trait inheritance (e.g. `pub trait ScalarFunctionExtended: ScalarFunctionDef`), but it seems that won't be much clearer than the current one.
We can add more docs/examples to make it more straightforward later.
Thank you @2010YOUY01 -- I plan to review this carefully tomorrow. cc @viirya and others with whom we have talked about making functions more modular
As somewhat of an aside, I implemented a user defined function today (a backwards compatible implementation of Example User Defined Function
//! Implementation of `to_timestamp` function that
//! overrides the built in version in DataFusion because the semantics changed
//! upstream: <https://github.com/apache/arrow-datafusion/pull/7844>
use std::sync::Arc;

use arrow::datatypes::DataType;
use arrow::datatypes::TimeUnit;
use datafusion::common::internal_err;
use datafusion::logical_expr::{ReturnTypeFunction, Signature};
use datafusion::physical_expr::datetime_expressions;
use datafusion::physical_expr::expressions::cast_column;
use datafusion::{
    error::DataFusionError,
    logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility},
    physical_plan::ColumnarValue,
};
use once_cell::sync::Lazy;

/// The name of the function
pub const TO_TIMESTAMP_FUNCTION_NAME: &str = "to_timestamp";

/// Implementation of to_timestamp
pub(crate) static TO_TIMESTAMP_UDF: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
    Arc::new(ScalarUDF::new(
        TO_TIMESTAMP_FUNCTION_NAME,
        &Signature::uniform(
            1,
            vec![
                DataType::Int64,
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                DataType::Timestamp(TimeUnit::Microsecond, None),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                DataType::Timestamp(TimeUnit::Second, None),
                DataType::Utf8,
            ],
            Volatility::Immutable,
        ),
        &TO_TIMESTAMP_RETURN_TYPE,
        &TO_TIMESTAMP_IMPL,
    ))
});

static TO_TIMESTAMP_RETURN_TYPE: Lazy<ReturnTypeFunction> = Lazy::new(|| {
    let func =
        |_arg_types: &[DataType]| Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)));
    Arc::new(func)
});

static TO_TIMESTAMP_IMPL: Lazy<ScalarFunctionImplementation> = Lazy::new(|| {
    let func = |args: &[ColumnarValue]| {
        if args.len() != 1 {
            return internal_err!("to_timestamp expected 1 argument, got {}", args.len());
        }
        match args[0].data_type() {
            // call through to arrow cast kernel
            DataType::Int64 | DataType::Timestamp(_, _) => cast_column(
                &args[0],
                &DataType::Timestamp(TimeUnit::Nanosecond, None),
                None,
            ),
            DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args),
            dt => internal_err!("to_timestamp does not support argument type '{dt}'"),
        }
    };
    Arc::new(func)
});
// https://github.com/apache/arrow-datafusion/pull/7844 |
Thank you @2010YOUY01 -- this is looking like a great first start.
One thing that came to mind when I was reviewing this PR: what about simply moving `BuiltInScalarFunction`s entirely into the `physical_expr` crate (and removing `Expr::ScalarFunction`)? We would have to extend `ScalarUDF`... but that would have the nice property of keeping things uniform.
@@ -150,6 +151,9 @@ pub enum Expr {
    Sort(Sort),
    /// Represents the call of a built-in scalar function with a set of arguments.
    ScalarFunction(ScalarFunction),
    /// Represents the call of a built-in scalar function with a set of arguments,
What do you think about calling this `ScalarFunction2` or `ScalarFunctionDyn` to try and describe the difference? It might be confusing to see both `ScalarFunction` and `ScalarFunctionDef`.
Rather than add a parallel implementation I would love to just change
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: built_in_function::BuiltinScalarFunction,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}
to something like
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
    /// The function
    pub fun: Arc<dyn ScalarFunctionDef>,
    /// List of expressions to feed to the functions as arguments
    pub args: Vec<Expr>,
}
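One practical detail with the `Arc<dyn ScalarFunctionDef>` variant is that the `PartialEq`, `Eq` and `Hash` derives only work if the trait object itself implements those traits. A minimal sketch of one possible workaround is shown below: comparing and hashing by function name. This is an assumption for illustration, not something the PR specifies; the `name()` signature, the `String` stand-in for `Expr`, the `Upper` function, and the `hash_of` helper are all hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

// Reduced version of the trait; `Debug` as a supertrait lets the struct derive `Debug`.
trait ScalarFunctionDef: Debug + Send + Sync {
    fn name(&self) -> &str;
}

#[derive(Clone, Debug)]
struct ScalarFunction {
    /// The function
    fun: Arc<dyn ScalarFunctionDef>,
    /// Arguments; `String` is a stand-in for `Expr` in this sketch
    args: Vec<String>,
}

// Manual impls: identity is assumed to be the function name plus the arguments.
impl PartialEq for ScalarFunction {
    fn eq(&self, other: &Self) -> bool {
        self.fun.name() == other.fun.name() && self.args == other.args
    }
}

impl Eq for ScalarFunction {}

impl Hash for ScalarFunction {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.fun.name().hash(state);
        self.args.hash(state);
    }
}

/// Hypothetical function used for the example.
#[derive(Debug)]
struct Upper;

impl ScalarFunctionDef for Upper {
    fn name(&self) -> &str {
        "upper"
    }
}

/// Helper for the example: compute a hash value for an expression node.
fn hash_of(f: &ScalarFunction) -> u64 {
    let mut hasher = DefaultHasher::new();
    f.hash(&mut hasher);
    hasher.finish()
}
```

Name-based identity is only one option; it assumes function names are unique within a registry, which may or may not hold for user-defined functions.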
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

// TODO(PR): add doc comments
pub trait ScalarFunctionDef: Any + Sync + Send + std::fmt::Debug {
What do you think about calling this trait `ScalarFunction` rather than `ScalarFunctionDef`? I know there are already several other things called `ScalarFunction`, but that would also keep it in line with things like `WindowFunction`:
https://docs.rs/datafusion/latest/datafusion/index.html?search=WindowFunction
    fn as_any(&self) -> &dyn Any;

    // May return 1 or more name as aliasing
    fn name(&self) -> &[&str];
I recommend `name(&self) -> &str` that returns one name, and then a second API that returns optional aliases:
/// Returns any alias names this function is known by. Defaults to an empty list
fn aliases(&self) -> &[&str] { &[] }
This approach looks better 👍🏼
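A small sketch of how the `name()` plus `aliases()` split could be consumed by a function registry follows. The trait is reduced to just these two methods, and `Power`, `Abs`, and `build_registry` are hypothetical names used only for this example.

```rust
use std::collections::HashMap;

// Reduced version of the trait: one primary name, optional aliases.
trait ScalarFunctionDef {
    fn name(&self) -> &str;

    /// Returns any alias names this function is known by. Defaults to an empty list.
    fn aliases(&self) -> &[&str] {
        &[]
    }
}

/// Hypothetical function with one alias.
struct Power;

impl ScalarFunctionDef for Power {
    fn name(&self) -> &str {
        "power"
    }

    fn aliases(&self) -> &[&str] {
        &["pow"]
    }
}

/// Hypothetical function that keeps the default (no aliases).
struct Abs;

impl ScalarFunctionDef for Abs {
    fn name(&self) -> &str {
        "abs"
    }
}

/// Map every known name (primary or alias) to the function's primary name.
fn build_registry(funcs: &[Box<dyn ScalarFunctionDef>]) -> HashMap<String, String> {
    let mut registry = HashMap::new();
    for f in funcs {
        registry.insert(f.name().to_string(), f.name().to_string());
        for alias in f.aliases() {
            registry.insert(alias.to_string(), f.name().to_string());
        }
    }
    registry
}
```

Keeping a single primary name also gives error messages and plan display one obvious spelling to print, while lookup still accepts every alias.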
    fn return_type(&self) -> FunctionReturnType;

    fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
I think this should be
- fn execute(&self, _args: &[ArrayRef]) -> Result<ArrayRef> {
+ fn execute(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> {
/// The function body (`execute()` in `ScalarFunctionDef`) now are all defined in
/// `physical-expr` crate, so the new interface implementation are defined separately
/// in `BuiltinScalarFunctionWrapper`
impl ScalarFunctionDef for BuiltinScalarFunction {
I don't think we'll be able to `impl` this as long as `BuiltInScalarFunction` is split across two crates.
Thank you for your review. Moving |
Thank you @2010YOUY01 -- I am pretty excited that this approach has the benefits of
Some steps that might get us there might be to make the fields of
let udf = ScalarUDF::new(..)
    .with_aliases(["foo", "bar"]);
Over time, we could then even introduce a new trait API (and internally change how `ScalarUDF` is implemented) without making additional breaking changes.
@2010YOUY01 what do you think about this PR: #8039 (make the fields non |
I think we have merged a version of this work now, thank you @2010YOUY01 for all the help with the planning and implementation |
Which issue does this PR close?
1.1 in #7977
Rationale for this change
See #7977
What changes are included in this PR?
`ScalarFunctionDef`
`Expr` representation)
`impl ScalarFunctionDef for BuiltinScalarFunctions`), and replace SQL execution code for built-in functions with the new interface
Remaining tasks
Are these changes tested?
Should be covered by existing SQL tests
Are there any user-facing changes?
New UDF interface