Skip to content

Commit

Permalink
rm convert_first_level_array_to_scalar_vec
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <[email protected]>
  • Loading branch information
jayzhan211 committed Feb 15, 2024
1 parent a3f8e12 commit 085df61
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 54 deletions.
59 changes: 10 additions & 49 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use arrow::{
UInt16Type, UInt32Type, UInt64Type, UInt8Type, DECIMAL128_MAX_PRECISION,
},
};
use arrow_array::cast::as_list_array;
use arrow_array::{ArrowNativeTypeOp, Scalar};
use arrow_buffer::NullBuffer;

Expand Down Expand Up @@ -2059,73 +2058,37 @@ impl ScalarValue {
/// use datafusion_common::ScalarValue;
/// use arrow::array::ListArray;
/// use arrow::datatypes::{DataType, Int32Type};
/// use datafusion_common::utils::array_into_list_array;
/// use std::sync::Arc;
///
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
/// None,
/// Some(vec![Some(4), Some(5)])
/// ]);
///
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let expected = vec![
/// vec![
/// vec![
/// ScalarValue::Int32(Some(1)),
/// ScalarValue::Int32(Some(2)),
/// ScalarValue::Int32(Some(3)),
/// ],
/// vec![],
/// vec![ScalarValue::Int32(Some(4)), ScalarValue::Int32(Some(5))]
/// ],
/// vec![
/// ScalarValue::Int32(Some(4)),
/// ScalarValue::Int32(Some(5)),
/// ],
/// ];
///
/// assert_eq!(scalar_vec, expected);
/// ```
pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result<Vec<Vec<Self>>> {
    // One output row per input slot; the first failing conversion aborts the
    // whole collection and is returned as the error.
    (0..array.len())
        .map(|row| -> Result<Vec<Self>> {
            // Any non-list slot becomes a single-element row.
            if !matches!(array.data_type(), DataType::List(_)) {
                return Ok(vec![ScalarValue::try_from_array(array, row)?]);
            }

            let lists = as_list_array(array);

            // A null list slot is represented by an empty row.
            if lists.is_null(row) {
                return Ok(Vec::new());
            }

            // Recurse into the child array and flatten its rows into one row.
            let child = lists.value(row);
            let flattened: Vec<Self> = ScalarValue::convert_array_to_scalar_vec(&child)?
                .into_iter()
                .flatten()
                .collect();
            Ok(flattened)
        })
        .collect()
}

/// Like `convert_array_to_scalar_vec`, but converts only the first level instead of
/// recursing through every level, so nested lists remain as `ScalarValue::List`.
///
/// Example
/// ```
/// use datafusion_common::ScalarValue;
/// use arrow::array::ListArray;
/// use arrow::datatypes::{DataType, Int32Type};
/// use datafusion_common::utils::array_into_list_array;
/// use std::sync::Arc;
///
/// let list_arr = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
/// Some(vec![Some(4), Some(5)])
/// ]);
/// let list_arr = array_into_list_array(Arc::new(list_arr));
///
/// let scalar_vec = ScalarValue::convert_first_level_array_to_scalar_vec(&list_arr).unwrap();
/// let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&list_arr).unwrap();
///
/// let l1 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
/// Some(vec![Some(1), Some(2), Some(3)]),
Expand All @@ -2143,9 +2106,7 @@ impl ScalarValue {
///
/// assert_eq!(scalar_vec, expected);
/// ```
pub fn convert_first_level_array_to_scalar_vec(
array: &dyn Array,
) -> Result<Vec<Vec<Self>>> {
pub fn convert_array_to_scalar_vec(array: &dyn Array) -> Result<Vec<Vec<Self>>> {
let mut scalars = Vec::with_capacity(array.len());

for index in 0..array.len() {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ async fn csv_query_array_agg_distinct() -> Result<()> {
// We should have 1 row containing a list
let column = actual[0].column(0);
assert_eq!(column.len(), 1);

let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&column)?;
let mut scalars = scalar_vec[0].clone();

// workaround lack of Ord of ScalarValue
let cmp = |a: &ScalarValue, b: &ScalarValue| {
a.partial_cmp(b).expect("Can compare ScalarValues")
Expand Down
3 changes: 1 addition & 2 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
partition_ordering_values.push(self.ordering_values.clone().into());

// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res =
ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?;
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
for v in array_agg_res.into_iter() {
partition_values.push(v.into());
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr/src/aggregate/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl Accumulator for NthValueAccumulator {
let n_required = self.n.unsigned_abs() as usize;
if self.ordering_req.is_empty() {
let array_agg_res =
ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?;
ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
for v in array_agg_res.into_iter() {
self.values.extend(v);
if self.values.len() > n_required {
Expand All @@ -260,7 +260,7 @@ impl Accumulator for NthValueAccumulator {
partition_ordering_values.push(self.ordering_values.clone());

let array_agg_res =
ScalarValue::convert_first_level_array_to_scalar_vec(array_agg_values)?;
ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;

for v in array_agg_res.into_iter() {
partition_values.push(v.into());
Expand Down

0 comments on commit 085df61

Please sign in to comment.