From c19d55b0a4b18dd3ce740b74004d70e18ea75b32 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Thu, 2 Nov 2023 08:30:41 +0800 Subject: [PATCH 1/6] checkpoint Signed-off-by: jayzhan211 --- .../physical-expr/src/array_expressions.rs | 29 ++++++++++++++++++- datafusion/physical-expr/src/functions.rs | 2 +- 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 84fd301b84de..6978399f0c48 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -27,7 +27,7 @@ use arrow::datatypes::{DataType, Field, UInt64Type}; use arrow_buffer::NullBuffer; use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_list_array, as_string_array, + as_generic_string_array, as_int64_array, as_list_array, as_string_array, as_int32_array, }; use datafusion_common::utils::wrap_into_list_array; use datafusion_common::{ @@ -1436,6 +1436,33 @@ array_replacement_function!( "Array_replace_all SQL function" ); +pub fn array_replace_v2(args: &[ArrayRef]) -> Result { + let list_arr = as_list_array(&args[0])?; + let from_arr = as_int64_array(&args[1])?; + let to_arr = &args[2]; + + let row_number = list_arr.len(); + for i in 0..row_number { + let arr = list_arr.value(i); + match arr.data_type() { + DataType::Int64 => { + let from = from_arr.value(i); + let arr = as_int64_array(&arr)?; + let to_arr = as_int64_array(&to_arr)?; + + let to = to_arr.value(i); + arr.to_data() + + } + _ => not_impl_err!("array_replace_v2 only support int64 array") + } + } + + + let arr = Arc::new(list_arr.to_owned()) as ArrayRef; + Ok(arr) +} + macro_rules! to_string { ($ARG:expr, $ARRAY:expr, $DELIMITER:expr, $NULL_STRING:expr, $WITH_NULL_STRING:expr, $ARRAY_TYPE:ident) => {{ let arr = downcast_arg!($ARRAY, $ARRAY_TYPE); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index b66bac41014d..d49cf37fdf27 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -514,7 +514,7 @@ pub fn create_physical_fun( make_scalar_function(array_expressions::array_remove_all)(args) }), BuiltinScalarFunction::ArrayReplace => { - Arc::new(|args| make_scalar_function(array_expressions::array_replace)(args)) + Arc::new(|args| make_scalar_function(array_expressions::array_replace_v2)(args)) } BuiltinScalarFunction::ArrayReplaceN => Arc::new(|args| { make_scalar_function(array_expressions::array_replace_n)(args) From f9717eb1de265fa98964d769e214d0f3b0b140b0 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 4 Nov 2023 12:08:24 +0800 Subject: [PATCH 2/6] optimize non-list Signed-off-by: jayzhan211 --- Cargo.toml | 1 + datafusion-cli/Cargo.lock | 1 + datafusion/physical-expr/Cargo.toml | 1 + .../physical-expr/src/array_expressions.rs | 181 ++++++++++++++++-- datafusion/physical-expr/src/functions.rs | 6 +- 5 files changed, 167 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22d5f2f64464..85acd7dc1a06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ arrow-array = { version = "48.0.0", default-features = false, features = ["chron arrow-buffer = { version = "48.0.0", default-features = false } arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] } arrow-schema = { version = "48.0.0", default-features = false } +arrow-ord = { version = "48.0.0", default-features = false } parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] } sqlparser = { version = "0.39.0", features = ["visitor"] } chrono = { version = "0.4.31", default-features = false } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index dc828f018fd5..a5eafa68cf44 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1241,6 +1241,7 @@ dependencies = [ "arrow", "arrow-array", "arrow-buffer", + "arrow-ord", "arrow-schema", "base64", "blake2", diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index f7c0221756fd..6f2536e1f11b 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -45,6 +45,7 @@ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-schema = { workspace = true } +arrow-ord = { workspace = true } base64 = { version = "0.21", optional = true } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 6978399f0c48..29d62af68f6e 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -24,10 +24,12 @@ use arrow::array::*; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::{DataType, Field, UInt64Type}; +use arrow_array::types::Int64Type; use arrow_buffer::NullBuffer; use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_list_array, as_string_array, as_int32_array, + as_generic_string_array, as_int32_array, as_int64_array, as_list_array, + as_string_array, }; use datafusion_common::utils::wrap_into_list_array; use datafusion_common::{ @@ -1436,31 +1438,130 @@ array_replacement_function!( "Array_replace_all SQL function" ); -pub fn array_replace_v2(args: &[ArrayRef]) -> Result { - let list_arr = as_list_array(&args[0])?; - let from_arr = as_int64_array(&args[1])?; - let to_arr = &args[2]; - - let row_number = list_arr.len(); - for i in 0..row_number { - let arr = list_arr.value(i); - match arr.data_type() { - DataType::Int64 => { - let from = from_arr.value(i); - let arr = as_int64_array(&arr)?; - let to_arr = as_int64_array(&to_arr)?; - - let to = to_arr.value(i); - arr.to_data() +fn general_replace(args: &[ArrayRef], n: u32) -> Result { + let list_array = as_list_array(&args[0])?; + let from_array = &args[1]; + let to_array = &args[2]; + + let mut offsets: Vec = vec![0]; + // TODO: change to array value type + let mut values = new_empty_array(from_array.data_type()); + + for (row_index, arr) in list_array.iter().enumerate() { + let last_offset: i32 = offsets.last().copied().ok_or_else(|| { + DataFusionError::Internal(format!("offsets should not be empty")) + })?; + match arr { + Some(arr) => { + let indices = UInt32Array::from(vec![row_index as u32]); + let p = arrow::compute::take(from_array, &indices, None).unwrap(); + let from_a = Scalar::new(p); + println!("arr: {:?}, from_a: {:?}", arr, from_a); + let eq_array = arrow_ord::cmp::eq(&arr, &from_a).unwrap(); + + let arrays = vec![arr, to_array.clone()]; + let arrays_data = arrays + .iter() + .map(|a| a.to_data()) + .collect::>(); + let arrays_data = arrays_data.iter().collect::>(); + + let arrays = arrays + .iter() + .map(|arr| arr.as_ref()) + .collect::>(); + let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); + + let mut mutable = + MutableArrayData::with_capacities(arrays_data, false, capacity); + + let mut counter = 0; + for (i, itm) in eq_array.iter().enumerate() { + if let Some(v) = itm { + if v { + mutable.extend(1, row_index, row_index + 1); + counter += 1; + if counter == n { + // extend the rest of the array + mutable.extend(0, i + 1, eq_array.len()); + break; + } + } else { + mutable.extend(0, i, i + 1); + } + } else { + } + } + + let data = mutable.freeze(); + let replaced_array = arrow_array::make_array(data); + println!("replaced_array: {:?}", replaced_array); + let v = arrow::compute::concat(&[&values, &replaced_array])?; + values = v; + offsets.push(last_offset + replaced_array.len() as i32); + } + None => { + offsets.push(last_offset); } - _ => not_impl_err!("array_replace_v2 only support int64 array") } } + let field = Arc::new(Field::new("item", from_array.data_type().clone(), true)); - let arr = Arc::new(list_arr.to_owned()) as ArrayRef; - Ok(arr) + let res = Arc::new(ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + )?); + Ok(res) +} + +pub fn array_replace_v2(args: &[ArrayRef]) -> Result { + // let list_arr = as_list_array(&args[0])?; + // let from_arr = as_int64_array(&args[1])?; + // let to_arr = &args[2]; + + // let row_number = list_arr.len(); + // for i in 0..row_number { + // let arr = list_arr.value(i); + // match arr.data_type() { + // DataType::Int64 => { + // let from = from_arr.value(i); + // let arr = as_int64_array(&arr)?; + // let to_arr = as_int64_array(&to_arr)?; + + // let to = to_arr.value(i); + // println!("arr: {:?}, from: {}, to: {}", arr, from, to); + // } + // _ => return not_impl_err!("array_replace_v2 only support int64 array"), + // } + // } + + // let arr = Arc::new(list_arr.to_owned()) as ArrayRef; + // Ok(arr) + + let arr = as_list_array(&args[0])?; + let from = &args[1]; + let to = &args[2]; + let max = Int64Array::from_value(1, args[0].len()); + + let res = match arr.value_type() { + DataType::List(field) => { + macro_rules! array_function { + ($ARRAY_TYPE:ident) => { + general_replace_list!(arr, from, to, max, $ARRAY_TYPE) + }; + } + call_array_function!(field.data_type(), true) + } + data_type => { + return general_replace(args, 1); + } + }; + + Ok(res) } macro_rules! to_string { @@ -1948,8 +2049,48 @@ pub fn string_to_array(args: &[ArrayRef]) -> Result>(); + let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); + + let array_data = vec![a_data.clone(), d_data.clone()]; + let array_data = array_data.iter().collect::>(); + + let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity); + + for (i, itm) in c.iter().enumerate() { + if let Some(v) = itm { + if v { + mutable.extend(1, 0, 1); + } else { + mutable.extend(0, i, i + 1); + } + } else { + } + } + + let data = mutable.freeze(); + let res = arrow_array::make_array(data); + println!("res: {:?}", res); + } + #[test] fn test_array() { // make_array(1, 2, 3) = [1, 2, 3] diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index d49cf37fdf27..fe5164678f23 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -513,9 +513,9 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayRemoveAll => Arc::new(|args| { make_scalar_function(array_expressions::array_remove_all)(args) }), - BuiltinScalarFunction::ArrayReplace => { - Arc::new(|args| make_scalar_function(array_expressions::array_replace_v2)(args)) - } + BuiltinScalarFunction::ArrayReplace => Arc::new(|args| { + make_scalar_function(array_expressions::array_replace_v2)(args) + }), BuiltinScalarFunction::ArrayReplaceN => Arc::new(|args| { make_scalar_function(array_expressions::array_replace_n)(args) }), From d6be6171a89ccd086038d8e4472ec908bcbfa614 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 4 Nov 2023 13:59:38 +0800 Subject: [PATCH 3/6] replace list ver Signed-off-by: jayzhan211 --- .../physical-expr/src/array_expressions.rs | 137 +++++++++++++++++- 1 file changed, 131 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 29d62af68f6e..16d3d7a94657 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1518,6 +1518,103 @@ fn general_replace(args: &[ArrayRef], n: u32) -> Result { Ok(res) } +fn general_list_replace(args: &[ArrayRef], n: u32) -> Result { + let list_array = as_list_array(&args[0])?; + let from_array = &args[1]; + let to_array = &args[2]; + + let mut offsets: Vec = vec![0]; + // TODO: change to array value type + let mut values = new_empty_array(from_array.data_type()); + + for (row_index, arr) in list_array.iter().enumerate() { + let last_offset: i32 = offsets.last().copied().ok_or_else(|| { + DataFusionError::Internal(format!("offsets should not be empty")) + })?; + match arr { + Some(arr) => { + let indices = UInt32Array::from(vec![row_index as u32]); + let from_a = arrow::compute::take(from_array, &indices, None).unwrap(); + let from_a = as_list_array(&from_a)?.value(0); + + let list_arr = as_list_array(&arr)?; + + let mut bool_values = vec![]; + for a in list_arr.iter() { + if let Some(a) = a { + if a.eq(&from_a) { + bool_values.push(Some(true)); + } else { + bool_values.push(Some(false)); + } + } + } + let eq_array = BooleanArray::from(bool_values); + + // println!("arr: {:?}, from_a: {:?}", arr, from_a); + // let eq_array = arrow_ord::cmp::eq(&arr, &from_a).unwrap(); + println!("eq_array: {:?}", eq_array); + + let arrays = vec![arr, to_array.clone()]; + let arrays_data = arrays + .iter() + .map(|a| a.to_data()) + .collect::>(); + let arrays_data = arrays_data.iter().collect::>(); + + let arrays = arrays + .iter() + .map(|arr| arr.as_ref()) + .collect::>(); + let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); + + let mut mutable = + MutableArrayData::with_capacities(arrays_data, false, capacity); + + let mut counter = 0; + + for (i, itm) in eq_array.iter().enumerate() { + if let Some(v) = itm { + if v { + mutable.extend(1, row_index, row_index + 1); + counter += 1; + if counter == n { + // extend the rest of the array + mutable.extend(0, i + 1, eq_array.len()); + break; + } + } else { + mutable.extend(0, i, i + 1); + } + } else { + } + } + + let data = mutable.freeze(); + let replaced_array = arrow_array::make_array(data); + println!("replaced_array: {:?}", replaced_array); + + let v = arrow::compute::concat(&[&values, &replaced_array])?; + values = v; + offsets.push(last_offset + replaced_array.len() as i32); + } + None => { + offsets.push(last_offset); + } + } + } + + let field = Arc::new(Field::new("item", from_array.data_type().clone(), true)); + + let res = Arc::new(ListArray::try_new( + field, + OffsetBuffer::new(offsets.into()), + Arc::new(values), + None, + )?); + Ok(res) +} + pub fn array_replace_v2(args: &[ArrayRef]) -> Result { // let list_arr = as_list_array(&args[0])?; // let from_arr = as_int64_array(&args[1])?; @@ -1549,12 +1646,13 @@ pub fn array_replace_v2(args: &[ArrayRef]) -> Result { let res = match arr.value_type() { DataType::List(field) => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - general_replace_list!(arr, from, to, max, $ARRAY_TYPE) - }; - } - call_array_function!(field.data_type(), true) + // macro_rules! array_function { + // ($ARRAY_TYPE:ident) => { + // general_replace_list!(arr, from, to, max, $ARRAY_TYPE) + // }; + // } + // call_array_function!(field.data_type(), true) + return general_list_replace(args, 1); } data_type => { return general_replace(args, 1); @@ -2053,6 +2151,33 @@ mod tests { use arrow_ord; use datafusion_common::cast::as_uint64_array; + #[test] + fn test_eq2() { + let arr_a = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + ]); + let arr_b = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), Some(3)]), + ]); + // let b = arr_b.value(0); + let b = Arc::new(arr_b) as ArrayRef; + for a in arr_a.iter() { + println!("a: {:?}, b: {:?}", a, b.clone()); + if let Some(a) = a { + if a.eq(&b) { + println!("equal"); + } else { + println!("not equal"); + } + } else { + println!("none"); + } + } + // let a_f = Arc::new(a) as ArrayRef; + + } + #[test] fn test_eq() { let a = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef; From bae5c5d75f6be5af82c9bbbfb75e5681fea87d6a Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 4 Nov 2023 14:57:04 +0800 Subject: [PATCH 4/6] cleanup Signed-off-by: jayzhan211 --- .../physical-expr/src/array_expressions.rs | 505 +++--------------- datafusion/physical-expr/src/functions.rs | 4 +- 2 files changed, 63 insertions(+), 446 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 16d3d7a94657..1b8a27c2b109 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -24,16 +24,15 @@ use arrow::array::*; use arrow::buffer::OffsetBuffer; use arrow::compute; use arrow::datatypes::{DataType, Field, UInt64Type}; -use arrow_array::types::Int64Type; use arrow_buffer::NullBuffer; use datafusion_common::cast::{ - as_generic_string_array, as_int32_array, as_int64_array, as_list_array, - as_string_array, + as_generic_string_array, as_int64_array, as_list_array, as_string_array, }; use datafusion_common::utils::wrap_into_list_array; use datafusion_common::{ - exec_err, internal_err, not_impl_err, plan_err, DataFusionError, Result, + exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, + DataFusionError, Result, }; use itertools::Itertools; @@ -1227,334 +1226,56 @@ array_removement_function!( "Array_remove_all SQL function" ); -macro_rules! general_replace { - ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($FROM.data_type()), $ARRAY_TYPE).clone(); - - let from_array = downcast_arg!($FROM, $ARRAY_TYPE); - let to_array = downcast_arg!($TO, $ARRAY_TYPE); - for (((arr, from), to), max) in $ARRAY - .iter() - .zip(from_array.iter()) - .zip(to_array.iter()) - .zip($MAX.iter()) - { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, $ARRAY_TYPE); - let mut counter = 0; - let max = if max < Some(1) { 1 } else { max.unwrap() }; - - let replaced_array = child_array - .iter() - .map(|el| { - if counter != max && el == from { - counter += 1; - to - } else { - el - } - }) - .collect::<$ARRAY_TYPE>(); - - values = downcast_arg!( - compute::concat(&[&values, &replaced_array])?.clone(), - $ARRAY_TYPE - ) - .clone(); - offsets.push(last_offset + replaced_array.len() as i32); - } - None => { - offsets.push(last_offset); - } - } - } - - let field = Arc::new(Field::new("item", $FROM.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; -} - -macro_rules! general_replace_list { - ($ARRAY:expr, $FROM:expr, $TO:expr, $MAX:expr, $ARRAY_TYPE:ident) => {{ - let mut offsets: Vec = vec![0]; - let mut values = - downcast_arg!(new_empty_array($FROM.data_type()), ListArray).clone(); - - let from_array = downcast_arg!($FROM, ListArray); - let to_array = downcast_arg!($TO, ListArray); - for (((arr, from), to), max) in $ARRAY - .iter() - .zip(from_array.iter()) - .zip(to_array.iter()) - .zip($MAX.iter()) - { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let child_array = downcast_arg!(arr, ListArray); - let mut counter = 0; - let max = if max < Some(1) { 1 } else { max.unwrap() }; - - let replaced_vec = child_array - .iter() - .map(|el| { - if counter != max && el == from { - counter += 1; - to.clone().unwrap() - } else { - el.clone().unwrap() - } - }) - .collect::>(); - - let mut i: i32 = 0; - let mut replaced_offsets = vec![i]; - replaced_offsets.extend( - replaced_vec - .clone() - .into_iter() - .map(|a| { - i += a.len() as i32; - i - }) - .collect::>(), - ); - - let mut replaced_values = downcast_arg!( - new_empty_array(&from_array.value_type()), - $ARRAY_TYPE - ) - .clone(); - for replaced_list in replaced_vec { - replaced_values = downcast_arg!( - compute::concat(&[&replaced_values, &replaced_list])?, - $ARRAY_TYPE - ) - .clone(); - } - - let field = Arc::new(Field::new( - "item", - from_array.value_type().clone(), - true, - )); - let replaced_array = ListArray::try_new( - field, - OffsetBuffer::new(replaced_offsets.clone().into()), - Arc::new(replaced_values), - None, - )?; - - values = downcast_arg!( - compute::concat(&[&values, &replaced_array,])?.clone(), - ListArray - ) - .clone(); - offsets.push(last_offset + replaced_array.len() as i32); - } - None => { - offsets.push(last_offset); - } - } - } - - let field = Arc::new(Field::new("item", $FROM.data_type().clone(), true)); - - Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?) - }}; -} - -macro_rules! array_replacement_function { - ($FUNC:ident, $MAX_FUNC:expr, $DOC:expr) => { - #[doc = $DOC] - pub fn $FUNC(args: &[ArrayRef]) -> Result { - let arr = as_list_array(&args[0])?; - let from = &args[1]; - let to = &args[2]; - let max = $MAX_FUNC(args)?; - - check_datatypes(stringify!($FUNC), &[arr.values(), from, to])?; - let res = match arr.value_type() { - DataType::List(field) => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - general_replace_list!(arr, from, to, max, $ARRAY_TYPE) - }; - } - call_array_function!(field.data_type(), true) - } - data_type => { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - general_replace!(arr, from, to, max, $ARRAY_TYPE) - }; - } - call_array_function!(data_type, false) - } - }; - - Ok(res) - } - }; -} - -fn replace_one(args: &[ArrayRef]) -> Result { - Ok(Int64Array::from_value(1, args[0].len())) -} - -fn replace_n(args: &[ArrayRef]) -> Result { - as_int64_array(&args[3]).cloned() -} - -fn replace_all(args: &[ArrayRef]) -> Result { - Ok(Int64Array::from_value(i64::MAX, args[0].len())) -} - -// array replacement functions -array_replacement_function!(array_replace, replace_one, "Array_replace SQL function"); -array_replacement_function!(array_replace_n, replace_n, "Array_replace_n SQL function"); -array_replacement_function!( - array_replace_all, - replace_all, - "Array_replace_all SQL function" -); - -fn general_replace(args: &[ArrayRef], n: u32) -> Result { +fn general_replace(args: &[ArrayRef], arr_n: Vec) -> Result { let list_array = as_list_array(&args[0])?; let from_array = &args[1]; let to_array = &args[2]; let mut offsets: Vec = vec![0]; - // TODO: change to array value type - let mut values = new_empty_array(from_array.data_type()); - - for (row_index, arr) in list_array.iter().enumerate() { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; + let data_type = list_array.value_type(); + let mut values = new_empty_array(&data_type); + + for (row_index, (arr, n)) in list_array.iter().zip(arr_n.iter()).enumerate() { + let last_offset: i32 = offsets + .last() + .copied() + .ok_or_else(|| internal_datafusion_err!("offsets should not be empty"))?; match arr { Some(arr) => { let indices = UInt32Array::from(vec![row_index as u32]); - let p = arrow::compute::take(from_array, &indices, None).unwrap(); - let from_a = Scalar::new(p); - println!("arr: {:?}, from_a: {:?}", arr, from_a); - let eq_array = arrow_ord::cmp::eq(&arr, &from_a).unwrap(); + let from_a = arrow::compute::take(from_array, &indices, None)?; - let arrays = vec![arr, to_array.clone()]; - let arrays_data = arrays - .iter() - .map(|a| a.to_data()) - .collect::>(); - let arrays_data = arrays_data.iter().collect::>(); + let eq_array = match from_a.data_type() { + // Not found arrow_ord::cmp_eq for List, compare it by loop + DataType::List(_) => { + let from_a = as_list_array(&from_a)?.value(0); - let arrays = arrays - .iter() - .map(|arr| arr.as_ref()) - .collect::>(); - let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); + let list_arr = as_list_array(&arr)?; - let mut mutable = - MutableArrayData::with_capacities(arrays_data, false, capacity); - - let mut counter = 0; - for (i, itm) in eq_array.iter().enumerate() { - if let Some(v) = itm { - if v { - mutable.extend(1, row_index, row_index + 1); - counter += 1; - if counter == n { - // extend the rest of the array - mutable.extend(0, i + 1, eq_array.len()); - break; + let mut bool_values = vec![]; + for a in list_arr.iter() { + if let Some(a) = a { + if a.eq(&from_a) { + bool_values.push(Some(true)); + } else { + bool_values.push(Some(false)); + } + } else { + return internal_err!( + "Null value is not supported in array_replace" + ); } - } else { - mutable.extend(0, i, i + 1); } - } else { + BooleanArray::from(bool_values) } - } - - let data = mutable.freeze(); - let replaced_array = arrow_array::make_array(data); - println!("replaced_array: {:?}", replaced_array); - - let v = arrow::compute::concat(&[&values, &replaced_array])?; - values = v; - offsets.push(last_offset + replaced_array.len() as i32); - } - None => { - offsets.push(last_offset); - } - } - } - - let field = Arc::new(Field::new("item", from_array.data_type().clone(), true)); - - let res = Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - Arc::new(values), - None, - )?); - Ok(res) -} - -fn general_list_replace(args: &[ArrayRef], n: u32) -> Result { - let list_array = as_list_array(&args[0])?; - let from_array = &args[1]; - let to_array = &args[2]; - - let mut offsets: Vec = vec![0]; - // TODO: change to array value type - let mut values = new_empty_array(from_array.data_type()); - - for (row_index, arr) in list_array.iter().enumerate() { - let last_offset: i32 = offsets.last().copied().ok_or_else(|| { - DataFusionError::Internal(format!("offsets should not be empty")) - })?; - match arr { - Some(arr) => { - let indices = UInt32Array::from(vec![row_index as u32]); - let from_a = arrow::compute::take(from_array, &indices, None).unwrap(); - let from_a = as_list_array(&from_a)?.value(0); - - let list_arr = as_list_array(&arr)?; - - let mut bool_values = vec![]; - for a in list_arr.iter() { - if let Some(a) = a { - if a.eq(&from_a) { - bool_values.push(Some(true)); - } else { - bool_values.push(Some(false)); - } + _ => { + let from_a = Scalar::new(from_a); + arrow_ord::cmp::eq(&arr, &from_a)? } - } - let eq_array = BooleanArray::from(bool_values); - - // println!("arr: {:?}, from_a: {:?}", arr, from_a); - // let eq_array = arrow_ord::cmp::eq(&arr, &from_a).unwrap(); - println!("eq_array: {:?}", eq_array); + }; + // Use MutableArrayData to build the replaced array + // First array is the original array, second array is the element to replace with. let arrays = vec![arr, to_array.clone()]; let arrays_data = arrays .iter() @@ -1572,13 +1293,12 @@ fn general_list_replace(args: &[ArrayRef], n: u32) -> Result { MutableArrayData::with_capacities(arrays_data, false, capacity); let mut counter = 0; - - for (i, itm) in eq_array.iter().enumerate() { - if let Some(v) = itm { - if v { + for (i, to_replace) in eq_array.iter().enumerate() { + if let Some(to_replace) = to_replace { + if to_replace { mutable.extend(1, row_index, row_index + 1); counter += 1; - if counter == n { + if counter == *n { // extend the rest of the array mutable.extend(0, i + 1, eq_array.len()); break; @@ -1587,12 +1307,12 @@ fn general_list_replace(args: &[ArrayRef], n: u32) -> Result { mutable.extend(0, i, i + 1); } } else { + return internal_err!("eq_array should not contain None"); } } let data = mutable.freeze(); let replaced_array = arrow_array::make_array(data); - println!("replaced_array: {:?}", replaced_array); let v = arrow::compute::concat(&[&values, &replaced_array])?; values = v; @@ -1604,62 +1324,26 @@ fn general_list_replace(args: &[ArrayRef], n: u32) -> Result { } } - let field = Arc::new(Field::new("item", from_array.data_type().clone(), true)); - - let res = Arc::new(ListArray::try_new( - field, + Ok(Arc::new(ListArray::try_new( + Arc::new(Field::new("item", data_type, true)), OffsetBuffer::new(offsets.into()), - Arc::new(values), + values, None, - )?); - Ok(res) + )?)) } pub fn array_replace_v2(args: &[ArrayRef]) -> Result { - // let list_arr = as_list_array(&args[0])?; - // let from_arr = as_int64_array(&args[1])?; - // let to_arr = &args[2]; - - // let row_number = list_arr.len(); - // for i in 0..row_number { - // let arr = list_arr.value(i); - // match arr.data_type() { - // DataType::Int64 => { - // let from = from_arr.value(i); - // let arr = as_int64_array(&arr)?; - // let to_arr = as_int64_array(&to_arr)?; - - // let to = to_arr.value(i); - // println!("arr: {:?}, from: {}, to: {}", arr, from, to); - // } - // _ => return not_impl_err!("array_replace_v2 only support int64 array"), - // } - // } - - // let arr = Arc::new(list_arr.to_owned()) as ArrayRef; - // Ok(arr) - - let arr = as_list_array(&args[0])?; - let from = &args[1]; - let to = &args[2]; - let max = Int64Array::from_value(1, args[0].len()); + general_replace(args, vec![1; args[0].len()]) +} - let res = match arr.value_type() { - DataType::List(field) => { - // macro_rules! array_function { - // ($ARRAY_TYPE:ident) => { - // general_replace_list!(arr, from, to, max, $ARRAY_TYPE) - // }; - // } - // call_array_function!(field.data_type(), true) - return general_list_replace(args, 1); - } - data_type => { - return general_replace(args, 1); - } - }; +pub fn array_replace_n_v2(args: &[ArrayRef]) -> Result { + let arr = as_int64_array(&args[3])?; + let arr_n = arr.values().to_vec(); + general_replace(args, arr_n) +} - Ok(res) +pub fn array_replace_all_v2(args: &[ArrayRef]) -> Result { + general_replace(args, vec![i64::MAX; args[0].len()]) } macro_rules! to_string { @@ -2147,75 +1831,8 @@ pub fn string_to_array(args: &[ArrayRef]) -> Result(vec![ - Some(vec![Some(1), Some(2), Some(3)]), - Some(vec![Some(4), Some(5), Some(6)]), - ]); - let arr_b = ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1), Some(2), Some(3)]), - ]); - // let b = arr_b.value(0); - let b = Arc::new(arr_b) as ArrayRef; - for a in arr_a.iter() { - println!("a: {:?}, b: {:?}", a, b.clone()); - if let Some(a) = a { - if a.eq(&b) { - println!("equal"); - } else { - println!("not equal"); - } - } else { - println!("none"); - } - } - // let a_f = Arc::new(a) as ArrayRef; - - } - - #[test] - fn test_eq() { - let a = Arc::new(Int64Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef; - - // let b = Int64Array::new_scalar(2); - let b = Arc::new(Int64Array::from(vec![Some(2)])) as ArrayRef; - let b = Scalar::new(b); - let c = arrow_ord::cmp::eq(&a, &b).unwrap(); - let d = Arc::new(Int64Array::from(vec![Some(4)])) as ArrayRef; - - let arrays = vec![a.clone(), d.clone()]; - let a_data = a.into_data(); - let d_data = d.into_data(); - - let arrays = arrays.iter().map(|x| x.as_ref()).collect::>(); - let capacity = Capacities::Array(arrays.iter().map(|a| a.len()).sum()); - - let array_data = vec![a_data.clone(), d_data.clone()]; - let array_data = array_data.iter().collect::>(); - - let mut mutable = MutableArrayData::with_capacities(array_data, false, capacity); - - for (i, itm) in c.iter().enumerate() { - if let Some(v) = itm { - if v { - mutable.extend(1, 0, 1); - } else { - mutable.extend(0, i, i + 1); - } - } else { - } - } - - let data = mutable.freeze(); - let res = arrow_array::make_array(data); - println!("res: {:?}", res); - } - #[test] fn test_array() { // make_array(1, 2, 3) = [1, 2, 3] @@ -3093,7 +2710,7 @@ mod tests { fn test_array_replace() { // array_replace([3, 1, 2, 3, 2, 3], 3, 4) = [4, 1, 2, 3, 2, 3] let list_array = return_array_with_repeating_elements(); - let array = array_replace(&[ + let array = array_replace_v2(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -3124,7 +2741,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace(&[list_array, from_array, to_array]) + let array = array_replace_v2(&[list_array, from_array, to_array]) .expect("failed to initialize function array_replace"); let result = as_list_array(&array).expect("failed to initialize function array_replace"); @@ -3153,7 +2770,7 @@ mod tests { fn test_array_replace_n() { // array_replace_n([3, 1, 2, 3, 2, 3], 3, 4, 2) = [4, 1, 2, 4, 2, 3] let list_array = return_array_with_repeating_elements(); - let array = array_replace_n(&[ + let array = array_replace_n_v2(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -3186,7 +2803,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace_n(&[ + let array = array_replace_n_v2(&[ list_array, from_array, to_array, @@ -3220,7 +2837,7 @@ mod tests { fn test_array_replace_all() { // array_replace_all([3, 1, 2, 3, 2, 3], 3, 4) = [4, 1, 2, 4, 2, 4] let list_array = return_array_with_repeating_elements(); - let array = array_replace_all(&[ + let array = array_replace_all_v2(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -3251,7 +2868,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace_all(&[list_array, from_array, to_array]) + let array = array_replace_all_v2(&[list_array, from_array, to_array]) .expect("failed to initialize function array_replace_all"); let result = as_list_array(&array) .expect("failed to initialize function array_replace_all"); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index fe5164678f23..c21ce14279be 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -517,10 +517,10 @@ pub fn create_physical_fun( make_scalar_function(array_expressions::array_replace_v2)(args) }), BuiltinScalarFunction::ArrayReplaceN => Arc::new(|args| { - make_scalar_function(array_expressions::array_replace_n)(args) + make_scalar_function(array_expressions::array_replace_n_v2)(args) }), BuiltinScalarFunction::ArrayReplaceAll => Arc::new(|args| { - make_scalar_function(array_expressions::array_replace_all)(args) + make_scalar_function(array_expressions::array_replace_all_v2)(args) }), BuiltinScalarFunction::ArraySlice => { Arc::new(|args| make_scalar_function(array_expressions::array_slice)(args)) From 0a5241b8d8fb5cb0e31cf0c77ae7368f251a0d86 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 4 Nov 2023 15:01:25 +0800 Subject: [PATCH 5/6] rename Signed-off-by: jayzhan211 --- .../physical-expr/src/array_expressions.rs | 18 +++++++++--------- datafusion/physical-expr/src/functions.rs | 10 +++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 1b8a27c2b109..84d41caf947c 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1332,17 +1332,17 @@ fn general_replace(args: &[ArrayRef], arr_n: Vec) -> Result { )?)) } -pub fn array_replace_v2(args: &[ArrayRef]) -> Result { +pub fn array_replace(args: &[ArrayRef]) -> Result { general_replace(args, vec![1; args[0].len()]) } -pub fn array_replace_n_v2(args: &[ArrayRef]) -> Result { +pub fn array_replace_n(args: &[ArrayRef]) -> Result { let arr = as_int64_array(&args[3])?; let arr_n = arr.values().to_vec(); general_replace(args, arr_n) } -pub fn array_replace_all_v2(args: &[ArrayRef]) -> Result { +pub fn array_replace_all(args: &[ArrayRef]) -> Result { general_replace(args, vec![i64::MAX; args[0].len()]) } @@ -2710,7 +2710,7 @@ mod tests { fn test_array_replace() { // array_replace([3, 1, 2, 3, 2, 3], 3, 4) = [4, 1, 2, 3, 2, 3] let list_array = return_array_with_repeating_elements(); - let array = array_replace_v2(&[ + let array = array_replace(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -2741,7 +2741,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace_v2(&[list_array, from_array, to_array]) + let array = array_replace(&[list_array, from_array, to_array]) .expect("failed to initialize function array_replace"); let result = as_list_array(&array).expect("failed to initialize function array_replace"); @@ -2770,7 +2770,7 @@ mod tests { fn test_array_replace_n() { // array_replace_n([3, 1, 2, 3, 2, 3], 3, 4, 2) = [4, 1, 2, 4, 2, 3] let list_array = return_array_with_repeating_elements(); - let array = array_replace_n_v2(&[ + let array = array_replace_n(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -2803,7 +2803,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace_n_v2(&[ + let array = array_replace_n(&[ list_array, from_array, to_array, @@ -2837,7 +2837,7 @@ mod tests { fn test_array_replace_all() { // array_replace_all([3, 1, 2, 3, 2, 3], 3, 4) = [4, 1, 2, 4, 2, 4] let list_array = return_array_with_repeating_elements(); - let array = array_replace_all_v2(&[ + let array = array_replace_all(&[ list_array, Arc::new(Int64Array::from_value(3, 1)), Arc::new(Int64Array::from_value(4, 1)), @@ -2868,7 +2868,7 @@ mod tests { let list_array = return_nested_array_with_repeating_elements(); let from_array = return_array(); let to_array = return_extra_array(); - let array = array_replace_all_v2(&[list_array, from_array, to_array]) + let array = array_replace_all(&[list_array, from_array, to_array]) .expect("failed to initialize function array_replace_all"); let result = as_list_array(&array) .expect("failed to initialize function array_replace_all"); diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index c21ce14279be..b66bac41014d 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -513,14 +513,14 @@ pub fn create_physical_fun( BuiltinScalarFunction::ArrayRemoveAll => Arc::new(|args| { make_scalar_function(array_expressions::array_remove_all)(args) }), - BuiltinScalarFunction::ArrayReplace => Arc::new(|args| { - make_scalar_function(array_expressions::array_replace_v2)(args) - }), + BuiltinScalarFunction::ArrayReplace => { + Arc::new(|args| make_scalar_function(array_expressions::array_replace)(args)) + } BuiltinScalarFunction::ArrayReplaceN => Arc::new(|args| { - make_scalar_function(array_expressions::array_replace_n_v2)(args) + make_scalar_function(array_expressions::array_replace_n)(args) }), BuiltinScalarFunction::ArrayReplaceAll => Arc::new(|args| { - make_scalar_function(array_expressions::array_replace_all_v2)(args) + make_scalar_function(array_expressions::array_replace_all)(args) }), BuiltinScalarFunction::ArraySlice => { Arc::new(|args| make_scalar_function(array_expressions::array_slice)(args)) From b320e0f07fb6c6a5e40351d3a7604ad9588f1439 Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Sat, 4 Nov 2023 15:21:34 +0800 Subject: [PATCH 6/6] cleanup Signed-off-by: jayzhan211 --- Cargo.toml | 2 +- datafusion/physical-expr/Cargo.toml | 2 +- .../physical-expr/src/array_expressions.rs | 23 ++++++++----------- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 85acd7dc1a06..05a865839966 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,8 +53,8 @@ arrow = { version = "48.0.0", features = ["prettyprint"] } arrow-array = { version = "48.0.0", default-features = false, features = ["chrono-tz"] } arrow-buffer = { version = "48.0.0", default-features = false } arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] } -arrow-schema = { version = "48.0.0", default-features = false } arrow-ord = { version = "48.0.0", default-features = false } +arrow-schema = { version = "48.0.0", default-features = false } parquet = { version = "48.0.0", features = ["arrow", "async", "object_store"] } sqlparser = { version = "0.39.0", features = ["visitor"] } chrono = { version = "0.4.31", default-features = false } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 6f2536e1f11b..371a0aa50adf 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -44,8 +44,8 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } -arrow-schema = { workspace = true } arrow-ord = { workspace = true } +arrow-schema = { workspace = true } base64 = { version = "0.21", optional = true } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 84d41caf947c..d51eb45e059b 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -1243,23 +1243,18 @@ fn general_replace(args: &[ArrayRef], arr_n: Vec) -> Result { match arr { Some(arr) => { let indices = UInt32Array::from(vec![row_index as u32]); - let from_a = arrow::compute::take(from_array, &indices, None)?; + let from_arr = arrow::compute::take(from_array, &indices, None)?; - let eq_array = match from_a.data_type() { - // Not found arrow_ord::cmp_eq for List, compare it by loop + let eq_array = match from_arr.data_type() { + // arrow_ord::cmp_eq does not support ListArray, so we need to compare it by loop DataType::List(_) => { - let from_a = as_list_array(&from_a)?.value(0); - + let from_a = as_list_array(&from_arr)?.value(0); let list_arr = as_list_array(&arr)?; let mut bool_values = vec![]; - for a in list_arr.iter() { - if let Some(a) = a { - if a.eq(&from_a) { - bool_values.push(Some(true)); - } else { - bool_values.push(Some(false)); - } + for arr in list_arr.iter() { + if let Some(a) = arr { + bool_values.push(Some(a.eq(&from_a))); } else { return internal_err!( "Null value is not supported in array_replace" @@ -1269,8 +1264,8 @@ fn general_replace(args: &[ArrayRef], arr_n: Vec) -> Result { BooleanArray::from(bool_values) } _ => { - let from_a = Scalar::new(from_a); - arrow_ord::cmp::eq(&arr, &from_a)? + let from_arr = Scalar::new(from_arr); + arrow_ord::cmp::eq(&arr, &from_arr)? } };