Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move make_array array_append array_prepend array_concat function to datafusion-functions-array crate #9343

Closed
wants to merge 13 commits into from
4 changes: 4 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 22 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
#[cfg(feature = "array_expressions")]
use datafusion_functions_array::optimizer::analyzer::rewrite_expr::OperatorToFunction;
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
Expand Down Expand Up @@ -105,6 +107,11 @@ use url::Url;
use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
#[cfg(feature = "array_expressions")]
use datafusion_optimizer::analyzer::{
count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
type_coercion::TypeCoercion,
};
use datafusion_optimizer::{
analyzer::{Analyzer, AnalyzerRule},
OptimizerConfig,
Expand Down Expand Up @@ -1398,10 +1405,22 @@ impl SessionState {
datafusion_functions::register_all(&mut new_self)
.expect("can not register built in functions");

// register crate of array expressions (if enabled)
// register crate of array expressions && add array analyzer rule (if enabled)
#[cfg(feature = "array_expressions")]
datafusion_functions_array::register_all(&mut new_self)
.expect("can not register array expressions");
{
datafusion_functions_array::register_all(&mut new_self)
.expect("can not register array expressions");
// we need keep the analyzer order
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
Arc::new(InlineTableScan::new()),
// OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar),
// and TypeCoercion may cast the argument types from Scalar to List.
Arc::new(OperatorToFunction::new()),
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
];
new_self = new_self.with_analyzer_rules(rules);
}

new_self
}
Expand Down
109 changes: 0 additions & 109 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

//! Built-in functions module contains all the built-in functions definitions.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};

