Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support largelist in array_slice #8561

Merged
merged 3 commits into from
Dec 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 79 additions & 31 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,33 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
///
/// See test cases in `array.slt` for more details.
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
let array_data_type = args[0].data_type();
match array_data_type {
DataType::List(_) => {
let array = as_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
general_array_slice::<i32>(array, from_array, to_array)
}
DataType::LargeList(_) => {
let array = as_large_list_array(&args[0])?;
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;
general_array_slice::<i64>(array, from_array, to_array)
}
_ => not_impl_err!("array_slice does not support type: {:?}", array_data_type),
}
}

let values = list_array.values();
fn general_array_slice<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
from_array: &Int64Array,
to_array: &Int64Array,
) -> Result<ArrayRef>
where
i64: TryInto<O>,
{
let values = array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

Expand All @@ -539,72 +561,98 @@ pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
// We have the slice syntax compatible with DuckDB v0.8.1.
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.

fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
fn adjusted_from_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
index + len as i64
if let Ok(index) = index.try_into() {
index + len
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
} else {
// array_slice(arr, 1, to) is the same as array_slice(arr, 0, to)
std::cmp::max(index - 1, 0)
if let Ok(index) = index.try_into() {
std::cmp::max(index - O::usize_as(1), O::usize_as(0))
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
None
Ok(None)
}
}

fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
fn adjusted_to_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
// array_slice in duckdb with negative to_index is python-like, so index itself is exclusive
index + len as i64 - 1
if let Ok(index) = index.try_into() {
index + len - O::usize_as(1)
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
} else {
// array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len)
std::cmp::min(index - 1, len as i64 - 1)
if let Ok(index) = index.try_into() {
std::cmp::min(index - O::usize_as(1), len - O::usize_as(1))
} else {
return exec_err!("array_slice got invalid index: {}", index);
}
};

if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
None
Ok(None)
}
}

let mut offsets = vec![0];
let mut offsets = vec![O::usize_as(0)];

for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

// len 0 indicate array is null, return empty array in this row.
if len == 0 {
if len == O::usize_as(0) {
offsets.push(offsets[row_index]);
continue;
}

// If index is null, we consider it as the minimum / maximum index of the array.
let from_index = if from_array.is_null(row_index) {
Some(0)
Some(O::usize_as(0))
} else {
adjusted_from_index(from_array.value(row_index), len)
adjusted_from_index::<O>(from_array.value(row_index), len)?
};

let to_index = if to_array.is_null(row_index) {
Some(len as i64 - 1)
Some(len - O::usize_as(1))
} else {
adjusted_to_index(to_array.value(row_index), len)
adjusted_to_index::<O>(to_array.value(row_index), len)?
};

if let (Some(from), Some(to)) = (from_index, to_index) {
if from <= to {
assert!(start + to as usize <= end);
mutable.extend(0, start + from as usize, start + to as usize + 1);
offsets.push(offsets[row_index] + (to - from + 1) as i32);
assert!(start + to <= end);
mutable.extend(
0,
(start + from).to_usize().unwrap(),
(start + to + O::usize_as(1)).to_usize().unwrap(),
);
offsets.push(offsets[row_index] + (to - from + O::usize_as(1)));
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
Expand All @@ -617,9 +665,9 @@ pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {

let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", list_array.value_type(), true)),
OffsetBuffer::new(offsets.into()),
Ok(Arc::new(GenericListArray::<O>::try_new(
Arc::new(Field::new("item", array.value_type(), true)),
OffsetBuffer::<O>::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
Expand Down
Loading