Skip to content

Commit

Permalink
fix largelist
Browse files Browse the repository at this point in the history
  • Loading branch information
my-vegetable-has-exploded committed Nov 27, 2023
1 parent 138f14d commit 86ccb87
Showing 1 changed file with 39 additions and 23 deletions.
62 changes: 39 additions & 23 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use arrow_buffer::NullBuffer;

use arrow_schema::FieldRef;
use datafusion_common::cast::{
as_generic_string_array, as_int64_array, as_list_array, as_string_array,
as_generic_string_array, as_int64_array, as_large_list_array, as_list_array,
as_string_array,
};
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{
Expand Down Expand Up @@ -1991,38 +1992,27 @@ pub fn array_intersect(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// array_distinct SQL function
/// example: from list [1, 3, 2, 3, 1, 2, 4] to [1, 2, 3, 4]
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 1);

// handle null
if args[0].data_type() == &DataType::Null {
return Ok(args[0].clone());
}

let array = as_list_array(&args[0])?;
pub fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
array: &GenericListArray<OffsetSize>,
field: &FieldRef,
) -> Result<ArrayRef> {
let dt = array.value_type();

let mut offsets = vec![0];
let mut offsets = vec![OffsetSize::usize_as(0)];
let mut new_arrays = vec![];

let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
// distinct for each list in ListArray
for arr in array.iter().flatten() {
let values = converter.convert_columns(&[arr])?;

let mut rows = Vec::with_capacity(values.num_rows());
// sort elements in list and remove duplicates
for val in values.iter().sorted().dedup() {
rows.push(val);
}

let last_offset: i32 = match offsets.last().copied() {
let last_offset: OffsetSize = match offsets.last().copied() {
Some(offset) => offset,
None => return internal_err!("offsets should not be empty"),
};
offsets.push(last_offset + rows.len() as i32);
offsets.push(last_offset + OffsetSize::usize_as(rows.len()));
let arrays = converter.convert_rows(rows)?;
let array = match arrays.get(0) {
Some(array) => array.clone(),
Expand All @@ -2032,13 +2022,39 @@ pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
};
new_arrays.push(array);
}

let field = Arc::new(Field::new("item", dt, true));
let offsets = OffsetBuffer::new(offsets.into());
let new_arrays_ref = new_arrays.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
let values = compute::concat(&new_arrays_ref)?;
let arr = Arc::new(ListArray::try_new(field, offsets, values, None)?);
Ok(arr)
Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
field.clone(),
offsets,
values,
None,
)?))
}

/// `array_distinct` SQL function.
///
/// Example: the list `[1, 3, 2, 3, 1, 2, 4]` becomes `[1, 2, 3, 4]`.
///
/// Expects exactly one argument and dispatches on its data type:
/// * `Null` is returned unchanged,
/// * `List` and `LargeList` are forwarded to [`general_array_distinct`]
///   together with the list's element field,
/// * any other type yields an internal error.
///
/// # Panics
///
/// Panics if `args` does not contain exactly one element.
pub fn array_distinct(args: &[ArrayRef]) -> Result<ArrayRef> {
    assert_eq!(args.len(), 1);

    let input = &args[0];
    match input.data_type() {
        // A NULL argument is passed straight through.
        DataType::Null => Ok(input.clone()),
        // 32-bit offset lists.
        DataType::List(item_field) => {
            general_array_distinct(as_list_array(input)?, item_field)
        }
        // 64-bit offset lists.
        DataType::LargeList(item_field) => {
            general_array_distinct(as_large_list_array(input)?, item_field)
        }
        _ => internal_err!("array_distinct only support list array"),
    }
}

#[cfg(test)]
Expand Down

0 comments on commit 86ccb87

Please sign in to comment.