use crate::signature::TIMEZONE_WILDCARD;
use crate::type_coercion::binary::get_wider_type;
use crate::type_coercion::functions::data_types;
use crate::{
conditional_expressions, FuncMonotonicity, Signature, TypeSignature, Volatility,
Expand Down Expand Up @@ -114,12 +112,8 @@ pub enum BuiltinScalarFunction {
Cot,

// array functions
/// array_append
ArrayAppend,
/// array_sort
ArraySort,
/// array_concat
ArrayConcat,
/// array_has
ArrayHas,
/// array_has_all
Expand All @@ -146,8 +140,6 @@ pub enum BuiltinScalarFunction {
ArrayPosition,
/// array_positions
ArrayPositions,
/// array_prepend
ArrayPrepend,
/// array_remove
ArrayRemove,
/// array_remove_n
Expand Down Expand Up @@ -176,8 +168,6 @@ pub enum BuiltinScalarFunction {
Cardinality,
/// array_resize
ArrayResize,
/// construct an array from columns
MakeArray,
/// Flatten
Flatten,

Expand Down Expand Up @@ -388,9 +378,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Tan => Volatility::Immutable,
BuiltinScalarFunction::Tanh => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::ArrayAppend => Volatility::Immutable,
BuiltinScalarFunction::ArraySort => Volatility::Immutable,
BuiltinScalarFunction::ArrayConcat => Volatility::Immutable,
BuiltinScalarFunction::ArrayEmpty => Volatility::Immutable,
BuiltinScalarFunction::ArrayHasAll => Volatility::Immutable,
BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable,
Expand All @@ -405,7 +393,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPopBack => Volatility::Immutable,
BuiltinScalarFunction::ArrayPosition => Volatility::Immutable,
BuiltinScalarFunction::ArrayPositions => Volatility::Immutable,
BuiltinScalarFunction::ArrayPrepend => Volatility::Immutable,
BuiltinScalarFunction::ArrayRepeat => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemove => Volatility::Immutable,
BuiltinScalarFunction::ArrayRemoveN => Volatility::Immutable,
Expand All @@ -420,7 +407,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayUnion => Volatility::Immutable,
BuiltinScalarFunction::ArrayResize => Volatility::Immutable,
BuiltinScalarFunction::Cardinality => Volatility::Immutable,
BuiltinScalarFunction::MakeArray => Volatility::Immutable,
BuiltinScalarFunction::Ascii => Volatility::Immutable,
BuiltinScalarFunction::BitLength => Volatility::Immutable,
BuiltinScalarFunction::Btrim => Volatility::Immutable,
Expand Down Expand Up @@ -489,25 +475,6 @@ impl BuiltinScalarFunction {
}
}

/// Returns the dimension [`DataType`] of [`DataType::List`] if
/// treated as a N-dimensional array.
///
/// ## Examples:
///
/// * `Int64` has dimension 1
/// * `List(Int64)` has dimension 2
/// * `List(List(Int64))` has dimension 3
/// * etc.
fn return_dimension(self, input_expr_type: &DataType) -> u64 {
let mut result: u64 = 1;
let mut current_data_type = input_expr_type;
while let DataType::List(field) = current_data_type {
current_data_type = field.data_type();
result += 1;
}
result
}

/// Returns the output [`DataType`] of this function
///
/// This method should be invoked only after `input_expr_types` have been validated
Expand Down Expand Up @@ -540,38 +507,7 @@ impl BuiltinScalarFunction {
let data_type = get_base_type(&input_expr_types[0])?;
Ok(data_type)
}
BuiltinScalarFunction::ArrayAppend => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArraySort => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayConcat => {
let mut expr_type = Null;
let mut max_dims = 0;
for input_expr_type in input_expr_types {
match input_expr_type {
List(field) => {
if !field.data_type().equals_datatype(&Null) {
let dims = self.return_dimension(input_expr_type);
expr_type = match max_dims.cmp(&dims) {
Ordering::Greater => expr_type,
Ordering::Equal => {
get_wider_type(&expr_type, input_expr_type)?
}
Ordering::Less => {
max_dims = dims;
input_expr_type.clone()
}
};
}
}
_ => {
return plan_err!(
"The {self} function can only accept list as the args."
)
}
}
}

Ok(expr_type)
}
BuiltinScalarFunction::ArrayHasAll
| BuiltinScalarFunction::ArrayHasAny
| BuiltinScalarFunction::ArrayHas
Expand All @@ -596,7 +532,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
Ok(List(Arc::new(Field::new("item", UInt64, true))))
}
BuiltinScalarFunction::ArrayPrepend => Ok(input_expr_types[1].clone()),
BuiltinScalarFunction::ArrayRepeat => Ok(List(Arc::new(Field::new(
"item",
input_expr_types[0].clone(),
Expand Down Expand Up @@ -638,20 +573,6 @@ impl BuiltinScalarFunction {
}
}
BuiltinScalarFunction::Cardinality => Ok(UInt64),
BuiltinScalarFunction::MakeArray => match input_expr_types.len() {
0 => Ok(List(Arc::new(Field::new("item", Null, true)))),
_ => {
let mut expr_type = Null;
for input_expr_type in input_expr_types {
if !input_expr_type.equals_datatype(&Null) {
expr_type = input_expr_type.clone();
break;
}
}

Ok(List(Arc::new(Field::new("item", expr_type, true))))
}
},
BuiltinScalarFunction::Ascii => Ok(Int32),
BuiltinScalarFunction::BitLength => {
utf8_to_int_type(&input_expr_types[0], "bit_length")
Expand Down Expand Up @@ -892,18 +813,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArraySort => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayAppend => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::MakeArray => {
// 0 or more arguments of arbitrary type
Signature::one_of(vec![VariadicEqual, Any(0)], self.volatility())
}
BuiltinScalarFunction::ArrayPopFront => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayPopBack => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayConcat => {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayDims => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayEmpty => Signature::array(self.volatility()),
BuiltinScalarFunction::ArrayElement => {
Expand All @@ -928,9 +839,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
Signature::array_and_element(self.volatility())
}
BuiltinScalarFunction::ArrayPrepend => {
Signature::element_and_array(self.volatility())
}
BuiltinScalarFunction::ArrayRepeat => Signature::any(2, self.volatility()),
BuiltinScalarFunction::ArrayRemove => {
Signature::array_and_element(self.volatility())
Expand Down Expand Up @@ -1502,17 +1410,7 @@ impl BuiltinScalarFunction {
// other functions
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"],

// array functions
BuiltinScalarFunction::ArrayAppend => &[
"array_append",
"list_append",
"array_push_back",
"list_push_back",
],
BuiltinScalarFunction::ArraySort => &["array_sort", "list_sort"],
BuiltinScalarFunction::ArrayConcat => {
&["array_concat", "array_cat", "list_concat", "list_cat"]
}
BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"],
BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
BuiltinScalarFunction::ArrayEmpty => &["empty"],
Expand Down Expand Up @@ -1544,12 +1442,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayPositions => {
&["array_positions", "list_positions"]
}
BuiltinScalarFunction::ArrayPrepend => &[
"array_prepend",
"list_prepend",
"array_push_front",
"list_push_front",
],
BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"],
BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"],
BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"],
Expand All @@ -1568,7 +1460,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"],
BuiltinScalarFunction::Cardinality => &["cardinality"],
BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"],
BuiltinScalarFunction::MakeArray => &["make_array", "make_list"],
BuiltinScalarFunction::ArrayIntersect => {
&["array_intersect", "list_intersect"]
}
Expand Down
23 changes: 0 additions & 23 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,14 +573,6 @@ scalar_expr!(
scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

// array functions
scalar_expr!(
ArrayAppend,
array_append,
array element,
"appends an element to the end of an array."
);

scalar_expr!(ArraySort, array_sort, array desc null_first, "returns sorted array.");

scalar_expr!(
Expand All @@ -597,7 +589,6 @@ scalar_expr!(
"returns the array without the first element."
);

nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays.");
scalar_expr!(
ArrayHas,
array_has,
Expand Down Expand Up @@ -676,12 +667,6 @@ scalar_expr!(
array element,
"searches for an element in the array, returns all occurrences."
);
scalar_expr!(
ArrayPrepend,
array_prepend,
array element,
"prepends an element to the beginning of an array."
);
scalar_expr!(
ArrayRepeat,
array_repeat,
Expand Down Expand Up @@ -752,11 +737,6 @@ scalar_expr!(
"returns an array with the specified size filled with the given value."
);

nary_scalar_expr!(
MakeArray,
array,
"returns an Arrow array using the specified input expressions."
);
scalar_expr!(
ArrayIntersect,
array_intersect,
Expand Down Expand Up @@ -1410,7 +1390,6 @@ mod test {
test_scalar_expr!(DateBin, date_bin, stride, source, origin);
test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);

test_scalar_expr!(ArrayAppend, array_append, array, element);
test_scalar_expr!(ArraySort, array_sort, array, desc, null_first);
test_scalar_expr!(ArrayPopFront, array_pop_front, array);
test_scalar_expr!(ArrayPopBack, array_pop_back, array);
Expand All @@ -1419,7 +1398,6 @@ mod test {
test_unary_scalar_expr!(ArrayNdims, array_ndims);
test_scalar_expr!(ArrayPosition, array_position, array, element, index);
test_scalar_expr!(ArrayPositions, array_positions, array, element);
test_scalar_expr!(ArrayPrepend, array_prepend, array, element);
test_scalar_expr!(ArrayRepeat, array_repeat, element, count);
test_scalar_expr!(ArrayRemove, array_remove, array, element);
test_scalar_expr!(ArrayRemoveN, array_remove_n, array, element, max);
Expand All @@ -1428,7 +1406,6 @@ mod test {
test_scalar_expr!(ArrayReplaceN, array_replace_n, array, from, to, max);
test_scalar_expr!(ArrayReplaceAll, array_replace_all, array, from, to);
test_unary_scalar_expr!(Cardinality, cardinality);
test_nary_scalar_expr!(MakeArray, array, input);

test_unary_scalar_expr!(ArrowTypeof, arrow_typeof);
test_nary_scalar_expr!(OverLay, overlay, string, characters, position, len);
Expand Down
4 changes: 4 additions & 0 deletions datafusion/functions-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-optimizer = { path = "../optimizer", version = "36.0.0", default-features = false }
log = "0.4.20"
paste = "1.0.14"
Loading
Loading