From 430caa21c26bec5038ab6976d9dc63ee3ca636e7 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sun, 19 Nov 2023 17:28:36 +0800 Subject: [PATCH 1/8] implement distinct func implement slt & proto fix null & empty list --- datafusion/expr/src/built_in_function.rs | 144 +++++++++--------- datafusion/expr/src/expr_fn.rs | 6 + .../physical-expr/src/array_expressions.rs | 50 ++++++ datafusion/physical-expr/src/functions.rs | 3 + datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 3 + datafusion/proto/src/generated/prost.rs | 3 + .../proto/src/logical_plan/from_proto.rs | 21 ++- datafusion/proto/src/logical_plan/to_proto.rs | 1 + datafusion/sqllogictest/test_files/array.slt | 63 ++++++++ docs/source/user-guide/expressions.md | 1 + 11 files changed, 214 insertions(+), 82 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index d48e9e7a67fe..3e11e92bc538 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -144,6 +144,8 @@ pub enum BuiltinScalarFunction { ArrayPopBack, /// array_dims ArrayDims, + /// array_distinct + ArrayDistinct, /// array_element ArrayElement, /// array_empty @@ -404,6 +406,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable, BuiltinScalarFunction::ArrayHas => Volatility::Immutable, BuiltinScalarFunction::ArrayDims => Volatility::Immutable, + BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable, BuiltinScalarFunction::ArrayElement => Volatility::Immutable, BuiltinScalarFunction::ArrayExcept => Volatility::Immutable, BuiltinScalarFunction::ArrayLength => Volatility::Immutable, @@ -582,6 +585,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::ArrayDims => { Ok(List(Arc::new(Field::new("item", UInt64, true)))) } + BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()), BuiltinScalarFunction::ArrayElement => match 
&input_expr_types[0] { List(field) => Ok(field.data_type().clone()), _ => plan_err!( @@ -926,6 +930,7 @@ impl BuiltinScalarFunction { Signature::variadic_any(self.volatility()) } BuiltinScalarFunction::ArrayNdims => Signature::any(1, self.volatility()), + BuiltinScalarFunction::ArrayDistinct => Signature::any(1, self.volatility()), BuiltinScalarFunction::ArrayPosition => { Signature::variadic_any(self.volatility()) } @@ -1551,80 +1556,71 @@ impl BuiltinScalarFunction { // other functions BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], - // array functions - BuiltinScalarFunction::ArrayAppend => &[ - "array_append", - "list_append", - "array_push_back", - "list_push_back", - ], - BuiltinScalarFunction::ArrayConcat => { - &["array_concat", "array_cat", "list_concat", "list_cat"] - } - BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], - BuiltinScalarFunction::ArrayEmpty => &["empty"], - BuiltinScalarFunction::ArrayElement => &[ - "array_element", - "array_extract", - "list_element", - "list_extract", - ], - BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"], - BuiltinScalarFunction::Flatten => &["flatten"], - BuiltinScalarFunction::ArrayHasAll => &["array_has_all", "list_has_all"], - BuiltinScalarFunction::ArrayHasAny => &["array_has_any", "list_has_any"], - BuiltinScalarFunction::ArrayHas => { - &["array_has", "list_has", "array_contains", "list_contains"] - } - BuiltinScalarFunction::ArrayLength => &["array_length", "list_length"], - BuiltinScalarFunction::ArrayNdims => &["array_ndims", "list_ndims"], - BuiltinScalarFunction::ArrayPopFront => { - &["array_pop_front", "list_pop_front"] - } - BuiltinScalarFunction::ArrayPopBack => &["array_pop_back", "list_pop_back"], - BuiltinScalarFunction::ArrayPosition => &[ - "array_position", - "list_position", - "array_indexof", - "list_indexof", - ], - BuiltinScalarFunction::ArrayPositions => { - &["array_positions", "list_positions"] - } - BuiltinScalarFunction::ArrayPrepend => &[ 
- "array_prepend", - "list_prepend", - "array_push_front", - "list_push_front", - ], - BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"], - BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"], - BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"], - BuiltinScalarFunction::ArrayRemoveAll => { - &["array_remove_all", "list_remove_all"] - } - BuiltinScalarFunction::ArrayReplace => &["array_replace", "list_replace"], - BuiltinScalarFunction::ArrayReplaceN => { - &["array_replace_n", "list_replace_n"] - } - BuiltinScalarFunction::ArrayReplaceAll => { - &["array_replace_all", "list_replace_all"] - } - BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"], - BuiltinScalarFunction::ArrayToString => &[ - "array_to_string", - "list_to_string", - "array_join", - "list_join", - ], - BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], - BuiltinScalarFunction::Cardinality => &["cardinality"], - BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], - BuiltinScalarFunction::ArrayIntersect => { - &["array_intersect", "list_intersect"] - } - BuiltinScalarFunction::OverLay => &["overlay"], - BuiltinScalarFunction::Range => &["range", "generate_series"], + // array functions + BuiltinScalarFunction::ArrayAppend => &[ + "array_append", + "list_append", + "array_push_back", + "list_push_back", + ], + BuiltinScalarFunction::ArrayConcat => { + &["array_concat", "array_cat", "list_concat", "list_cat"] + } + BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], + BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"], + BuiltinScalarFunction::ArrayEmpty => &["empty"], + BuiltinScalarFunction::ArrayElement => &[ + "array_element", + "array_extract", + "list_element", + "list_extract", + ], + BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"], + BuiltinScalarFunction::Flatten => &["flatten"], + BuiltinScalarFunction::ArrayHasAll => 
&["array_has_all", "list_has_all"], + BuiltinScalarFunction::ArrayHasAny => &["array_has_any", "list_has_any"], + BuiltinScalarFunction::ArrayHas => { + &["array_has", "list_has", "array_contains", "list_contains"] + } + BuiltinScalarFunction::ArrayLength => &["array_length", "list_length"], + BuiltinScalarFunction::ArrayNdims => &["array_ndims", "list_ndims"], + BuiltinScalarFunction::ArrayPopFront => &["array_pop_front", "list_pop_front"], + BuiltinScalarFunction::ArrayPopBack => &["array_pop_back", "list_pop_back"], + BuiltinScalarFunction::ArrayPosition => &[ + "array_position", + "list_position", + "array_indexof", + "list_indexof", + ], + BuiltinScalarFunction::ArrayPositions => &["array_positions", "list_positions"], + BuiltinScalarFunction::ArrayPrepend => &[ + "array_prepend", + "list_prepend", + "array_push_front", + "list_push_front", + ], + BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"], + BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"], + BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"], + BuiltinScalarFunction::ArrayRemoveAll => &["array_remove_all", "list_remove_all"], + BuiltinScalarFunction::ArrayReplace => &["array_replace", "list_replace"], + BuiltinScalarFunction::ArrayReplaceN => &["array_replace_n", "list_replace_n"], + BuiltinScalarFunction::ArrayReplaceAll => { + &["array_replace_all", "list_replace_all"] + } + BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"], + BuiltinScalarFunction::ArrayToString => &[ + "array_to_string", + "list_to_string", + "array_join", + "list_join", + ], + BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], + BuiltinScalarFunction::Cardinality => &["cardinality"], + BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], + BuiltinScalarFunction::ArrayIntersect => &["array_intersect", "list_intersect"], + BuiltinScalarFunction::OverLay => &["overlay"], + BuiltinScalarFunction::Range => &["range", 
"generate_series"], // struct functions BuiltinScalarFunction::Struct => &["struct"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 6148226f6b1a..9ed41b7affde 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -658,6 +658,12 @@ scalar_expr!( array, "returns the number of dimensions of the array." ); +scalar_expr!( + ArrayDistinct, + array_distinct, + array, + "return distinct values from the array after removing duplicates." +); scalar_expr!( ArrayPosition, array_position, diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 9489a51fa385..177ec7f59abf 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1998,6 +1998,56 @@ pub fn array_intersect(args: &[ArrayRef]) -> Result { } } +/// array_distinct SQL function +/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] +pub fn array_distinct(args: &[ArrayRef]) -> Result { + assert_eq!(args.len(), 1); + + // handle null + if args[0].data_type() == &DataType::Null { + return Ok(args[0].clone()); + } + + let array = as_list_array(&args[0])?; + let dt = array.value_type(); + + let mut offsets = vec![0]; + let mut new_arrays = vec![]; + + let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; + // distinct for each list in ListArray + for arr in array.iter().flatten() { + let values = converter.convert_columns(&[arr])?; + + let mut rows = Vec::with_capacity(values.num_rows()); + // sort elements in list and remove duplicates + for val in values.iter().sorted().dedup() { + rows.push(val); + } + + let last_offset: i32 = match offsets.last().copied() { + Some(offset) => offset, + None => return internal_err!("offsets should not be empty"), + }; + offsets.push(last_offset + rows.len() as i32); + let arrays = converter.convert_rows(rows)?; + let array = match arrays.get(0) { + Some(array) => 
array.clone(), + None => { + return internal_err!("array_distinct: failed to get array from rows") + } + }; + new_arrays.push(array); + } + + let field = Arc::new(Field::new("item", dt, true)); + let offsets = OffsetBuffer::new(offsets.into()); + let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); + let values = compute::concat(&new_arrays_ref)?; + let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?); + Ok(arr) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 72c7f492166d..e2fd9a73bc8e 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -347,6 +347,9 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayDims => { Arc::new(|args| make_scalar_function(array_expressions::array_dims)(args)) } + BuiltinScalarFunction::ArrayDistinct => { + Arc::new(|args| make_scalar_function(array_expressions::array_distinct)(args)) + } BuiltinScalarFunction::ArrayElement => { Arc::new(|args| make_scalar_function(array_expressions::array_element)(args)) } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 8c2fd5369e33..08b38aadcf0e 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -643,6 +643,7 @@ enum ScalarFunction { Levenshtein = 125; SubstrIndex = 126; FindInSet = 127; + ArrayDistinct = 128; } message ScalarFunctionNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index b8c5f6a4aae8..dec89e49b2ba 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -20865,6 +20865,7 @@ impl serde::Serialize for ScalarFunction { Self::Levenshtein => "Levenshtein", Self::SubstrIndex => "SubstrIndex", Self::FindInSet => "FindInSet", + Self::ArrayDistinct => "ArrayDistinct", }; 
serializer.serialize_str(variant) } @@ -21004,6 +21005,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Levenshtein", "SubstrIndex", "FindInSet", + "ArrayDistinct", ]; struct GeneratedVisitor; @@ -21172,6 +21174,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Levenshtein" => Ok(ScalarFunction::Levenshtein), "SubstrIndex" => Ok(ScalarFunction::SubstrIndex), "FindInSet" => Ok(ScalarFunction::FindInSet), + "ArrayDistinct" => Ok(ScalarFunction::ArrayDistinct), _ => Err(serde::de::Error::unknown_variant(value, FIELDS)), } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c31bc4ab5948..75d08fd84e56 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2596,6 +2596,7 @@ pub enum ScalarFunction { Levenshtein = 125, SubstrIndex = 126, FindInSet = 127, + ArrayDistinct = 128, } impl ScalarFunction { /// String value of the enum field names used in the ProtoBuf definition. @@ -2732,6 +2733,7 @@ impl ScalarFunction { ScalarFunction::Levenshtein => "Levenshtein", ScalarFunction::SubstrIndex => "SubstrIndex", ScalarFunction::FindInSet => "FindInSet", + ScalarFunction::ArrayDistinct => "ArrayDistinct", } } /// Creates an enum from field names used in the ProtoBuf definition. 
@@ -2865,6 +2867,7 @@ impl ScalarFunction { "Levenshtein" => Some(Self::Levenshtein), "SubstrIndex" => Some(Self::SubstrIndex), "FindInSet" => Some(Self::FindInSet), + "ArrayDistinct" => Some(Self::ArrayDistinct), _ => None, } } diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index ae3628bddeb2..6c111bd1aec2 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -40,14 +40,15 @@ use datafusion_common::{ DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue, }; use datafusion_expr::{ - abs, acos, acosh, array, array_append, array_concat, array_dims, array_element, - array_except, array_has, array_has_all, array_has_any, array_intersect, array_length, - array_ndims, array_position, array_positions, array_prepend, array_remove, - array_remove_all, array_remove_n, array_repeat, array_replace, array_replace_all, - array_replace_n, array_slice, array_to_string, arrow_typeof, ascii, asin, asinh, - atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, character_length, - chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date, - current_time, date_bin, date_part, date_trunc, decode, degrees, digest, encode, exp, + abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct, + array_element, array_except, array_has, array_has_all, array_has_any, + array_intersect, array_length, array_ndims, array_position, array_positions, + array_prepend, array_remove, array_remove_all, array_remove_n, array_repeat, + array_replace, array_replace_all, array_replace_n, array_slice, array_to_string, + arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality, + cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, + cot, current_date, current_time, date_bin, date_part, date_trunc, decode, degrees, + digest, encode, exp, expr::{self, 
InList, Sort, WindowFunction}, factorial, find_in_set, flatten, floor, from_unixtime, gcd, gen_range, isnan, iszero, lcm, left, levenshtein, ln, log, log10, log2, @@ -470,6 +471,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::ArrayHasAny => Self::ArrayHasAny, ScalarFunction::ArrayHas => Self::ArrayHas, ScalarFunction::ArrayDims => Self::ArrayDims, + ScalarFunction::ArrayDistinct => Self::ArrayDistinct, ScalarFunction::ArrayElement => Self::ArrayElement, ScalarFunction::Flatten => Self::Flatten, ScalarFunction::ArrayLength => Self::ArrayLength, @@ -1446,6 +1448,9 @@ pub fn parse_expr( ScalarFunction::ArrayDims => { Ok(array_dims(parse_expr(&args[0], registry)?)) } + ScalarFunction::ArrayDistinct => { + Ok(array_distinct(parse_expr(&args[0], registry)?)) + } ScalarFunction::ArrayElement => Ok(array_element( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index ecbfaca5dbfe..a83b25090bd2 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1509,6 +1509,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::ArrayHasAny => Self::ArrayHasAny, BuiltinScalarFunction::ArrayHas => Self::ArrayHas, BuiltinScalarFunction::ArrayDims => Self::ArrayDims, + BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct, BuiltinScalarFunction::ArrayElement => Self::ArrayElement, BuiltinScalarFunction::Flatten => Self::Flatten, BuiltinScalarFunction::ArrayLength => Self::ArrayLength, diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 6ec2b2cb013b..2354bc8d16fc 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -182,6 +182,30 @@ AS VALUES (make_array([[1], [2]], [[2], [3]]), make_array([1], [2])) ; 
+statement ok +CREATE TABLE array_distinct_table_1D +AS VALUES + (make_array(1, 1, 2, 2, 3)), + (make_array(1, 2, 3, 4, 5)), + (make_array(3, 5, 3, 3, 3)) +; + +statement ok +CREATE TABLE array_distinct_table_1D_UTF8 +AS VALUES + (make_array('a', 'a', 'bc', 'bc', 'def')), + (make_array('a', 'bc', 'def', 'defg', 'defg')), + (make_array('defg', 'defg', 'defg', 'defg', 'defg')) +; + +statement ok +CREATE TABLE array_distinct_table_2D +AS VALUES + (make_array([1,2], [1,2], [3,4], [3,4], [5,6])), + (make_array([1,2], [3,4], [5,6], [7,8], [9,10])), + (make_array([5,6], [5,6], NULL)) +; + statement ok CREATE TABLE array_intersect_table_1D AS VALUES @@ -2715,6 +2739,45 @@ select array_has_all(make_array(1,2,3), make_array(1,3)), ---- true false true false false false true true false false true false true +query ? +select array_distinct(null); +---- +NULL + +query ? +select array_distinct([]); +---- +[] + +query ? +select array_distinct([[], []]); +---- +[[]] + +query ? +select array_distinct(column1) +from array_distinct_table_1D; +---- +[1, 2, 3] +[1, 2, 3, 4, 5] +[3, 5] + +query ? +select array_distinct(column1) +from array_distinct_table_1D_UTF8; +---- +[a, bc, def] +[a, bc, def, defg] +[defg] + +query ? +select array_distinct(column1) +from array_distinct_table_2D; +---- +[[1, 2], [3, 4], [5, 6]] +[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] +[, [5, 6]] + query ??? 
select array_intersect(column1, column2), array_intersect(column3, column4), diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 257c50dfa497..b8689e556741 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -215,6 +215,7 @@ Unlike to some databases the math functions in Datafusion works the same way as | array_has_all(array, sub-array) | Returns true if all elements of sub-array exist in array `array_has_all([1,2,3], [1,3]) -> true` | | array_has_any(array, sub-array) | Returns true if any elements exist in both arrays `array_has_any([1,2,3], [1,4]) -> true` | | array_dims(array) | Returns an array of the array's dimensions. `array_dims([[1, 2, 3], [4, 5, 6]]) -> [2, 3]` | +| array_distinct(array) | Returns distinct values from the array after removing duplicates. `array_distinct([1, 3, 2, 3, 1, 2, 4]) -> [1, 2, 3, 4]` | | array_element(array, index) | Extracts the element with the index n from the array `array_element([1, 2, 3, 4], 3) -> 3` | | flatten(array) | Converts an array of arrays to a flat array `flatten([[1], [2, 3], [4, 5, 6]]) -> [1, 2, 3, 4, 5, 6]` | | array_length(array, dimension) | Returns the length of the array dimension. 
`array_length([1, 2, 3, 4, 5]) -> 5` | From 2ffc9f824218969e45e1013352e60b54694f762d Mon Sep 17 00:00:00 2001 From: yi wang <48236141+my-vegetable-has-exploded@users.noreply.github.com> Date: Mon, 27 Nov 2023 19:24:09 +0800 Subject: [PATCH 2/8] add comment for slt Co-authored-by: Alex Huang --- datafusion/sqllogictest/test_files/array.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 2354bc8d16fc..d2a1c91aed3d 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -2739,6 +2739,8 @@ select array_has_all(make_array(1,2,3), make_array(1,3)), ---- true false true false false false true true false false true false true +## array_distinct + query ? select array_distinct(null); ---- From c777184f72cbafd9c1efe342b30d35924ad54b3b Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 27 Nov 2023 20:41:50 +0800 Subject: [PATCH 3/8] fix largelist --- .../physical-expr/src/array_expressions.rs | 61 ++++++++++++------- 1 file changed, 38 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 177ec7f59abf..565e5190bffc 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -31,7 +31,7 @@ use arrow_buffer::NullBuffer; use arrow_schema::FieldRef; use datafusion_common::cast::{ as_generic_list_array, as_generic_string_array, as_int64_array, as_list_array, - as_null_array, as_string_array, + as_null_array, as_string_array,as_large_list_array }; use datafusion_common::utils::{array_into_list_array, list_ndims}; use datafusion_common::{ @@ -1998,38 +1998,27 @@ pub fn array_intersect(args: &[ArrayRef]) -> Result { } } -/// array_distinct SQL function -/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] -pub fn array_distinct(args: &[ArrayRef]) 
-> Result { - assert_eq!(args.len(), 1); - - // handle null - if args[0].data_type() == &DataType::Null { - return Ok(args[0].clone()); - } - - let array = as_list_array(&args[0])?; +pub fn general_array_distinct( + array: &GenericListArray, + field: &FieldRef, +) -> Result { let dt = array.value_type(); - - let mut offsets = vec![0]; + let mut offsets = vec![OffsetSize::usize_as(0)]; let mut new_arrays = vec![]; - let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; // distinct for each list in ListArray for arr in array.iter().flatten() { let values = converter.convert_columns(&[arr])?; - let mut rows = Vec::with_capacity(values.num_rows()); // sort elements in list and remove duplicates for val in values.iter().sorted().dedup() { rows.push(val); } - - let last_offset: i32 = match offsets.last().copied() { + let last_offset: OffsetSize = match offsets.last().copied() { Some(offset) => offset, None => return internal_err!("offsets should not be empty"), }; - offsets.push(last_offset + rows.len() as i32); + offsets.push(last_offset + OffsetSize::usize_as(rows.len())); let arrays = converter.convert_rows(rows)?; let array = match arrays.get(0) { Some(array) => array.clone(), @@ -2039,13 +2028,39 @@ pub fn array_distinct(args: &[ArrayRef]) -> Result { }; new_arrays.push(array); } - - let field = Arc::new(Field::new("item", dt, true)); let offsets = OffsetBuffer::new(offsets.into()); let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::>(); let values = compute::concat(&new_arrays_ref)?; - let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?); - Ok(arr) + Ok(Arc::new(GenericListArray::::try_new( + field.clone(), + offsets, + values, + None, + )?)) +} + +/// array_distinct SQL function +/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4] +pub fn array_distinct(args: &[ArrayRef]) -> Result { + assert_eq!(args.len(), 1); + + // handle null + if args[0].data_type() == &DataType::Null { + return Ok(args[0].clone()); 
+ } + // handle for list & largelist + match args[0].data_type() { + DataType::List(field) => { + let array = as_list_array(&args[0])?; + general_array_distinct(array, field) + } + DataType::LargeList(field) => { + let array = as_large_list_array(&args[0])?; + general_array_distinct(array, field) + } + _ => internal_err!("array_distinct only support list array"), + } } #[cfg(test)] From 009d6de58e4900272eb6c1c9a6ce8bf4b5711e64 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 27 Nov 2023 20:56:29 +0800 Subject: [PATCH 4/8] add largelist for slt --- datafusion/sqllogictest/test_files/array.slt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index d2a1c91aed3d..12dc9f53b368 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -206,6 +206,14 @@ AS VALUES (make_array([5,6], [5,6], NULL)) ; +statement ok +CREATE TABLE array_distinct_table_1D_large +AS VALUES + (arrow_cast(make_array(1, 1, 2, 2, 3), 'LargeList(Int64)')), + (arrow_cast(make_array(1, 2, 3, 4, 5), 'LargeList(Int64)')), + (arrow_cast(make_array(3, 5, 3, 3, 3), 'LargeList(Int64)')) +; + statement ok CREATE TABLE array_intersect_table_1D AS VALUES @@ -2780,6 +2788,14 @@ from array_distinct_table_2D; [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]] [, [5, 6]] +query ? +select array_distinct(column1) +from array_distinct_table_1D_large; +---- +[1, 2, 3] +[1, 2, 3, 4, 5] +[3, 5] + query ??? select array_intersect(column1, column2), array_intersect(column3, column4), From 7f0152e0c14afc62fd2b24a6b1fcaad22bf99633 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Wed, 29 Nov 2023 21:56:30 +0800 Subject: [PATCH 5/8] Use collect for rows & init capacity for offsets.
--- datafusion/physical-expr/src/array_expressions.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 565e5190bffc..53f7690a332f 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2003,17 +2003,15 @@ pub fn general_array_distinct( field: &FieldRef, ) -> Result { let dt = array.value_type(); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + let mut offsets = Vec::with_capacity(array.len()); + offsets.push(OffsetSize::usize_as(0)); + let mut new_arrays = Vec::with_capacity(array.len()); let converter = RowConverter::new(vec![SortField::new(dt.clone())])?; // distinct for each list in ListArray for arr in array.iter().flatten() { let values = converter.convert_columns(&[arr])?; - let mut rows = Vec::with_capacity(values.num_rows()); // sort elements in list and remove duplicates - for val in values.iter().sorted().dedup() { - rows.push(val); - } + let rows = values.iter().sorted().dedup().collect::>(); let last_offset: OffsetSize = match offsets.last().copied() { Some(offset) => offset, None => return internal_err!("offsets should not be empty"), From c2f54510fc83e729906d4d5f4b4c930f6ab09ee2 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Thu, 30 Nov 2023 10:47:15 +0800 Subject: [PATCH 6/8] fixup: remove useless match --- datafusion/expr/src/built_in_function.rs | 140 ++++++++++-------- .../physical-expr/src/array_expressions.rs | 5 +- 2 files changed, 76 insertions(+), 69 deletions(-) diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index 3e11e92bc538..055beb9f8d0e 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -1556,71 +1556,81 @@ impl BuiltinScalarFunction { // other functions 
BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], - // array functions - BuiltinScalarFunction::ArrayAppend => &[ - "array_append", - "list_append", - "array_push_back", - "list_push_back", - ], - BuiltinScalarFunction::ArrayConcat => { - &["array_concat", "array_cat", "list_concat", "list_cat"] - } - BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], - BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"], - BuiltinScalarFunction::ArrayEmpty => &["empty"], - BuiltinScalarFunction::ArrayElement => &[ - "array_element", - "array_extract", - "list_element", - "list_extract", - ], - BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"], - BuiltinScalarFunction::Flatten => &["flatten"], - BuiltinScalarFunction::ArrayHasAll => &["array_has_all", "list_has_all"], - BuiltinScalarFunction::ArrayHasAny => &["array_has_any", "list_has_any"], - BuiltinScalarFunction::ArrayHas => { - &["array_has", "list_has", "array_contains", "list_contains"] - } - BuiltinScalarFunction::ArrayLength => &["array_length", "list_length"], - BuiltinScalarFunction::ArrayNdims => &["array_ndims", "list_ndims"], - BuiltinScalarFunction::ArrayPopFront => &["array_pop_front", "list_pop_front"], - BuiltinScalarFunction::ArrayPopBack => &["array_pop_back", "list_pop_back"], - BuiltinScalarFunction::ArrayPosition => &[ - "array_position", - "list_position", - "array_indexof", - "list_indexof", - ], - BuiltinScalarFunction::ArrayPositions => &["array_positions", "list_positions"], - BuiltinScalarFunction::ArrayPrepend => &[ - "array_prepend", - "list_prepend", - "array_push_front", - "list_push_front", - ], - BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"], - BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"], - BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"], - BuiltinScalarFunction::ArrayRemoveAll => &["array_remove_all", "list_remove_all"], - 
BuiltinScalarFunction::ArrayReplace => &["array_replace", "list_replace"], - BuiltinScalarFunction::ArrayReplaceN => &["array_replace_n", "list_replace_n"], - BuiltinScalarFunction::ArrayReplaceAll => { - &["array_replace_all", "list_replace_all"] - } - BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"], - BuiltinScalarFunction::ArrayToString => &[ - "array_to_string", - "list_to_string", - "array_join", - "list_join", - ], - BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], - BuiltinScalarFunction::Cardinality => &["cardinality"], - BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], - BuiltinScalarFunction::ArrayIntersect => &["array_intersect", "list_intersect"], - BuiltinScalarFunction::OverLay => &["overlay"], - BuiltinScalarFunction::Range => &["range", "generate_series"], + // array functions + BuiltinScalarFunction::ArrayAppend => &[ + "array_append", + "list_append", + "array_push_back", + "list_push_back", + ], + BuiltinScalarFunction::ArrayConcat => { + &["array_concat", "array_cat", "list_concat", "list_cat"] + } + BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"], + BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"], + BuiltinScalarFunction::ArrayEmpty => &["empty"], + BuiltinScalarFunction::ArrayElement => &[ + "array_element", + "array_extract", + "list_element", + "list_extract", + ], + BuiltinScalarFunction::ArrayExcept => &["array_except", "list_except"], + BuiltinScalarFunction::Flatten => &["flatten"], + BuiltinScalarFunction::ArrayHasAll => &["array_has_all", "list_has_all"], + BuiltinScalarFunction::ArrayHasAny => &["array_has_any", "list_has_any"], + BuiltinScalarFunction::ArrayHas => { + &["array_has", "list_has", "array_contains", "list_contains"] + } + BuiltinScalarFunction::ArrayLength => &["array_length", "list_length"], + BuiltinScalarFunction::ArrayNdims => &["array_ndims", "list_ndims"], + BuiltinScalarFunction::ArrayPopFront => { + 
&["array_pop_front", "list_pop_front"] + } + BuiltinScalarFunction::ArrayPopBack => &["array_pop_back", "list_pop_back"], + BuiltinScalarFunction::ArrayPosition => &[ + "array_position", + "list_position", + "array_indexof", + "list_indexof", + ], + BuiltinScalarFunction::ArrayPositions => { + &["array_positions", "list_positions"] + } + BuiltinScalarFunction::ArrayPrepend => &[ + "array_prepend", + "list_prepend", + "array_push_front", + "list_push_front", + ], + BuiltinScalarFunction::ArrayRepeat => &["array_repeat", "list_repeat"], + BuiltinScalarFunction::ArrayRemove => &["array_remove", "list_remove"], + BuiltinScalarFunction::ArrayRemoveN => &["array_remove_n", "list_remove_n"], + BuiltinScalarFunction::ArrayRemoveAll => { + &["array_remove_all", "list_remove_all"] + } + BuiltinScalarFunction::ArrayReplace => &["array_replace", "list_replace"], + BuiltinScalarFunction::ArrayReplaceN => { + &["array_replace_n", "list_replace_n"] + } + BuiltinScalarFunction::ArrayReplaceAll => { + &["array_replace_all", "list_replace_all"] + } + BuiltinScalarFunction::ArraySlice => &["array_slice", "list_slice"], + BuiltinScalarFunction::ArrayToString => &[ + "array_to_string", + "list_to_string", + "array_join", + "list_join", + ], + BuiltinScalarFunction::ArrayUnion => &["array_union", "list_union"], + BuiltinScalarFunction::Cardinality => &["cardinality"], + BuiltinScalarFunction::MakeArray => &["make_array", "make_list"], + BuiltinScalarFunction::ArrayIntersect => { + &["array_intersect", "list_intersect"] + } + BuiltinScalarFunction::OverLay => &["overlay"], + BuiltinScalarFunction::Range => &["range", "generate_series"], // struct functions BuiltinScalarFunction::Struct => &["struct"], diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 53f7690a332f..cb5e00ac55fb 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -2012,10 +2012,7 @@ 
pub fn general_array_distinct( let values = converter.convert_columns(&[arr])?; // sort elements in list and remove duplicates let rows = values.iter().sorted().dedup().collect::<Vec<_>>(); - let last_offset: OffsetSize = match offsets.last().copied() { - Some(offset) => offset, - None => return internal_err!("offsets should not be empty"), - }; + let last_offset: OffsetSize = offsets.last().copied().unwrap(); offsets.push(last_offset + OffsetSize::usize_as(rows.len())); let arrays = converter.convert_rows(rows)?; let array = match arrays.get(0) { From ac3321599a6e295f0084d817add02ea3402352d0 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Tue, 5 Dec 2023 19:51:53 +0800 Subject: [PATCH 7/8] fix fmt --- datafusion/physical-expr/src/array_expressions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index cb5e00ac55fb..bfa20089ba07 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -30,8 +30,8 @@ use arrow_buffer::NullBuffer; use arrow_schema::FieldRef; use datafusion_common::cast::{ - as_generic_list_array, as_generic_string_array, as_int64_array, as_list_array, - as_null_array, as_string_array,as_large_list_array + as_generic_list_array, as_generic_string_array, as_int64_array, as_large_list_array, + as_list_array, as_null_array, as_string_array, }; use datafusion_common::utils::{array_into_list_array, list_ndims}; use datafusion_common::{ From 9d8bc006ba7c9573d6bc841c3039005493a28f01 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Thu, 7 Dec 2023 12:39:53 +0800 Subject: [PATCH 8/8] fix fmt --- datafusion/proto/src/logical_plan/from_proto.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 6b9b6badf9c5..1b7e3d483818 
100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -44,11 +44,11 @@ use datafusion_expr::{ array_element, array_except, array_has, array_has_all, array_has_any, array_intersect, array_length, array_ndims, array_position, array_positions, array_prepend, array_remove, array_remove_all, array_remove_n, array_repeat, - array_replace, array_replace_all, array_replace_n, array_slice, array_sort, array_to_string, - arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality, - cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, - cot, current_date, current_time, date_bin, date_part, date_trunc, decode, degrees, - digest, encode, exp, + array_replace, array_replace_all, array_replace_n, array_slice, array_sort, + array_to_string, arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, + btrim, cardinality, cbrt, ceil, character_length, chr, coalesce, concat_expr, + concat_ws_expr, cos, cosh, cot, current_date, current_time, date_bin, date_part, + date_trunc, decode, degrees, digest, encode, exp, expr::{self, InList, Sort, WindowFunction}, factorial, find_in_set, flatten, floor, from_unixtime, gcd, gen_range, isnan, iszero, lcm, left, levenshtein, ln, log, log10, log2,