diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 416df5d17f25..d2fe44f1f7c1 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1243,9 +1243,13 @@ name = "datafusion-functions-array"
 version = "36.0.0"
 dependencies = [
  "arrow",
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-schema",
  "datafusion-common",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-optimizer",
  "log",
  "paste",
 ]
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 3aa4edfe3adc..7af7500d45b9 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -47,6 +47,8 @@ use datafusion_expr::{
     logical_plan::{DdlStatement, Statement},
     Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
 };
+#[cfg(feature = "array_expressions")]
+use datafusion_functions_array::optimizer::analyzer::rewrite_expr::OperatorToFunction;
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
@@ -105,6 +107,11 @@ use url::Url;
 use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
 use crate::catalog::listing_schema::ListingSchemaProvider;
 use crate::datasource::object_store::ObjectStoreUrl;
+#[cfg(feature = "array_expressions")]
+use datafusion_optimizer::analyzer::{
+    count_wildcard_rule::CountWildcardRule, inline_table_scan::InlineTableScan,
+    type_coercion::TypeCoercion,
+};
 use datafusion_optimizer::{
     analyzer::{Analyzer, AnalyzerRule},
     OptimizerConfig,
@@ -1398,10 +1405,22 @@ impl SessionState {
         datafusion_functions::register_all(&mut new_self)
             .expect("can not register built in functions");
 
-        // register crate of array expressions (if enabled)
+        // register crate of array expressions and add the array analyzer rule (if enabled)
         #[cfg(feature = "array_expressions")]
-        datafusion_functions_array::register_all(&mut new_self)
-            .expect("can not register array expressions");
+        {
+            datafusion_functions_array::register_all(&mut new_self)
+                .expect("can not register array expressions");
+            // we need to keep the analyzer order
+            let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
+                Arc::new(InlineTableScan::new()),
+                // OperatorToFunction should be run before TypeCoercion, since it rewrites based on the argument types (List or Scalar),
+                // and TypeCoercion may cast the argument types from Scalar to List.
+                Arc::new(OperatorToFunction::new()),
+                Arc::new(TypeCoercion::new()),
+                Arc::new(CountWildcardRule::new()),
+            ];
+            new_self = new_self.with_analyzer_rules(rules);
+        }
 
         new_self
     }
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index b7f089846a11..6c6c28ffd22e 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -17,14 +17,12 @@
 //! Built-in functions module contains all the built-in functions definitions.
-use std::cmp::Ordering; use std::collections::HashMap; use std::fmt; use std::str::FromStr; use std::sync::{Arc, OnceLock}; use crate::signature::TIMEZONE_WILDCARD; -use crate::type_coercion::binary::get_wider_type; use crate::type_coercion::functions::data_types; use crate::{ conditional_expressions, FuncMonotonicity, Signature, TypeSignature, Volatility, @@ -114,12 +112,8 @@ pub enum BuiltinScalarFunction { Cot, // array functions - /// array_append - ArrayAppend, /// array_sort ArraySort, - /// array_concat - ArrayConcat, /// array_has ArrayHas, /// array_has_all @@ -146,8 +140,6 @@ pub enum BuiltinScalarFunction { ArrayPosition, /// array_positions ArrayPositions, - /// array_prepend - ArrayPrepend, /// array_remove ArrayRemove, /// array_remove_n @@ -176,8 +168,6 @@ pub enum BuiltinScalarFunction { Cardinality, /// array_resize ArrayResize, - /// construct an array from columns - MakeArray, /// Flatten Flatten, @@ -388,9 +378,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Tan => Volatility::Immutable, BuiltinScalarFunction::Tanh => Volatility::Immutable, BuiltinScalarFunction::Trunc => Volatility::Immutable, - BuiltinScalarFunction::ArrayAppend => Volatility::Immutable, BuiltinScalarFunction::ArraySort => Volatility::Immutable, - BuiltinScalarFunction::ArrayConcat => Volatility::Immutable, BuiltinScalarFunction::ArrayEmpty => Volatility::Immutable, BuiltinScalarFunction::ArrayHasAll => Volatility::Immutable, BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable, @@ -405,7 +393,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayPopBack => Volatility::Immutable, BuiltinScalarFunction::ArrayPosition => Volatility::Immutable, BuiltinScalarFunction::ArrayPositions => Volatility::Immutable, - BuiltinScalarFunction::ArrayPrepend => Volatility::Immutable, BuiltinScalarFunction::ArrayRepeat => Volatility::Immutable, BuiltinScalarFunction::ArrayRemove => Volatility::Immutable, BuiltinScalarFunction::ArrayRemoveN => Volatility::Immutable, @@ -420,7 +407,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayUnion => Volatility::Immutable, BuiltinScalarFunction::ArrayResize => Volatility::Immutable, BuiltinScalarFunction::Cardinality => Volatility::Immutable, - BuiltinScalarFunction::MakeArray => Volatility::Immutable, BuiltinScalarFunction::Ascii => Volatility::Immutable, BuiltinScalarFunction::BitLength => Volatility::Immutable, BuiltinScalarFunction::Btrim => Volatility::Immutable, @@ -489,25 +475,6 @@ impl BuiltinScalarFunction { } } - /// Returns the dimension [`DataType`] of [`DataType::List`] if - /// treated as a N-dimensional array. - /// - /// ## Examples: - /// - /// * `Int64` has dimension 1 - /// * `List(Int64)` has dimension 2 - /// * `List(List(Int64))` has dimension 3 - /// * etc. 
- fn return_dimension(self, input_expr_type: &DataType) -> u64 { - let mut result: u64 = 1; - let mut current_data_type = input_expr_type; - while let DataType::List(field) = current_data_type { - current_data_type = field.data_type(); - result += 1; - } - result - } - /// Returns the output [`DataType`] of this function /// /// This method should be invoked only after `input_expr_types` have been validated @@ -540,38 +507,7 @@ impl BuiltinScalarFunction { let data_type = get_base_type(&input_expr_types[0])?; Ok(data_type) } - BuiltinScalarFunction::ArrayAppend => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArraySort => Ok(input_expr_types[0].clone()), - BuiltinScalarFunction::ArrayConcat => { - let mut expr_type = Null; - let mut max_dims = 0; - for input_expr_type in input_expr_types { - match input_expr_type { - List(field) => { - if !field.data_type().equals_datatype(&Null) { - let dims = self.return_dimension(input_expr_type); - expr_type = match max_dims.cmp(&dims) { - Ordering::Greater => expr_type, - Ordering::Equal => { - get_wider_type(&expr_type, input_expr_type)? - } - Ordering::Less => { - max_dims = dims; - input_expr_type.clone() - } - }; - } - } - _ => { - return plan_err!( - "The {self} function can only accept list as the args." - ) - } - } - } - - Ok(expr_type) - } BuiltinScalarFunction::ArrayHasAll | BuiltinScalarFunction::ArrayHasAny | BuiltinScalarFunction::ArrayHas @@ -596,7 +532,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayPositions => { Ok(List(Arc::new(Field::new("item", UInt64, true)))) } - BuiltinScalarFunction::ArrayPrepend => Ok(input_expr_types[1].clone()), BuiltinScalarFunction::ArrayRepeat => Ok(List(Arc::new(Field::new( "item", input_expr_types[0].clone(), @@ -638,20 +573,6 @@ impl BuiltinScalarFunction { } } BuiltinScalarFunction::Cardinality => Ok(UInt64), - BuiltinScalarFunction::MakeArray => match input_expr_types.len() { - 0 => Ok(List(Arc::new(Field::new("item", Null, true)))), - _ => { - let mut expr_type = Null; - for input_expr_type in input_expr_types { - if !input_expr_type.equals_datatype(&Null) { - expr_type = input_expr_type.clone(); - break; - } - } - - Ok(List(Arc::new(Field::new("item", expr_type, true)))) - } - }, BuiltinScalarFunction::Ascii => Ok(Int32), BuiltinScalarFunction::BitLength => { utf8_to_int_type(&input_expr_types[0], "bit_length") @@ -892,18 +813,8 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArraySort => { Signature::variadic_any(self.volatility()) } - BuiltinScalarFunction::ArrayAppend => { - Signature::array_and_element(self.volatility()) - } - BuiltinScalarFunction::MakeArray => { - // 0 or more arguments of arbitrary type - Signature::one_of(vec![VariadicEqual, Any(0)], self.volatility()) - } BuiltinScalarFunction::ArrayPopFront => Signature::array(self.volatility()), BuiltinScalarFunction::ArrayPopBack => Signature::array(self.volatility()), - BuiltinScalarFunction::ArrayConcat => { - Signature::variadic_any(self.volatility()) - } BuiltinScalarFunction::ArrayDims => Signature::array(self.volatility()), BuiltinScalarFunction::ArrayEmpty => Signature::array(self.volatility()), BuiltinScalarFunction::ArrayElement => { @@ -928,9 +839,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayPositions => { Signature::array_and_element(self.volatility()) } - BuiltinScalarFunction::ArrayPrepend => { - Signature::element_and_array(self.volatility()) - } BuiltinScalarFunction::ArrayRepeat => Signature::any(2, self.volatility()), BuiltinScalarFunction::ArrayRemove => { 
Signature::array_and_element(self.volatility()) @@ -1502,17 +1410,7 @@ impl BuiltinScalarFunction { // other functions BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], - // array functions - BuiltinScalarFunction::ArrayAppend => &[ - "array_append", - "list_append", - "array_push_back", - "list_push_back", - ], BuiltinScalarFunction::ArraySort => &["array_sort", "list_sort"], - BuiltinScalarFunction::ArrayConcat => { - &["array_concat", "array_cat", "list_concat", "list_cat"] - } BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"], BuiltinScalarFunction::ArrayEmpty => &["empty"], @@ -1544,12 +1442,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayPositions => { &["array_positions", "list_positions"] } - BuiltinScalarFunction::ArrayPrepend => &[ - "array_prepend", - "list_prepend", - "array_push_front", - "list_push_front", - ], BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"], BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"], BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"], @@ -1568,7 +1460,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], BuiltinScalarFunction::Cardinality => &["cardinality"], BuiltinScalarFunction::ArrayResize => &["array_resize", "list_resize"], - BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], BuiltinScalarFunction::ArrayIntersect => { &["array_intersect", "list_intersect"] } diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 63f3af8868bb..efc538aee655 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -573,14 +573,6 @@ scalar_expr!( scalar_expr!(Uuid, uuid, , "returns uuid v4 as a string value"); scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`"); -// array functions -scalar_expr!( - ArrayAppend, - array_append, - array element, - "appends an element to the end of an array." -); - scalar_expr!(ArraySort, array_sort, array desc null_first, "returns sorted array."); scalar_expr!( @@ -597,7 +589,6 @@ scalar_expr!( "returns the array without the first element." ); -nary_scalar_expr!(ArrayConcat, array_concat, "concatenates arrays."); scalar_expr!( ArrayHas, array_has, @@ -676,12 +667,6 @@ scalar_expr!( array element, "searches for an element in the array, returns all occurrences." ); -scalar_expr!( - ArrayPrepend, - array_prepend, - array element, - "prepends an element to the beginning of an array." -); scalar_expr!( ArrayRepeat, array_repeat, @@ -752,11 +737,6 @@ scalar_expr!( "returns an array with the specified size filled with the given value." ); -nary_scalar_expr!( - MakeArray, - array, - "returns an Arrow array using the specified input expressions." 
-); scalar_expr!( ArrayIntersect, array_intersect, @@ -1410,7 +1390,6 @@ mod test { test_scalar_expr!(DateBin, date_bin, stride, source, origin); test_scalar_expr!(FromUnixtime, from_unixtime, unixtime); - test_scalar_expr!(ArrayAppend, array_append, array, element); test_scalar_expr!(ArraySort, array_sort, array, desc, null_first); test_scalar_expr!(ArrayPopFront, array_pop_front, array); test_scalar_expr!(ArrayPopBack, array_pop_back, array); @@ -1419,7 +1398,6 @@ mod test { test_unary_scalar_expr!(ArrayNdims, array_ndims); test_scalar_expr!(ArrayPosition, array_position, array, element, index); test_scalar_expr!(ArrayPositions, array_positions, array, element); - test_scalar_expr!(ArrayPrepend, array_prepend, array, element); test_scalar_expr!(ArrayRepeat, array_repeat, element, count); test_scalar_expr!(ArrayRemove, array_remove, array, element); test_scalar_expr!(ArrayRemoveN, array_remove_n, array, element, max); @@ -1428,7 +1406,6 @@ mod test { test_scalar_expr!(ArrayReplaceN, array_replace_n, array, from, to, max); test_scalar_expr!(ArrayReplaceAll, array_replace_all, array, from, to); test_unary_scalar_expr!(Cardinality, cardinality); - test_nary_scalar_expr!(MakeArray, array, input); test_unary_scalar_expr!(ArrowTypeof, arrow_typeof); test_nary_scalar_expr!(OverLay, overlay, string, characters, position, len); diff --git a/datafusion/functions-array/Cargo.toml b/datafusion/functions-array/Cargo.toml index 9cf769bf294e..4f2b19461ffa 100644 --- a/datafusion/functions-array/Cargo.toml +++ b/datafusion/functions-array/Cargo.toml @@ -38,8 +38,12 @@ path = "src/lib.rs" [dependencies] arrow = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-schema = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-optimizer = { path = "../optimizer", version = "36.0.0", default-features = false } log = "0.4.20" paste = "1.0.14" diff --git a/datafusion/functions-array/src/kernels.rs b/datafusion/functions-array/src/kernels.rs index b9a68b466605..2326e220de34 100644 --- a/datafusion/functions-array/src/kernels.rs +++ b/datafusion/functions-array/src/kernels.rs @@ -18,15 +18,26 @@ //! 
implementation kernels for array functions use arrow::array::{ - Array, ArrayRef, BooleanArray, Float32Array, Float64Array, GenericListArray, - Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, OffsetSizeTrait, - StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + Array, ArrayData, ArrayRef, BooleanArray, Capacities, Float32Array, Float64Array, + GenericListArray, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, + MutableArrayData, OffsetSizeTrait, StringArray, UInt16Array, UInt32Array, + UInt64Array, UInt8Array, }; +use arrow::buffer::OffsetBuffer; +use arrow::compute; use arrow::datatypes::DataType; +use arrow_array::{new_null_array, ListArray, NullArray}; +use arrow_buffer::{BooleanBufferBuilder, NullBuffer}; +use arrow_schema::Field; use datafusion_common::cast::{ - as_int64_array, as_large_list_array, as_list_array, as_string_array, + as_generic_list_array, as_int64_array, as_large_list_array, as_list_array, + as_string_array, +}; +use datafusion_common::utils::{array_into_list_array, list_ndims}; +use datafusion_common::{ + exec_err, not_impl_err, plan_err, DataFusionError, Result, ScalarValue, }; -use datafusion_common::{exec_err, DataFusionError}; +use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation}; use std::any::type_name; use std::sync::Arc; macro_rules! downcast_arg { @@ -254,9 +265,6 @@ pub(super) fn array_to_string(args: &[ArrayRef]) -> datafusion_common::Result` +/// representing the resulting ListArray after the operation. +/// +/// # Arguments +/// +/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended. +/// * `element_array` - A reference to the Array containing elements to be appended/prepended. +/// * `field` - A reference to the Field describing the data type of the arrays. +/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements. 
+/// +/// # Examples +/// +/// generic_append_and_prepend( +/// [1, 2, 3], 4, append => [1, 2, 3, 4] +/// 5, [6, 7, 8], prepend => [5, 6, 7, 8] +/// ) +fn generic_append_and_prepend( + list_array: &GenericListArray, + element_array: &ArrayRef, + data_type: &DataType, + is_append: bool, +) -> Result +where + i64: TryInto, +{ + let mut offsets = vec![O::usize_as(0)]; + let values = list_array.values(); + let original_data = values.to_data(); + let element_data = element_array.to_data(); + let capacity = Capacities::Array(original_data.len() + element_data.len()); + + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &element_data], + false, + capacity, + ); + + let values_index = 0; + let element_index = 1; + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + let start = offset_window[0].to_usize().unwrap(); + let end = offset_window[1].to_usize().unwrap(); + if is_append { + mutable.extend(values_index, start, end); + mutable.extend(element_index, row_index, row_index + 1); + } else { + mutable.extend(element_index, row_index, row_index + 1); + mutable.extend(values_index, start, end); + } + offsets.push(offsets[row_index] + O::usize_as(end - start + 1)); + } + + let data = mutable.freeze(); + + Ok(Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type.to_owned(), true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?)) +} + +fn check_datatypes(name: &str, args: &[&ArrayRef]) -> Result<()> { + let data_type = args[0].data_type(); + if !args.iter().all(|arg| { + arg.data_type().equals_datatype(data_type) + || arg.data_type().equals_datatype(&DataType::Null) + }) { + let types = args.iter().map(|arg| arg.data_type()).collect::>(); + return plan_err!("{name} received incompatible types: '{types:?}'."); + } + + Ok(()) +} + +fn general_append_and_prepend( + args: &[ArrayRef], + is_append: bool, +) -> Result +where + i64: TryInto, +{ + let (list_array, element_array) = if is_append { + let list_array = as_generic_list_array::(&args[0])?; + let element_array = &args[1]; + check_datatypes("array_append", &[element_array, list_array.values()])?; + (list_array, element_array) + } else { + let list_array = as_generic_list_array::(&args[1])?; + let element_array = &args[0]; + check_datatypes("array_prepend", &[list_array.values(), element_array])?; + (list_array, element_array) + }; + + let res = match list_array.value_type() { + DataType::List(_) => concat_internal::(args)?, + DataType::LargeList(_) => concat_internal::(args)?, + data_type => { + return generic_append_and_prepend::( + list_array, + element_array, + &data_type, + is_append, + ); + } + }; + + Ok(res) +} + +/// Convert one or more [`ArrayRef`] of the same type into a +/// `ListArray` or 'LargeListArray' depending on the offset size. 
+/// +/// # Example (non nested) +/// +/// Calling `array(col1, col2)` where col1 and col2 are non nested +/// would return a single new `ListArray`, where each row was a list +/// of 2 elements: +/// +/// ```text +/// ┌─────────┐ ┌─────────┐ ┌──────────────┐ +/// │ ┌─────┐ │ │ ┌─────┐ │ │ ┌──────────┐ │ +/// │ │ A │ │ │ │ X │ │ │ │ [A, X] │ │ +/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ +/// │ │NULL │ │ │ │ Y │ │──────────▶│ │[NULL, Y] │ │ +/// │ ├─────┤ │ │ ├─────┤ │ │ ├──────────┤ │ +/// │ │ C │ │ │ │ Z │ │ │ │ [C, Z] │ │ +/// │ └─────┘ │ │ └─────┘ │ │ └──────────┘ │ +/// └─────────┘ └─────────┘ └──────────────┘ +/// col1 col2 output +/// ``` +/// +/// # Example (nested) +/// +/// Calling `array(col1, col2)` where col1 and col2 are lists +/// would return a single new `ListArray`, where each row was a list +/// of the corresponding elements of col1 and col2. +/// +/// ``` text +/// ┌──────────────┐ ┌──────────────┐ ┌─────────────────────────────┐ +/// │ ┌──────────┐ │ │ ┌──────────┐ │ │ ┌────────────────────────┐ │ +/// │ │ [A, X] │ │ │ │ [] │ │ │ │ [[A, X], []] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────┤ │ +/// │ │[NULL, Y] │ │ │ │[Q, R, S] │ │───────▶│ │ [[NULL, Y], [Q, R, S]] │ │ +/// │ ├──────────┤ │ │ ├──────────┤ │ │ ├────────────────────────│ │ +/// │ │ [C, Z] │ │ │ │ NULL │ │ │ │ [[C, Z], NULL] │ │ +/// │ └──────────┘ │ │ └──────────┘ │ │ └────────────────────────┘ │ +/// └──────────────┘ └──────────────┘ └─────────────────────────────┘ +/// col1 col2 output +/// ``` +fn array_array( + args: &[ArrayRef], + data_type: DataType, +) -> Result { + // do not accept 0 arguments. + if args.is_empty() { + return plan_err!("Array requires at least one argument"); + } + + let mut data = vec![]; + let mut total_len = 0; + for arg in args { + let arg_data = if arg.as_any().is::() { + ArrayData::new_empty(&data_type) + } else { + arg.to_data() + }; + total_len += arg_data.len(); + data.push(arg_data); + } + + let mut offsets: Vec = Vec::with_capacity(total_len); + offsets.push(O::usize_as(0)); + + let capacity = Capacities::Array(total_len); + let data_ref = data.iter().collect::>(); + let mut mutable = MutableArrayData::with_capacities(data_ref, true, capacity); + + let num_rows = args[0].len(); + for row_idx in 0..num_rows { + for (arr_idx, arg) in args.iter().enumerate() { + if !arg.as_any().is::() + && !arg.is_null(row_idx) + && arg.is_valid(row_idx) + { + mutable.extend(arr_idx, row_idx, row_idx + 1); + } else { + mutable.extend_nulls(1); + } + } + offsets.push(O::usize_as(mutable.len())); + } + let data = mutable.freeze(); + + Ok(Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?)) +} + +/// Array_append SQL function +pub(crate) fn array_append(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return exec_err!("array_append expects two arguments"); + } + + match args[0].data_type() { + DataType::LargeList(_) => general_append_and_prepend::(args, true), + _ => general_append_and_prepend::(args, true), + } +} + +/// Array_prepend SQL function +pub fn array_prepend(args: &[ArrayRef]) -> Result { + if args.len() != 2 { + return exec_err!("array_prepend expects two arguments"); + } + + match args[1].data_type() { + DataType::LargeList(_) => general_append_and_prepend::(args, false), + _ => general_append_and_prepend::(args, false), + } +} + +/// Array_concat/Array_cat SQL function +pub fn array_concat(args: &[ArrayRef]) -> Result { + if 
args.is_empty() { + return exec_err!("array_concat expects at least one arguments"); + } + + let mut new_args = vec![]; + for arg in args { + let ndim = list_ndims(arg.data_type()); + let base_type = datafusion_common::utils::base_type(arg.data_type()); + if ndim == 0 { + return not_impl_err!("Array is not type '{base_type:?}'."); + } + if !base_type.eq(&DataType::Null) { + new_args.push(arg.clone()); + } + } + + match &args[0].data_type() { + DataType::LargeList(_) => concat_internal::(new_args.as_slice()), + _ => concat_internal::(new_args.as_slice()), + } +} + +/// `make_array` SQL function +pub fn make_array(arrays: &[ArrayRef]) -> Result { + let mut data_type = DataType::Null; + for arg in arrays { + let arg_data_type = arg.data_type(); + if !arg_data_type.equals_datatype(&DataType::Null) { + data_type = arg_data_type.clone(); + break; + } + } + + match data_type { + // Either an empty array or all nulls: + DataType::Null => { + let array = + new_null_array(&DataType::Null, arrays.iter().map(|a| a.len()).sum()); + Ok(Arc::new(array_into_list_array(array))) + } + DataType::LargeList(..) => array_array::(arrays, data_type), + _ => array_array::(arrays, data_type), + } +} + +fn concat_internal(args: &[ArrayRef]) -> Result { + let args = align_array_dimensions::(args.to_vec())?; + + let list_arrays = args + .iter() + .map(|arg| as_generic_list_array::(arg)) + .collect::>>()?; + // Assume number of rows is the same for all arrays + let row_count = list_arrays[0].len(); + + let mut array_lengths = vec![]; + let mut arrays = vec![]; + let mut valid = BooleanBufferBuilder::new(row_count); + for i in 0..row_count { + let nulls = list_arrays + .iter() + .map(|arr| arr.is_null(i)) + .collect::>(); + + // If all the arrays are null, the concatenated array is null + let is_null = nulls.iter().all(|&x| x); + if is_null { + array_lengths.push(0); + valid.append(false); + } else { + // Get all the arrays on i-th row + let values = list_arrays + .iter() + .map(|arr| arr.value(i)) + .collect::>(); + + let elements = values + .iter() + .map(|a| a.as_ref()) + .collect::>(); + + // Concatenated array on i-th row + let concated_array = compute::concat(elements.as_slice())?; + array_lengths.push(concated_array.len()); + arrays.push(concated_array); + valid.append(true); + } + } + // Assume all arrays have the same data type + let data_type = list_arrays[0].value_type(); + let buffer = valid.finish(); + + let elements = arrays + .iter() + .map(|a| a.as_ref()) + .collect::>(); + + let list_arr = GenericListArray::::new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::from_lengths(array_lengths), + Arc::new(compute::concat(elements.as_slice())?), + Some(NullBuffer::new(buffer)), + ); + + Ok(Arc::new(list_arr)) +} + +fn align_array_dimensions( + args: Vec, +) -> Result> { + let args_ndim = args + .iter() + .map(|arg| datafusion_common::utils::list_ndims(arg.data_type())) + .collect::>(); + let max_ndim = args_ndim.iter().max().unwrap_or(&0); + + // Align the dimensions of the arrays + let aligned_args: Result> = args + .into_iter() + .zip(args_ndim.iter()) + .map(|(array, ndim)| { + if ndim < max_ndim { + let mut aligned_array = array.clone(); + for _ in 0..(max_ndim - ndim) { + let data_type = aligned_array.data_type().to_owned(); + let array_lengths = vec![1; aligned_array.len()]; + let offsets = OffsetBuffer::::from_lengths(array_lengths); + + aligned_array = Arc::new(GenericListArray::::try_new( + Arc::new(Field::new("item", data_type, true)), + offsets, + aligned_array, + None, + )?) 
+ } + Ok(aligned_array) + } else { + Ok(array.clone()) + } + }) + .collect(); + + aligned_args +} + +pub(crate) fn make_scalar_function_with_hints(inner: F) -> ScalarFunctionImplementation +where + F: Fn(&[ArrayRef]) -> Result + Sync + Send + 'static, +{ + Arc::new(move |args: &[ColumnarValue]| { + // first, identify if any of the arguments is an Array. If yes, store its `len`, + // as any scalar will need to be converted to an array of len `len`. + let len = args + .iter() + .fold(Option::::None, |acc, arg| match arg { + ColumnarValue::Scalar(_) => acc, + ColumnarValue::Array(a) => Some(a.len()), + }); + + let is_scalar = len.is_none(); + + let args = ColumnarValue::values_to_arrays(args)?; + + let result = (inner)(&args); + + if is_scalar { + // If all inputs are scalar, keeps output as scalar + let result = result.and_then(|arr| ScalarValue::try_from_array(&arr, 0)); + result.map(ColumnarValue::Scalar) + } else { + result.map(ColumnarValue::Array) + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::Int64Type; + use arrow_array::ListArray; + + /// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt` + #[test] + fn test_align_array_dimensions() { + let array1d_1 = + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5)]), + ])); + let array1d_2 = + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(6), Some(7), Some(8)]), + ])); + + let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef; + let array2d_2 = Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef; + + let res = align_array_dimensions::(vec![ + array1d_1.to_owned(), + array2d_2.to_owned(), + ]) + .unwrap(); + + let expected = as_list_array(&array2d_1).unwrap(); + let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type()); + assert_ne!(as_list_array(&res[0]).unwrap(), expected); + assert_eq!( + datafusion_common::utils::list_ndims(res[0].data_type()), + expected_dim + ); + + let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef; + let array3d_2 = array_into_list_array(array2d_2.to_owned()); + let res = + align_array_dimensions::(vec![array1d_1, Arc::new(array3d_2.clone())]) + .unwrap(); + + let expected = as_list_array(&array3d_1).unwrap(); + let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type()); + assert_ne!(as_list_array(&res[0]).unwrap(), expected); + assert_eq!( + datafusion_common::utils::list_ndims(res[0].data_type()), + expected_dim + ); + } +} diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index e3515ccf9f72..5aaef52faf4d 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -27,6 +27,7 @@ #[macro_use] pub mod macros; +pub mod optimizer; mod kernels; mod udf; @@ -39,8 +40,12 @@ use std::sync::Arc; /// Fluent-style API for creating `Expr`s pub mod expr_fn { + pub use super::udf::array_append; + pub use super::udf::array_concat; + pub use super::udf::array_prepend; pub use super::udf::array_to_string; pub use super::udf::gen_series; + pub use super::udf::make_array; pub use super::udf::range; } @@ -48,6 +53,10 @@ pub mod expr_fn { pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { let functions: Vec> = vec![ udf::array_to_string_udf(), + udf::array_append_udf(), + udf::array_prepend_udf(), + udf::array_concat_udf(), + udf::make_array_udf(), udf::range_udf(), 
udf::gen_series_udf(), ]; diff --git a/datafusion/functions-array/src/macros.rs b/datafusion/functions-array/src/macros.rs index c503fde05b18..13a97f5bf3ab 100644 --- a/datafusion/functions-array/src/macros.rs +++ b/datafusion/functions-array/src/macros.rs @@ -56,6 +56,37 @@ macro_rules! make_udf_function { )) } + /// Singleton instance of [`$UDF`], ensures the UDF is only created once + /// named STATIC_$(UDF). For example `STATIC_ArrayToString` + #[allow(non_upper_case_globals)] + static [< STATIC_ $UDF >]: std::sync::OnceLock> = + std::sync::OnceLock::new(); + + /// ScalarFunction that returns a [`ScalarUDF`] for [`$UDF`] + /// + /// [`ScalarUDF`]: datafusion_expr::ScalarUDF + pub fn $SCALAR_UDF_FN() -> std::sync::Arc { + [< STATIC_ $UDF >] + .get_or_init(|| { + std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl( + <$UDF>::new(), + )) + }) + .clone() + } + } + }; + ($UDF:ty, $EXPR_FN:ident, $DOC:expr , $SCALAR_UDF_FN:ident) => { + paste::paste! { + // "fluent expr_fn" style function + #[doc = $DOC] + pub fn $EXPR_FN(arg: Vec) -> Expr { + Expr::ScalarFunction(ScalarFunction::new_udf( + $SCALAR_UDF_FN(), + arg, + )) + } + /// Singleton instance of [`$UDF`], ensures the UDF is only created once /// named STATIC_$(UDF). For example `STATIC_ArrayToString` #[allow(non_upper_case_globals)] diff --git a/datafusion/functions-array/src/optimizer/analyzer/mod.rs b/datafusion/functions-array/src/optimizer/analyzer/mod.rs new file mode 100644 index 000000000000..4ace2fa63664 --- /dev/null +++ b/datafusion/functions-array/src/optimizer/analyzer/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub mod rewrite_expr; diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/functions-array/src/optimizer/analyzer/rewrite_expr.rs similarity index 62% rename from datafusion/optimizer/src/analyzer/rewrite_expr.rs rename to datafusion/functions-array/src/optimizer/analyzer/rewrite_expr.rs index eedfc40a7f80..1bc3661285d6 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/functions-array/src/optimizer/analyzer/rewrite_expr.rs @@ -28,12 +28,14 @@ use datafusion_common::Result; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::merge_schema; -use datafusion_expr::BuiltinScalarFunction; use datafusion_expr::Operator; use datafusion_expr::ScalarFunctionDefinition; use datafusion_expr::{BinaryExpr, Expr, LogicalPlan}; +use datafusion_optimizer::analyzer::AnalyzerRule; -use super::AnalyzerRule; +use crate::udf::array_append; +use crate::udf::array_concat; +use crate::udf::array_prepend; #[derive(Default)] pub struct OperatorToFunction {} @@ -103,7 +105,7 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { op, ref right, }) => { - if let Some(fun) = rewrite_array_concat_operator_to_func_for_column( + if let Some(expr) = rewrite_array_concat_operator_to_func_for_column( left.as_ref(), op, right.as_ref(), @@ -116,13 +118,7 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { right.as_ref(), ) }) { - // Convert &Box -> Expr - let left = (**left).clone(); - let right = (**right).clone(); - return Ok(Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::BuiltIn(fun), - args: vec![left, right], - })); + return Ok(expr); } Ok(expr) @@ -147,7 +143,7 @@ fn rewrite_array_concat_operator_to_func( left: &Expr, op: Operator, right: &Expr, -) -> Option { +) -> Option { // Convert `Array StringConcat Array` to ScalarFunction::ArrayConcat if op != Operator::StringConcat { @@ -159,97 +155,65 @@ fn rewrite_array_concat_operator_to_func( // (arry concat, array append, array prepend) || array -> array concat ( Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat), + func_def: ScalarFunctionDefinition::UDF(left_fun), args: _left_args, }), Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), + func_def: ScalarFunctionDefinition::UDF(right_fun), args: _right_args, }), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend), - args: _left_args, - }), - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), - args: _right_args, - }), - ) => Some(BuiltinScalarFunction::ArrayConcat), + ) if ["array_append", "array_prepend", "array_concat"] + .contains(&left_fun.name()) + && right_fun.name() == "make_array" => + { + Some(array_concat(vec![left.clone(), right.clone()])) + } // Chain concat operator (a || b) || scalar, // (arry concat, array append, array prepend) || scalar -> array append ( Expr::ScalarFunction(ScalarFunction { - func_def: - 
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat), - args: _left_args, - }), - _scalar, - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend), + func_def: ScalarFunctionDefinition::UDF(left_fun), args: _left_args, }), _scalar, - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend), - args: _left_args, - }), - _scalar, - ) => Some(BuiltinScalarFunction::ArrayAppend), + ) if ["array_append", "array_prepend", "array_concat"] + .contains(&left_fun.name()) => + { + Some(array_append(left.clone(), right.clone())) + } // array || array -> array concat ( Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), + func_def: ScalarFunctionDefinition::UDF(left_fun), args: _left_args, }), Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), + func_def: ScalarFunctionDefinition::UDF(right_fun), args: _right_args, }), - ) => Some(BuiltinScalarFunction::ArrayConcat), + ) if left_fun.name() == "make_array" && right_fun.name() == "make_array" => { + Some(array_concat(vec![left.clone(), right.clone()])) + } // array || scalar -> array append ( Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), + func_def: ScalarFunctionDefinition::UDF(left_fun), args: _left_args, }), _right_scalar, - ) => Some(BuiltinScalarFunction::ArrayAppend), + ) if left_fun.name() == "make_array" => { + Some(array_append(left.clone(), right.clone())) + } // scalar || array -> array prepend ( _left_scalar, Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::MakeArray), + func_def: ScalarFunctionDefinition::UDF(right_fun), args: _right_args, }), - ) => Some(BuiltinScalarFunction::ArrayPrepend), + ) if right_fun.name() == "make_array" => { + Some(array_prepend(left.clone(), right.clone())) + } _ => None, } @@ -265,7 +229,7 @@ fn rewrite_array_concat_operator_to_func_for_column( op: Operator, right: &Expr, schema: &DFSchema, -) -> Result> { +) -> Result> { if op != Operator::StringConcat { return Ok(None); } @@ -275,33 +239,18 @@ fn rewrite_array_concat_operator_to_func_for_column( // 1) array_prepend/append/concat || column ( Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayPrepend), - args: _left_args, - }), - Expr::Column(c), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayAppend), - args: _left_args, - }), - Expr::Column(c), - ) - | ( - Expr::ScalarFunction(ScalarFunction { - func_def: - ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::ArrayConcat), + func_def: ScalarFunctionDefinition::UDF(left_fun), args: _left_args, }), Expr::Column(c), - ) => { + ) if ["array_append", "array_prepend", "array_concat"] + .contains(&left_fun.name()) => + { let d = schema.field_from_column(c)?.data_type(); let ndim = list_ndims(d); match ndim { - 0 => Ok(Some(BuiltinScalarFunction::ArrayAppend)), - _ => Ok(Some(BuiltinScalarFunction::ArrayConcat)), + 0 => Ok(Some(array_append(left.clone(), right.clone()))), + _ => Ok(Some(array_concat(vec![left.clone(), right.clone()]))), } } // 2) select column1 || column2 @@ -311,9 +260,9 @@ fn 
rewrite_array_concat_operator_to_func_for_column( let ndim1 = list_ndims(d1); let ndim2 = list_ndims(d2); match (ndim1, ndim2) { - (0, _) => Ok(Some(BuiltinScalarFunction::ArrayPrepend)), - (_, 0) => Ok(Some(BuiltinScalarFunction::ArrayAppend)), - _ => Ok(Some(BuiltinScalarFunction::ArrayConcat)), + (0, _) => Ok(Some(array_prepend(left.clone(), right.clone()))), + (_, 0) => Ok(Some(array_append(left.clone(), right.clone()))), + _ => Ok(Some(array_concat(vec![left.clone(), right.clone()]))), } } _ => Ok(None), diff --git a/datafusion/functions-array/src/optimizer/mod.rs b/datafusion/functions-array/src/optimizer/mod.rs new file mode 100644 index 000000000000..72d6f76eec7a --- /dev/null +++ b/datafusion/functions-array/src/optimizer/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod analyzer; diff --git a/datafusion/functions-array/src/udf.rs b/datafusion/functions-array/src/udf.rs index 17769419c0b2..97463a2639d1 100644 --- a/datafusion/functions-array/src/udf.rs +++ b/datafusion/functions-array/src/udf.rs @@ -18,21 +18,77 @@ //! [`ScalarUDFImpl`] definitions for array functions. 
use arrow::datatypes::DataType; -use arrow::datatypes::Field; +use arrow_schema::Field; +use datafusion_common::utils::list_ndims; +// use arrow::datatypes::Field; use datafusion_common::plan_err; use datafusion_expr::expr::ScalarFunction; +use datafusion_expr::type_coercion::binary::get_wider_type; use datafusion_expr::Expr; use datafusion_expr::TypeSignature::Exact; +use datafusion_expr::TypeSignature::{Any as expr_Any, VariadicEqual}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; use std::any::Any; +use std::cmp::Ordering; use std::sync::Arc; + +use crate::kernels::make_scalar_function_with_hints; + // Create static instances of ScalarUDFs for each function -make_udf_function!(ArrayToString, +make_udf_function!( + ArrayToString, array_to_string, - array delimiter, // arg name + array delimiter, // arg name "converts each element to its text representation.", // doc - array_to_string_udf // internal function name + array_to_string_udf // internal function name ); + +make_udf_function!( + ArrayAppend, + array_append, + array element, // arg name + "appends an element to the end of an array.", // doc + array_append_udf // internal function name +); + +make_udf_function!( + ArrayPrepend, + array_prepend, + element array, // arg name + "Prepends an element to the beginning of an array.", // doc + array_prepend_udf // internal function name +); + +make_udf_function!( + ArrayConcat, + array_concat, + "Concatenates arrays.", // doc + array_concat_udf // internal function name +); + +make_udf_function!( + MakeArray, + make_array, + "Returns an Arrow array using the specified input expressions.", // doc + make_array_udf // internal function name +); + +make_udf_function!( + Range, + range, + start stop step, + "create a list of values in the range between start and stop", + range_udf +); + +make_udf_function!( + GenSeries, + gen_series, + start stop step, + "create a list of values in the range between start and stop, include upper bound", + gen_series_udf +); + #[derive(Debug)] pub(super) struct ArrayToString { signature: Signature, @@ -76,8 +132,233 @@ impl ScalarUDFImpl for ArrayToString { } fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { - let args = ColumnarValue::values_to_arrays(args)?; - crate::kernels::array_to_string(&args).map(ColumnarValue::Array) + make_scalar_function_with_hints(crate::kernels::array_to_string)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +#[derive(Debug)] +pub(super) struct ArrayAppend { + signature: Signature, + aliases: Vec, +} + +impl ArrayAppend { + pub fn new() -> Self { + Self { + signature: Signature::array_and_element(Volatility::Immutable), + aliases: vec![ + String::from("array_append"), + String::from("list_append"), + String::from("array_push_back"), + String::from("list_push_back"), + ], + } + } +} + +impl ScalarUDFImpl for ArrayAppend { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "array_append" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + Ok(arg_types[0].clone()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + make_scalar_function_with_hints(crate::kernels::array_append)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +#[derive(Debug)] +pub(super) struct ArrayPrepend { + signature: Signature, + aliases: Vec, +} + +impl ArrayPrepend { + pub fn new() -> Self { + Self { + signature: 
Signature::element_and_array(Volatility::Immutable), + aliases: vec![ + String::from("array_prepend"), + String::from("list_prepend"), + String::from("array_push_front"), + String::from("list_push_front"), + ], + } + } +} + +impl ScalarUDFImpl for ArrayPrepend { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "array_prepend" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + Ok(arg_types[1].clone()) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + make_scalar_function_with_hints(crate::kernels::array_prepend)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +#[derive(Debug)] +pub(super) struct ArrayConcat { + signature: Signature, + aliases: Vec, +} + +impl ArrayConcat { + pub fn new() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: vec![ + String::from("array_concat"), + String::from("array_cat"), + String::from("list_concat"), + String::from("list_cat"), + ], + } + } +} + +impl ScalarUDFImpl for ArrayConcat { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "array_concat" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + let mut expr_type = DataType::Null; + let mut max_dims = 0; + for arg_type in arg_types { + match arg_type { + DataType::List(field) => { + if !field.data_type().equals_datatype(&DataType::Null) { + let dims = list_ndims(arg_type); + expr_type = match max_dims.cmp(&dims) { + Ordering::Greater => expr_type, + Ordering::Equal => get_wider_type(&expr_type, arg_type)?, + Ordering::Less => { + max_dims = dims; + arg_type.clone() + } + }; + } + } + _ => { + return plan_err!( + "The array_concat function can only accept list as the args." 
+ ) + } + } + } + + Ok(expr_type) + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + make_scalar_function_with_hints(crate::kernels::array_concat)(args) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +#[derive(Debug)] +pub struct MakeArray { + signature: Signature, + aliases: Vec, +} + +impl MakeArray { + pub fn new() -> Self { + Self { + signature: Signature::one_of( + vec![VariadicEqual, expr_Any(0)], + Volatility::Immutable, + ), + aliases: vec![String::from("make_array"), String::from("make_list")], + } + } +} + +impl ScalarUDFImpl for MakeArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "make_array" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result { + match arg_types.len() { + 0 => Ok(DataType::List(Arc::new(Field::new( + "item", + DataType::Null, + true, + )))), + _ => { + let mut expr_type = DataType::Null; + for arg_type in arg_types { + if !arg_type.equals_datatype(&DataType::Null) { + expr_type = arg_type.clone(); + break; + } + } + + Ok(DataType::List(Arc::new(Field::new( + "item", expr_type, true, + )))) + } + } + } + + fn invoke(&self, args: &[ColumnarValue]) -> datafusion_common::Result { + make_scalar_function_with_hints(crate::kernels::make_array)(args) } fn aliases(&self) -> &[String] { @@ -85,13 +366,6 @@ impl ScalarUDFImpl for ArrayToString { } } -make_udf_function!( - Range, - range, - start stop step, - "create a list of values in the range between start and stop", - range_udf -); #[derive(Debug)] pub(super) struct Range { signature: Signature, @@ -144,13 +418,6 @@ impl ScalarUDFImpl for Range { } } -make_udf_function!( - GenSeries, - gen_series, - start stop step, - "create a list of values in the range between start and stop, include upper bound", - gen_series_udf -); #[derive(Debug)] pub(super) struct GenSeries { signature: Signature, diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index 9d47299a5616..14d5ddf47378 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -17,7 +17,6 @@ pub mod count_wildcard_rule; pub mod inline_table_scan; -pub mod rewrite_expr; pub mod subquery; pub mod type_coercion; @@ -38,8 +37,6 @@ use log::debug; use std::sync::Arc; use std::time::Instant; -use self::rewrite_expr::OperatorToFunction; - /// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make /// the plan valid prior to the rest of the DataFusion optimization process. /// @@ -75,9 +72,6 @@ impl Analyzer { pub fn new() -> Self { let rules: Vec> = vec![ Arc::new(InlineTableScan::new()), - // OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar), - // and TypeCoercion may cast the argument types from Scalar to List. 
- Arc::new(OperatorToFunction::new()), Arc::new(TypeCoercion::new()), Arc::new(CountWildcardRule::new()), ]; diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 3a43e3cd7c20..c4b01f70cfe5 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -45,8 +45,8 @@ use datafusion_expr::type_coercion::{is_datetime, is_utf8_or_large_utf8}; use datafusion_expr::utils::merge_schema; use datafusion_expr::{ is_false, is_not_false, is_not_true, is_not_unknown, is_true, is_unknown, not, - type_coercion, AggregateFunction, BuiltinScalarFunction, Expr, ExprSchemable, - LogicalPlan, Operator, Projection, ScalarFunctionDefinition, Signature, WindowFrame, + type_coercion, AggregateFunction, Expr, ExprSchemable, LogicalPlan, Operator, + Projection, ScalarFunctionDefinition, ScalarUDF, Signature, WindowFrame, WindowFrameBound, WindowFrameUnits, }; @@ -333,11 +333,6 @@ impl TreeNodeRewriter for TypeCoercionRewriter { &self.schema, &fun.signature(), )?; - let new_args = coerce_arguments_for_fun( - new_args.as_slice(), - &self.schema, - &fun, - )?; Ok(Expr::ScalarFunction(ScalarFunction::new(fun, new_args))) } ScalarFunctionDefinition::UDF(fun) => { @@ -346,6 +341,11 @@ impl TreeNodeRewriter for TypeCoercionRewriter { &self.schema, fun.signature(), )?; + let new_expr = coerce_arguments_for_fun( + new_expr.as_slice(), + &self.schema, + &fun, + )?; Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, new_expr))) } ScalarFunctionDefinition::Name(_) => { @@ -589,7 +589,7 @@ fn coerce_arguments_for_signature( fn coerce_arguments_for_fun( expressions: &[Expr], schema: &DFSchema, - fun: &BuiltinScalarFunction, + fun: &Arc, ) -> Result> { if expressions.is_empty() { return Ok(vec![]); @@ -597,7 +597,7 @@ fn coerce_arguments_for_fun( let mut expressions: Vec = expressions.to_vec(); // Cast Fixedsizelist to List for array functions - if *fun == BuiltinScalarFunction::MakeArray { + if fun.name() == "make_array" { expressions = expressions .into_iter() .map(|expr| { @@ -764,10 +764,8 @@ mod test { use std::any::Any; use std::sync::{Arc, OnceLock}; - use arrow::array::{FixedSizeListArray, Int32Array}; use arrow::datatypes::{DataType, TimeUnit}; - use arrow::datatypes::Field; use datafusion_common::tree_node::TreeNode; use datafusion_common::{DFField, DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; @@ -785,7 +783,7 @@ mod test { use datafusion_physical_expr::expressions::AvgAccumulator; use crate::analyzer::type_coercion::{ - cast_expr, coerce_case_expression, TypeCoercion, TypeCoercionRewriter, + coerce_case_expression, TypeCoercion, TypeCoercionRewriter, }; use crate::test::assert_analyzed_plan_eq; @@ -1265,57 +1263,6 @@ mod test { Ok(()) } - #[test] - fn test_casting_for_fixed_size_list() -> Result<()> { - let val = lit(ScalarValue::FixedSizeList(Arc::new( - FixedSizeListArray::new( - Arc::new(Field::new("item", DataType::Int32, true)), - 3, - Arc::new(Int32Array::from(vec![1, 2, 3])), - None, - ), - ))); - let expr = Expr::ScalarFunction(ScalarFunction::new( - BuiltinScalarFunction::MakeArray, - vec![val.clone()], - )); - let schema = Arc::new(DFSchema::new_with_metadata( - vec![DFField::new_unqualified( - "item", - DataType::FixedSizeList( - Arc::new(Field::new("a", DataType::Int32, true)), - 3, - ), - true, - )], - std::collections::HashMap::new(), - )?); - let mut rewriter = TypeCoercionRewriter { schema }; - let result 
= expr.rewrite(&mut rewriter)?; - - let schema = Arc::new(DFSchema::new_with_metadata( - vec![DFField::new_unqualified( - "item", - DataType::List(Arc::new(Field::new("a", DataType::Int32, true))), - true, - )], - std::collections::HashMap::new(), - )?); - let expected_casted_expr = cast_expr( - &val, - &DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), - &schema, - )?; - - let expected = Expr::ScalarFunction(ScalarFunction::new( - BuiltinScalarFunction::MakeArray, - vec![expected_casted_expr], - )); - - assert_eq!(result, expected); - Ok(()) - } - #[test] fn test_type_coercion_rewrite() -> Result<()> { // gt diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 01b2ae13c8d4..c18b42b63cfc 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -34,10 +34,10 @@ use datafusion_common::cast::{ as_generic_list_array, as_generic_string_array, as_int64_array, as_large_list_array, as_list_array, as_null_array, as_string_array, }; -use datafusion_common::utils::{array_into_list_array, list_ndims}; +use datafusion_common::utils::array_into_list_array; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, - DataFusionError, Result, ScalarValue, + exec_err, internal_datafusion_err, internal_err, plan_err, DataFusionError, Result, + ScalarValue, }; use itertools::Itertools; @@ -820,72 +820,6 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result { } } -/// Appends or prepends elements to a ListArray. -/// -/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag -/// indicating whether to append or prepend the elements. It returns a `Result` -/// representing the resulting ListArray after the operation. -/// -/// # Arguments -/// -/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended. -/// * `element_array` - A reference to the Array containing elements to be appended/prepended. -/// * `field` - A reference to the Field describing the data type of the arrays. -/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements. 
-/// -/// # Examples -/// -/// generic_append_and_prepend( -/// [1, 2, 3], 4, append => [1, 2, 3, 4] -/// 5, [6, 7, 8], prepend => [5, 6, 7, 8] -/// ) -fn generic_append_and_prepend( - list_array: &GenericListArray, - element_array: &ArrayRef, - data_type: &DataType, - is_append: bool, -) -> Result -where - i64: TryInto, -{ - let mut offsets = vec![O::usize_as(0)]; - let values = list_array.values(); - let original_data = values.to_data(); - let element_data = element_array.to_data(); - let capacity = Capacities::Array(original_data.len() + element_data.len()); - - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data, &element_data], - false, - capacity, - ); - - let values_index = 0; - let element_index = 1; - - for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { - let start = offset_window[0].to_usize().unwrap(); - let end = offset_window[1].to_usize().unwrap(); - if is_append { - mutable.extend(values_index, start, end); - mutable.extend(element_index, row_index, row_index + 1); - } else { - mutable.extend(element_index, row_index, row_index + 1); - mutable.extend(values_index, start, end); - } - offsets.push(offsets[row_index] + O::usize_as(end - start + 1)); - } - - let data = mutable.freeze(); - - Ok(Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", data_type.to_owned(), true)), - OffsetBuffer::new(offsets.into()), - arrow_array::make_array(data), - None, - )?)) -} - /// Array_sort SQL function pub fn array_sort(args: &[ArrayRef]) -> Result { if args.is_empty() || args.len() > 3 { @@ -969,189 +903,6 @@ fn order_nulls_first(modifier: &str) -> Result { } } -fn general_append_and_prepend( - args: &[ArrayRef], - is_append: bool, -) -> Result -where - i64: TryInto, -{ - let (list_array, element_array) = if is_append { - let list_array = as_generic_list_array::(&args[0])?; - let element_array = &args[1]; - check_datatypes("array_append", &[element_array, list_array.values()])?; - (list_array, element_array) - } else { - let list_array = as_generic_list_array::(&args[1])?; - let element_array = &args[0]; - check_datatypes("array_prepend", &[list_array.values(), element_array])?; - (list_array, element_array) - }; - - let res = match list_array.value_type() { - DataType::List(_) => concat_internal::(args)?, - DataType::LargeList(_) => concat_internal::(args)?, - data_type => { - return generic_append_and_prepend::( - list_array, - element_array, - &data_type, - is_append, - ); - } - }; - - Ok(res) -} - -/// Array_append SQL function -pub fn array_append(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return exec_err!("array_append expects two arguments"); - } - - match args[0].data_type() { - DataType::LargeList(_) => general_append_and_prepend::(args, true), - _ => general_append_and_prepend::(args, true), - } -} - -/// Array_prepend SQL function -pub fn array_prepend(args: &[ArrayRef]) -> Result { - if args.len() != 2 { - return exec_err!("array_prepend expects two arguments"); - } - - match args[1].data_type() { - DataType::LargeList(_) => general_append_and_prepend::(args, false), - _ => general_append_and_prepend::(args, false), - } -} - -fn align_array_dimensions( - args: Vec, -) -> Result> { - let args_ndim = args - .iter() - .map(|arg| datafusion_common::utils::list_ndims(arg.data_type())) - .collect::>(); - let max_ndim = args_ndim.iter().max().unwrap_or(&0); - - // Align the dimensions of the arrays - let aligned_args: Result> = args - .into_iter() - .zip(args_ndim.iter()) - .map(|(array, ndim)| { - if ndim 
< max_ndim { - let mut aligned_array = array.clone(); - for _ in 0..(max_ndim - ndim) { - let data_type = aligned_array.data_type().to_owned(); - let array_lengths = vec![1; aligned_array.len()]; - let offsets = OffsetBuffer::::from_lengths(array_lengths); - - aligned_array = Arc::new(GenericListArray::::try_new( - Arc::new(Field::new("item", data_type, true)), - offsets, - aligned_array, - None, - )?) - } - Ok(aligned_array) - } else { - Ok(array.clone()) - } - }) - .collect(); - - aligned_args -} - -// Concatenate arrays on the same row. -fn concat_internal(args: &[ArrayRef]) -> Result { - let args = align_array_dimensions::(args.to_vec())?; - - let list_arrays = args - .iter() - .map(|arg| as_generic_list_array::(arg)) - .collect::>>()?; - // Assume number of rows is the same for all arrays - let row_count = list_arrays[0].len(); - - let mut array_lengths = vec![]; - let mut arrays = vec![]; - let mut valid = BooleanBufferBuilder::new(row_count); - for i in 0..row_count { - let nulls = list_arrays - .iter() - .map(|arr| arr.is_null(i)) - .collect::>(); - - // If all the arrays are null, the concatenated array is null - let is_null = nulls.iter().all(|&x| x); - if is_null { - array_lengths.push(0); - valid.append(false); - } else { - // Get all the arrays on i-th row - let values = list_arrays - .iter() - .map(|arr| arr.value(i)) - .collect::>(); - - let elements = values - .iter() - .map(|a| a.as_ref()) - .collect::>(); - - // Concatenated array on i-th row - let concated_array = compute::concat(elements.as_slice())?; - array_lengths.push(concated_array.len()); - arrays.push(concated_array); - valid.append(true); - } - } - // Assume all arrays have the same data type - let data_type = list_arrays[0].value_type(); - let buffer = valid.finish(); - - let elements = arrays - .iter() - .map(|a| a.as_ref()) - .collect::>(); - - let list_arr = GenericListArray::::new( - Arc::new(Field::new("item", data_type, true)), - OffsetBuffer::from_lengths(array_lengths), - Arc::new(compute::concat(elements.as_slice())?), - Some(NullBuffer::new(buffer)), - ); - - Ok(Arc::new(list_arr)) -} - -/// Array_concat/Array_cat SQL function -pub fn array_concat(args: &[ArrayRef]) -> Result { - if args.is_empty() { - return exec_err!("array_concat expects at least one arguments"); - } - - let mut new_args = vec![]; - for arg in args { - let ndim = list_ndims(arg.data_type()); - let base_type = datafusion_common::utils::base_type(arg.data_type()); - if ndim == 0 { - return not_impl_err!("Array is not type '{base_type:?}'."); - } else if !base_type.eq(&DataType::Null) { - new_args.push(arg.clone()); - } - } - - match &args[0].data_type() { - DataType::LargeList(_) => concat_internal::(new_args.as_slice()), - _ => concat_internal::(new_args.as_slice()), - } -} - /// Array_empty SQL function pub fn array_empty(args: &[ArrayRef]) -> Result { if args.len() != 1 { @@ -2580,54 +2331,3 @@ where Some(nulls.into()), )?)) } - -#[cfg(test)] -mod tests { - use super::*; - use arrow::datatypes::Int64Type; - - /// Only test internal functions, array-related sql functions will be tested in sqllogictest `array.slt` - #[test] - fn test_align_array_dimensions() { - let array1d_1 = - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1), Some(2), Some(3)]), - Some(vec![Some(4), Some(5)]), - ])); - let array1d_2 = - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(6), Some(7), Some(8)]), - ])); - - let array2d_1 = Arc::new(array_into_list_array(array1d_1.clone())) as ArrayRef; - let array2d_2 = 
Arc::new(array_into_list_array(array1d_2.clone())) as ArrayRef; - - let res = align_array_dimensions::(vec![ - array1d_1.to_owned(), - array2d_2.to_owned(), - ]) - .unwrap(); - - let expected = as_list_array(&array2d_1).unwrap(); - let expected_dim = datafusion_common::utils::list_ndims(array2d_1.data_type()); - assert_ne!(as_list_array(&res[0]).unwrap(), expected); - assert_eq!( - datafusion_common::utils::list_ndims(res[0].data_type()), - expected_dim - ); - - let array3d_1 = Arc::new(array_into_list_array(array2d_1)) as ArrayRef; - let array3d_2 = array_into_list_array(array2d_2.to_owned()); - let res = - align_array_dimensions::(vec![array1d_1, Arc::new(array3d_2.clone())]) - .unwrap(); - - let expected = as_list_array(&array3d_1).unwrap(); - let expected_dim = datafusion_common::utils::list_ndims(array3d_1.data_type()); - assert_ne!(as_list_array(&res[0]).unwrap(), expected); - assert_eq!( - datafusion_common::utils::list_ndims(res[0].data_type()), - expected_dim - ); - } -} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 2552381a79b0..0fdbe2c34877 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -318,15 +318,9 @@ pub fn create_physical_fun( } // array functions - BuiltinScalarFunction::ArrayAppend => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_append)(args) - }), BuiltinScalarFunction::ArraySort => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_sort)(args) }), - BuiltinScalarFunction::ArrayConcat => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_concat)(args) - }), BuiltinScalarFunction::ArrayEmpty => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_empty)(args) }), @@ -372,9 +366,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayPositions => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_positions)(args) }), - BuiltinScalarFunction::ArrayPrepend => Arc::new(|args| { - make_scalar_function_inner(array_expressions::array_prepend)(args) - }), BuiltinScalarFunction::ArrayRepeat => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_repeat)(args) }), @@ -411,9 +402,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayResize => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_resize)(args) }), - BuiltinScalarFunction::MakeArray => Arc::new(|args| { - make_scalar_function_inner(array_expressions::make_array)(args) - }), BuiltinScalarFunction::ArrayUnion => Arc::new(|args| { make_scalar_function_inner(array_expressions::array_union)(args) }), diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index bfe0fdb279f5..1c9f0e609c3c 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -153,14 +153,15 @@ impl PhysicalExpr for ScalarFunctionExpr { if scalar_fun .signature() .type_signature - .supports_zero_argument() - && scalar_fun != BuiltinScalarFunction::MakeArray => + .supports_zero_argument() => { vec![ColumnarValue::create_null_array(batch.num_rows())] } // If the function supports zero argument, we pass in a null array indicating the batch size. // This is for user-defined functions. 
- (true, Err(_)) if self.supports_zero_argument => { + (true, Err(_)) + if self.supports_zero_argument && self.name != "make_array" => + { vec![ColumnarValue::create_null_array(batch.num_rows())] } _ => self diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index a4a06bab854c..08db7016d0ec 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -567,7 +567,7 @@ enum ScalarFunction { Sqrt = 17; Tan = 18; Trunc = 19; - Array = 20; + // 20 was Array // RegexpMatch = 21; BitLength = 22; Btrim = 23; @@ -633,15 +633,15 @@ enum ScalarFunction { Factorial = 83; Lcm = 84; Gcd = 85; - ArrayAppend = 86; - ArrayConcat = 87; + // 86 was ArrayAppend + // 87 was ArrayConcat ArrayDims = 88; ArrayRepeat = 89; ArrayLength = 90; ArrayNdims = 91; ArrayPosition = 92; ArrayPositions = 93; - ArrayPrepend = 94; + // 94 was ArrayPrepend ArrayRemove = 95; ArrayReplace = 96; // 97 was ArrayToString diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 443597bebc20..5e823d846290 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22339,7 +22339,6 @@ impl serde::Serialize for ScalarFunction { Self::Sqrt => "Sqrt", Self::Tan => "Tan", Self::Trunc => "Trunc", - Self::Array => "Array", Self::BitLength => "BitLength", Self::Btrim => "Btrim", Self::CharacterLength => "CharacterLength", @@ -22403,15 +22402,12 @@ impl serde::Serialize for ScalarFunction { Self::Factorial => "Factorial", Self::Lcm => "Lcm", Self::Gcd => "Gcd", - Self::ArrayAppend => "ArrayAppend", - Self::ArrayConcat => "ArrayConcat", Self::ArrayDims => "ArrayDims", Self::ArrayRepeat => "ArrayRepeat", Self::ArrayLength => "ArrayLength", Self::ArrayNdims => "ArrayNdims", Self::ArrayPosition => "ArrayPosition", Self::ArrayPositions => "ArrayPositions", - Self::ArrayPrepend => "ArrayPrepend", Self::ArrayRemove => "ArrayRemove", Self::ArrayReplace => "ArrayReplace", Self::Cardinality => "Cardinality", @@ -22478,7 +22474,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Sqrt", "Tan", "Trunc", - "Array", "BitLength", "Btrim", "CharacterLength", @@ -22542,15 +22537,12 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Factorial", "Lcm", "Gcd", - "ArrayAppend", - "ArrayConcat", "ArrayDims", "ArrayRepeat", "ArrayLength", "ArrayNdims", "ArrayPosition", "ArrayPositions", - "ArrayPrepend", "ArrayRemove", "ArrayReplace", "Cardinality", @@ -22646,7 +22638,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Sqrt" => Ok(ScalarFunction::Sqrt), "Tan" => Ok(ScalarFunction::Tan), "Trunc" => Ok(ScalarFunction::Trunc), - "Array" => Ok(ScalarFunction::Array), "BitLength" => Ok(ScalarFunction::BitLength), "Btrim" => Ok(ScalarFunction::Btrim), "CharacterLength" => Ok(ScalarFunction::CharacterLength), @@ -22710,15 +22701,12 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Factorial" => Ok(ScalarFunction::Factorial), "Lcm" => Ok(ScalarFunction::Lcm), "Gcd" => Ok(ScalarFunction::Gcd), - "ArrayAppend" => Ok(ScalarFunction::ArrayAppend), - "ArrayConcat" => Ok(ScalarFunction::ArrayConcat), "ArrayDims" => Ok(ScalarFunction::ArrayDims), "ArrayRepeat" => Ok(ScalarFunction::ArrayRepeat), "ArrayLength" => Ok(ScalarFunction::ArrayLength), "ArrayNdims" => Ok(ScalarFunction::ArrayNdims), "ArrayPosition" => Ok(ScalarFunction::ArrayPosition), "ArrayPositions" => Ok(ScalarFunction::ArrayPositions), - "ArrayPrepend" => Ok(ScalarFunction::ArrayPrepend), "ArrayRemove" => 
Ok(ScalarFunction::ArrayRemove), "ArrayReplace" => Ok(ScalarFunction::ArrayReplace), "Cardinality" => Ok(ScalarFunction::Cardinality), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c0d234443c94..f2d39cea0aa2 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2655,7 +2655,7 @@ pub enum ScalarFunction { Sqrt = 17, Tan = 18, Trunc = 19, - Array = 20, + /// 20 was Array /// RegexpMatch = 21; BitLength = 22, Btrim = 23, @@ -2721,15 +2721,15 @@ pub enum ScalarFunction { Factorial = 83, Lcm = 84, Gcd = 85, - ArrayAppend = 86, - ArrayConcat = 87, + /// 86 was ArrayAppend + /// 87 was ArrayConcat ArrayDims = 88, ArrayRepeat = 89, ArrayLength = 90, ArrayNdims = 91, ArrayPosition = 92, ArrayPositions = 93, - ArrayPrepend = 94, + /// 94 was ArrayPrepend ArrayRemove = 95, ArrayReplace = 96, /// 97 was ArrayToString @@ -2796,7 +2796,6 @@ impl ScalarFunction { ScalarFunction::Sqrt => "Sqrt", ScalarFunction::Tan => "Tan", ScalarFunction::Trunc => "Trunc", - ScalarFunction::Array => "Array", ScalarFunction::BitLength => "BitLength", ScalarFunction::Btrim => "Btrim", ScalarFunction::CharacterLength => "CharacterLength", @@ -2860,15 +2859,12 @@ impl ScalarFunction { ScalarFunction::Factorial => "Factorial", ScalarFunction::Lcm => "Lcm", ScalarFunction::Gcd => "Gcd", - ScalarFunction::ArrayAppend => "ArrayAppend", - ScalarFunction::ArrayConcat => "ArrayConcat", ScalarFunction::ArrayDims => "ArrayDims", ScalarFunction::ArrayRepeat => "ArrayRepeat", ScalarFunction::ArrayLength => "ArrayLength", ScalarFunction::ArrayNdims => "ArrayNdims", ScalarFunction::ArrayPosition => "ArrayPosition", ScalarFunction::ArrayPositions => "ArrayPositions", - ScalarFunction::ArrayPrepend => "ArrayPrepend", ScalarFunction::ArrayRemove => "ArrayRemove", ScalarFunction::ArrayReplace => "ArrayReplace", ScalarFunction::Cardinality => "Cardinality", @@ -2929,7 +2925,6 @@ impl ScalarFunction { "Sqrt" => Some(Self::Sqrt), "Tan" => Some(Self::Tan), "Trunc" => Some(Self::Trunc), - "Array" => Some(Self::Array), "BitLength" => Some(Self::BitLength), "Btrim" => Some(Self::Btrim), "CharacterLength" => Some(Self::CharacterLength), @@ -2993,15 +2988,12 @@ impl ScalarFunction { "Factorial" => Some(Self::Factorial), "Lcm" => Some(Self::Lcm), "Gcd" => Some(Self::Gcd), - "ArrayAppend" => Some(Self::ArrayAppend), - "ArrayConcat" => Some(Self::ArrayConcat), "ArrayDims" => Some(Self::ArrayDims), "ArrayRepeat" => Some(Self::ArrayRepeat), "ArrayLength" => Some(Self::ArrayLength), "ArrayNdims" => Some(Self::ArrayNdims), "ArrayPosition" => Some(Self::ArrayPosition), "ArrayPositions" => Some(Self::ArrayPositions), - "ArrayPrepend" => Some(Self::ArrayPrepend), "ArrayRemove" => Some(Self::ArrayRemove), "ArrayReplace" => Some(Self::ArrayReplace), "Cardinality" => Some(Self::Cardinality), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index c89b3d1ed0f2..72086987308c 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -47,15 +47,15 @@ use datafusion_common::{ use datafusion_expr::expr::Unnest; use datafusion_expr::window_frame::{check_window_frame, regularize_window_order_by}; use datafusion_expr::{ - acosh, array, array_append, array_concat, array_dims, array_distinct, array_element, - array_empty, array_except, array_has, array_has_all, array_has_any, array_intersect, - array_length, array_ndims, array_pop_back, 
array_pop_front, array_position, - array_positions, array_prepend, array_remove, array_remove_all, array_remove_n, - array_repeat, array_replace, array_replace_all, array_replace_n, array_resize, - array_slice, array_sort, array_union, arrow_typeof, ascii, asinh, atan, atan2, atanh, - bit_length, btrim, cardinality, cbrt, ceil, character_length, chr, coalesce, - concat_expr, concat_ws_expr, cos, cosh, cot, current_date, current_time, date_bin, - date_part, date_trunc, degrees, digest, ends_with, exp, + acosh, array_dims, array_distinct, array_element, array_empty, array_except, + array_has, array_has_all, array_has_any, array_intersect, array_length, array_ndims, + array_pop_back, array_pop_front, array_position, array_positions, array_remove, + array_remove_all, array_remove_n, array_repeat, array_replace, array_replace_all, + array_replace_n, array_resize, array_slice, array_sort, array_union, arrow_typeof, + ascii, asinh, atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, + character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, + current_date, current_time, date_bin, date_part, date_trunc, degrees, digest, + ends_with, exp, expr::{self, InList, Sort, WindowFunction}, factorial, find_in_set, flatten, floor, from_unixtime, gcd, initcap, instr, iszero, lcm, left, levenshtein, ln, log, log10, log2, @@ -476,9 +476,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Trim => Self::Trim, ScalarFunction::Ltrim => Self::Ltrim, ScalarFunction::Rtrim => Self::Rtrim, - ScalarFunction::ArrayAppend => Self::ArrayAppend, ScalarFunction::ArraySort => Self::ArraySort, - ScalarFunction::ArrayConcat => Self::ArrayConcat, ScalarFunction::ArrayEmpty => Self::ArrayEmpty, ScalarFunction::ArrayExcept => Self::ArrayExcept, ScalarFunction::ArrayHasAll => Self::ArrayHasAll, @@ -494,7 +492,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayPopBack => Self::ArrayPopBack, ScalarFunction::ArrayPosition => Self::ArrayPosition, ScalarFunction::ArrayPositions => Self::ArrayPositions, - ScalarFunction::ArrayPrepend => Self::ArrayPrepend, ScalarFunction::ArrayRepeat => Self::ArrayRepeat, ScalarFunction::ArrayRemove => Self::ArrayRemove, ScalarFunction::ArrayRemoveN => Self::ArrayRemoveN, @@ -508,7 +505,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayUnion => Self::ArrayUnion, ScalarFunction::ArrayResize => Self::ArrayResize, ScalarFunction::Cardinality => Self::Cardinality, - ScalarFunction::Array => Self::MakeArray, ScalarFunction::DatePart => Self::DatePart, ScalarFunction::DateTrunc => Self::DateTrunc, ScalarFunction::DateBin => Self::DateBin, @@ -1359,16 +1355,6 @@ pub fn parse_expr( ScalarFunction::Unknown => Err(proto_error("Unknown scalar function")), ScalarFunction::Asinh => Ok(asinh(parse_expr(&args[0], registry)?)), ScalarFunction::Acosh => Ok(acosh(parse_expr(&args[0], registry)?)), - ScalarFunction::Array => Ok(array( - args.to_owned() - .iter() - .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), - ScalarFunction::ArrayAppend => Ok(array_append( - parse_expr(&args[0], registry)?, - parse_expr(&args[1], registry)?, - )), ScalarFunction::ArraySort => Ok(array_sort( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, @@ -1380,16 +1366,6 @@ pub fn parse_expr( ScalarFunction::ArrayPopBack => { Ok(array_pop_back(parse_expr(&args[0], registry)?)) } - ScalarFunction::ArrayPrepend => Ok(array_prepend( - parse_expr(&args[0], registry)?, - 
parse_expr(&args[1], registry)?, - )), - ScalarFunction::ArrayConcat => Ok(array_concat( - args.to_owned() - .iter() - .map(|expr| parse_expr(expr, registry)) - .collect::, _>>()?, - )), ScalarFunction::ArrayExcept => Ok(array_except( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index b98be075f314..16a475cf38a2 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1457,9 +1457,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::Rtrim => Self::Rtrim, BuiltinScalarFunction::ToChar => Self::ToChar, BuiltinScalarFunction::ToTimestamp => Self::ToTimestamp, - BuiltinScalarFunction::ArrayAppend => Self::ArrayAppend, BuiltinScalarFunction::ArraySort => Self::ArraySort, - BuiltinScalarFunction::ArrayConcat => Self::ArrayConcat, BuiltinScalarFunction::ArrayEmpty => Self::ArrayEmpty, BuiltinScalarFunction::ArrayExcept => Self::ArrayExcept, BuiltinScalarFunction::ArrayHasAll => Self::ArrayHasAll, @@ -1475,7 +1473,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayPopBack => Self::ArrayPopBack, BuiltinScalarFunction::ArrayPosition => Self::ArrayPosition, BuiltinScalarFunction::ArrayPositions => Self::ArrayPositions, - BuiltinScalarFunction::ArrayPrepend => Self::ArrayPrepend, BuiltinScalarFunction::ArrayRepeat => Self::ArrayRepeat, BuiltinScalarFunction::ArrayResize => Self::ArrayResize, BuiltinScalarFunction::ArrayRemove => Self::ArrayRemove, @@ -1489,7 +1486,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayIntersect => Self::ArrayIntersect, BuiltinScalarFunction::ArrayUnion => Self::ArrayUnion, BuiltinScalarFunction::Cardinality => Self::Cardinality, - BuiltinScalarFunction::MakeArray => Self::Array, BuiltinScalarFunction::DatePart => Self::DatePart, BuiltinScalarFunction::DateTrunc => Self::DateTrunc, BuiltinScalarFunction::DateBin => Self::DateBin, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e3bd2cb1dc47..4f4bd7e79b75 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -578,7 +578,14 @@ async fn roundtrip_expr_api() -> Result<()> { let expr_list = vec![ encode(col("a").cast_to(&DataType::Utf8, &schema)?, lit("hex")), decode(lit("1234"), lit("hex")), - array_to_string(array(vec![lit(1), lit(2), lit(3)]), lit(",")), + array_to_string(make_array(vec![lit(1), lit(2), lit(3)]), lit(",")), + array_append(make_array(vec![lit(1), lit(2), lit(3)]), lit(4)), + array_prepend(lit(1), make_array(vec![lit(2), lit(3), lit(4)])), + array_concat(vec![ + make_array(vec![lit(1), lit(2)]), + make_array(vec![lit(3), lit(4)]), + ]), + make_array(vec![lit(1), lit(2), lit(3)]), ]; // ensure expressions created with the expr api can be round tripped diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs index c0870cc54106..f2b6a2e5af48 100644 --- a/datafusion/sql/src/expr/value.rs +++ b/datafusion/sql/src/expr/value.rs @@ -22,9 +22,7 @@ use arrow_schema::DataType; use datafusion_common::{ not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue, }; -use datafusion_expr::expr::ScalarFunction; -use datafusion_expr::expr::{BinaryExpr, Placeholder}; -use datafusion_expr::BuiltinScalarFunction; 
+use datafusion_expr::expr::{BinaryExpr, Placeholder, ScalarFunction};
 use datafusion_expr::{lit, Expr, Operator};
 use log::debug;
 use sqlparser::ast::{BinaryOperator, Expr as SQLExpr, Interval, Value};
@@ -142,10 +140,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        Ok(Expr::ScalarFunction(ScalarFunction::new(
-            BuiltinScalarFunction::MakeArray,
-            values,
-        )))
+        if let Some(udf) = self.context_provider.get_function_meta("make_array") {
+            Ok(Expr::ScalarFunction(ScalarFunction::new_udf(udf, values)))
+        } else {
+            not_impl_err!(
+                "the array_expressions feature is disabled; register a `make_array` UDF to use array literals"
+            )
+        }
     }
 
     /// Convert a SQL interval expression to a DataFusion logical plan
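
For context only (not part of the patch): a minimal usage sketch of the relocated functions after this change, assuming a default SessionContext built with the array_expressions feature enabled so that datafusion_functions_array::register_all has run. The SQL planner now resolves make_array through get_function_meta("make_array") rather than BuiltinScalarFunction::MakeArray, and array_append / array_prepend / array_concat are likewise provided as UDFs by the datafusion-functions-array crate. The tokio runtime and exact query text below are illustrative assumptions.

    // Usage sketch, not part of this diff.
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // SessionContext::new() builds a SessionState that registers the array
        // UDFs (make_array, array_append, array_prepend, array_concat) when the
        // array_expressions feature is enabled.
        let ctx = SessionContext::new();

        // make_array is resolved as a UDF by the SQL planner instead of being
        // mapped to a BuiltinScalarFunction variant.
        let df = ctx
            .sql(
                "SELECT array_concat(
                     array_append(make_array(1, 2), 3),
                     array_prepend(4, make_array(5, 6))
                 ) AS a",
            )
            .await?;
        df.show().await?;
        Ok(())
    }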