Skip to content

Commit

Permalink
implement distinct func
Browse files Browse the repository at this point in the history
implement slt & proto

fix null & empty list
  • Loading branch information
my-vegetable-has-exploded committed Nov 22, 2023
1 parent 952e7c3 commit 1e615b2
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 8 deletions.
6 changes: 6 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ pub enum BuiltinScalarFunction {
ArrayPopBack,
/// array_dims
ArrayDims,
/// array_distinct
ArrayDistinct,
/// array_element
ArrayElement,
/// array_empty
Expand Down Expand Up @@ -395,6 +397,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayHasAny => Volatility::Immutable,
BuiltinScalarFunction::ArrayHas => Volatility::Immutable,
BuiltinScalarFunction::ArrayDims => Volatility::Immutable,
BuiltinScalarFunction::ArrayDistinct => Volatility::Immutable,
BuiltinScalarFunction::ArrayElement => Volatility::Immutable,
BuiltinScalarFunction::ArrayExcept => Volatility::Immutable,
BuiltinScalarFunction::ArrayLength => Volatility::Immutable,
Expand Down Expand Up @@ -571,6 +574,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::ArrayDims => {
Ok(List(Arc::new(Field::new("item", UInt64, true))))
}
BuiltinScalarFunction::ArrayDistinct => Ok(input_expr_types[0].clone()),
BuiltinScalarFunction::ArrayElement => match &input_expr_types[0] {
List(field) => Ok(field.data_type().clone()),
_ => plan_err!(
Expand Down Expand Up @@ -912,6 +916,7 @@ impl BuiltinScalarFunction {
Signature::variadic_any(self.volatility())
}
BuiltinScalarFunction::ArrayNdims => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayDistinct => Signature::any(1, self.volatility()),
BuiltinScalarFunction::ArrayPosition => {
Signature::variadic_any(self.volatility())
}
Expand Down Expand Up @@ -1531,6 +1536,7 @@ fn aliases(func: &BuiltinScalarFunction) -> &'static [&'static str] {
&["array_concat", "array_cat", "list_concat", "list_cat"]
}
BuiltinScalarFunction::ArrayDims => &["array_dims", "list_dims"],
BuiltinScalarFunction::ArrayDistinct => &["array_distinct", "list_distinct"],
BuiltinScalarFunction::ArrayEmpty => &["empty"],
BuiltinScalarFunction::ArrayElement => &[
"array_element",
Expand Down
6 changes: 6 additions & 0 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,12 @@ scalar_expr!(
array,
"returns the number of dimensions of the array."
);
scalar_expr!(
ArrayDistinct,
array_distinct,
array,
"return distinct values from the array after removing duplicates."
);
scalar_expr!(
ArrayPosition,
array_position,
Expand Down
77 changes: 77 additions & 0 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1991,6 +1991,56 @@ pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// array_distinct SQL function
/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 1);

// handle null
if args[0].data_type() == &DataType::Null {
return Ok(args[0].clone());
}

let array = as_list_array(&args[0])?;
let dt = array.value_type();

let mut offsets = vec![0];
let mut new_arrays = vec![];

let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
// distinct for each list in ListArray
for arr in array.iter().flatten() {
let values = converter.convert_columns(&[arr])?;

let mut rows = Vec::with_capacity(values.num_rows());
// sort elements in list and remove duplicates
for val in values.iter().sorted().dedup() {
rows.push(val);
}

let last_offset: i32 = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + rows.len() as i32);
let arrays = converter.convert_rows(rows)?;
let array = match arrays.get(0) {
Some(array) => array.clone(),
None => {
return internal_err!("array_distinct: failed to get array from rows")
}
};
new_arrays.push(array);
}

let field = Arc::new(Field::new("item", dt, true));
let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?);
Ok(arr)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -3028,4 +3078,31 @@ mod tests {

make_array(&[arr1, arr2]).expect("failed to initialize function array")
}

#[test]
fn test_array_distinct() {
// test: from array [[1, 3, 2, 3, 1, 2, 4], [1, 1]] to [[1, 2, 3, 4], [1]]
let data = vec![
Some(vec![
Some(1),
Some(3),
Some(2),
Some(3),
Some(1),
Some(2),
Some(4),
]),
Some(vec![Some(1), Some(1)]),
];
let excepted = vec![
Some(vec![Some(1), Some(2), Some(3), Some(4)]),
Some(vec![Some(1)]),
];
let input =
Arc::new(ListArray::from_iter_primitive::<Int64Type, _, _>(data)) as ArrayRef;
let excepted = ListArray::from_iter_primitive::<Int64Type, _, _>(excepted);
let result = array_distinct(&[input]).unwrap();
let result = as_list_array(&result).unwrap();
assert_eq!(result, &excepted);
}
}
3 changes: 3 additions & 0 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ pub fn create_physical_fun(
BuiltinScalarFunction::ArrayDims => {
Arc::new(|args| make_scalar_function(array_expressions::array_dims)(args))
}
BuiltinScalarFunction::ArrayDistinct => {
Arc::new(|args| make_scalar_function(array_expressions::array_distinct)(args))
}
BuiltinScalarFunction::ArrayElement => {
Arc::new(|args| make_scalar_function(array_expressions::array_element)(args))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ enum ScalarFunction {
ArrayExcept = 123;
ArrayPopFront = 124;
Levenshtein = 125;
ArrayDistinct = 126;
}

message ScalarFunctionNode {
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,15 @@ use datafusion_common::{
DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, ScalarValue,
};
use datafusion_expr::{
abs, acos, acosh, array, array_append, array_concat, array_dims, array_element,
array_except, array_has, array_has_all, array_has_any, array_intersect, array_length,
array_ndims, array_position, array_positions, array_prepend, array_remove,
array_remove_all, array_remove_n, array_repeat, array_replace, array_replace_all,
array_replace_n, array_slice, array_to_string, arrow_typeof, ascii, asin, asinh,
atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, character_length,
chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date,
current_time, date_bin, date_part, date_trunc, decode, degrees, digest, encode, exp,
abs, acos, acosh, array, array_append, array_concat, array_dims, array_distinct,
array_element, array_except, array_has, array_has_all, array_has_any,
array_intersect, array_length, array_ndims, array_position, array_positions,
array_prepend, array_remove, array_remove_all, array_remove_n, array_repeat,
array_replace, array_replace_all, array_replace_n, array_slice, array_to_string,
arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality,
cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh,
cot, current_date, current_time, date_bin, date_part, date_trunc, decode, degrees,
digest, encode, exp,
expr::{self, InList, Sort, WindowFunction},
factorial, flatten, floor, from_unixtime, gcd, gen_range, isnan, iszero, lcm, left,
levenshtein, ln, log, log10, log2,
Expand Down Expand Up @@ -470,6 +471,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::ArrayHasAny => Self::ArrayHasAny,
ScalarFunction::ArrayHas => Self::ArrayHas,
ScalarFunction::ArrayDims => Self::ArrayDims,
ScalarFunction::ArrayDistinct => Self::ArrayDistinct,
ScalarFunction::ArrayElement => Self::ArrayElement,
ScalarFunction::Flatten => Self::Flatten,
ScalarFunction::ArrayLength => Self::ArrayLength,
Expand Down Expand Up @@ -1441,6 +1443,9 @@ pub fn parse_expr(
ScalarFunction::ArrayDims => {
Ok(array_dims(parse_expr(&args[0], registry)?))
}
ScalarFunction::ArrayDistinct => {
Ok(array_distinct(parse_expr(&args[0], registry)?))
}
ScalarFunction::ArrayElement => Ok(array_element(
parse_expr(&args[0], registry)?,
parse_expr(&args[1], registry)?,
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::ArrayHasAny => Self::ArrayHasAny,
BuiltinScalarFunction::ArrayHas => Self::ArrayHas,
BuiltinScalarFunction::ArrayDims => Self::ArrayDims,
BuiltinScalarFunction::ArrayDistinct => Self::ArrayDistinct,
BuiltinScalarFunction::ArrayElement => Self::ArrayElement,
BuiltinScalarFunction::Flatten => Self::Flatten,
BuiltinScalarFunction::ArrayLength => Self::ArrayLength,
Expand Down
63 changes: 63 additions & 0 deletions datafusion/sqllogictest/test_files/array.slt
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,30 @@ AS VALUES
(make_array([[1], [2]], [[2], [3]]), make_array([1], [2]))
;

statement ok
CREATE TABLE array_distinct_table_1D
AS VALUES
(make_array(1, 1, 2, 2, 3)),
(make_array(1, 2, 3, 4, 5)),
(make_array(3, 5, 3, 3, 3))
;

statement ok
CREATE TABLE array_distinct_table_1D_UTF8
AS VALUES
(make_array('a', 'a', 'bc', 'bc', 'def')),
(make_array('a', 'bc', 'def', 'defg', 'defg')),
(make_array('defg', 'defg', 'defg', 'defg', 'defg'))
;

statement ok
CREATE TABLE array_distinct_table_2D
AS VALUES
(make_array([1,2], [1,2], [3,4], [3,4], [5,6])),
(make_array([1,2], [3,4], [5,6], [7,8], [9,10])),
(make_array([5,6], [5,6], NULL))
;

statement ok
CREATE TABLE array_intersect_table_1D
AS VALUES
Expand Down Expand Up @@ -2626,6 +2650,45 @@ select array_has_all(make_array(1,2,3), make_array(1,3)),
----
true false true false false false true true false false true false true

query ?
select array_distinct(null);
----
NULL

query ?
select array_distinct([]);
----
[]

query ?
select array_distinct([[], []]);
----
[[]]

query ?
select array_distinct(column1)
from array_distinct_table_1D;
----
[1, 2, 3]
[1, 2, 3, 4, 5]
[3, 5]

query ?
select array_distinct(column1)
from array_distinct_table_1D_UTF8;
----
[a, bc, def]
[a, bc, def, defg]
[defg]

query ?
select array_distinct(column1)
from array_distinct_table_2D;
----
[[1, 2], [3, 4], [5, 6]]
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
[, [5, 6]]

query ???
select array_intersect(column1, column2),
array_intersect(column3, column4),
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ Unlike to some databases the math functions in Datafusion works the same way as
| array_has_all(array, sub-array) | Returns true if all elements of sub-array exist in array `array_has_all([1,2,3], [1,3]) -> true` |
| array_has_any(array, sub-array) | Returns true if any elements exist in both arrays `array_has_any([1,2,3], [1,4]) -> true` |
| array_dims(array) | Returns an array of the array's dimensions. `array_dims([[1, 2, 3], [4, 5, 6]]) -> [2, 3]` |
| array_distinct(array) | Returns distinct values from the array after removing duplicates. `array_distinct([1, 3, 2, 3, 1, 2, 4]) -> [1, 2, 3, 4]` |
| array_element(array, index) | Extracts the element with the index n from the array `array_element([1, 2, 3, 4], 3) -> 3` |
| flatten(array) | Converts an array of arrays to a flat array `flatten([[1], [2, 3], [4, 5, 6]]) -> [1, 2, 3, 4, 5, 6]` |
| array_length(array, dimension) | Returns the length of the array dimension. `array_length([1, 2, 3, 4, 5]) -> 5` |
Expand Down

0 comments on commit 1e615b2

Please sign in to comment.