Skip to content

Commit

Permalink
feat: support largelist in array_slice (#8561)
Browse files Browse the repository at this point in the history
* support largelist in array_slice

* remove T trait

* fix clippy
  • Loading branch information
Weijun-H authored Dec 17, 2023
1 parent 2e16c75 commit fc6cc48
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 31 deletions.
110 changes: 79 additions & 31 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,33 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
///
/// See test cases in `array.slt` for more details.
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
let array_data_type = args[0].data_type();
match array_data_type {
DataType::List(_) => {
let array = as_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
general_array_slice::<i32>(array, from_array, to_array)
}
DataType::LargeList(_) => {
let array = as_large_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
general_array_slice::<i64>(array, from_array, to_array)
}
_ => not_impl_err!("array_slice does not support type: {:?}", array_data_type),
}
}

let values = list_array.values();
fn general_array_slice<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
from_array: &Int64Array,
to_array: &Int64Array,
) -> Result<ArrayRef>
where
i64: TryInto<O>,
{
let values = array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

Expand All @@ -539,72 +561,98 @@ pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
// We have the slice syntax compatible with DuckDB v0.8.1.
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.

fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
index + len as i64
if let Ok(index) = index.try_into() {
index + len
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
} else {
// array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
std::cmp::max(index - 1, 0)
if let Ok(index) = index.try_into() {
std::cmp::max(index - O::usize_as(1), O::usize_as(0))
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
None
Ok(None)
}
}

fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
// array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
index + len as i64 - 1
if let Ok(index) = index.try_into() {
index + len - O::usize_as(1)
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
} else {
// array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
std::cmp::min(index - 1, len as i64 - 1)
if let Ok(index) = index.try_into() {
std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
None
Ok(None)
}
}

let mut offsets = vec![0];
let mut offsets = vec![O::usize_as(0)];

for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

// len 0 indicate array is null, return empty array in this row.
if len == 0 {
if len == O::usize_as(0) {
offsets.push(offsets[row_index]);
continue;
}

// If index is null, we consider it as the minimum / maximum index of the array.
let from_index = if from_array.is_null(row_index) {
Some(0)
Some(O::usize_as(0))
} else {
adjusted_from_index(from_array.value(row_index), len)
adjusted_from_index::<O>(from_array.value(row_index), len)?
};

let to_index = if to_array.is_null(row_index) {
Some(len as i64 - 1)
Some(len - O::usize_as(1))
} else {
adjusted_to_index(to_array.value(row_index), len)
adjusted_to_index::<O>(to_array.value(row_index), len)?
};

if let (Some(from), Some(to)) = (from_index, to_index) {
if from <= to {
assert!(start + to as usize <= end);
mutable.extend(0, start + from as usize, start + to as usize + 1);
offsets.push(offsets[row_index] + (to - from + 1) as i32);
assert!(start + to <= end);
mutable.extend(
0,
(start + from).to_usize().unwrap(),
(start + to + O::usize_as(1)).to_usize().unwrap(),
);
offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
Expand All @@ -617,9 +665,9 @@ pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {

let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", list_array.value_type(), true)),
OffsetBuffer::new(offsets.into()),
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new("item", array.value_type(), true)),
OffsetBuffer::<O>::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
Expand Down
Loading

0 comments on commit fc6cc48

Please sign in to comment.