From 2f8fc094eb38c263fa7d746c52e56691aee01531 Mon Sep 17 00:00:00 2001
From: gsilvestrin
Date: Fri, 17 Mar 2023 19:04:17 -0700
Subject: [PATCH 1/2] Bugfix for BinaryEncoder positions

---
 rust/src/encodings/binary.rs | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/rust/src/encodings/binary.rs b/rust/src/encodings/binary.rs
index d7280a9113..31f727242f 100644
--- a/rust/src/encodings/binary.rs
+++ b/rust/src/encodings/binary.rs
@@ -20,7 +20,7 @@ use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
 use std::sync::Arc;
 
 use arrow_arith::arithmetic::subtract_scalar;
-use arrow_array::builder::{ArrayBuilder, PrimitiveBuilder};
+use arrow_array::builder::PrimitiveBuilder;
 use arrow_array::{
     cast::as_primitive_array,
     new_empty_array,
@@ -61,7 +61,7 @@ impl<'a> BinaryEncoder<'a> {
             PrimitiveBuilder::with_capacity(capacity);
         let mut value_offset: usize = self.writer.tell();
-        for array in arrs {
+        for (a_idx, array) in arrs.iter().enumerate() {
             let arr = array
                 .as_any()
                 .downcast_ref::<GenericByteArray<T>>()
                 .unwrap();
@@ -81,11 +81,19 @@ impl<'a> BinaryEncoder<'a> {
             let start_offset = offsets[0];
 
             // Did not use `add_scalar(positions, value_offset)`, so we can save a memory copy.
-            offsets.iter().for_each(|o| {
+            offsets.iter().enumerate().for_each(|(o_idx, o)| {
                 let new_offset = ((*o - start_offset).as_usize() + value_offset) as i64;
-                pos_builder.append_value(new_offset);
+
+                // Each GenericByteArray of len X has a corresponding offset array of len X + 1
+                // Since we are merging multiple arrays together, we only need to copy the "extra"
+                // element of the last array
+                if (offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
+                    pos_builder.append_value(new_offset);
+                } else {
+                    // The next offset array should start where this one ends
+                    value_offset = new_offset as usize;
+                }
             });
-            value_offset = (pos_builder.values_slice()[pos_builder.len() - 1] + 1) as usize;
         }
 
         for buf in buffers {
@@ -447,7 +455,12 @@ mod tests {
     async fn test_write_binary_data() {
         test_round_trips(&[&StringArray::from(vec!["a", "b", "cd", "efg"])]).await;
         test_round_trips(&[&StringArray::from(vec![Some("a"), None, Some("cd"), None])]).await;
-
+        test_round_trips(&[
+            &StringArray::from(vec![Some("a"), None, Some("cd"), None]),
+            &StringArray::from(vec![Some("f"), None, Some("gh"), None]),
+            &StringArray::from(vec![Some("t"), None, Some("uv"), None]),
+        ])
+        .await;
         test_round_trips(&[&LargeStringArray::from(vec!["a", "b", "cd", "efg"])]).await;
         test_round_trips(&[&LargeStringArray::from(vec![
             Some("a"),

From 46ef95e0045da23cd3011960d51c7e1125ad9171 Mon Sep 17 00:00:00 2001
From: gsilvestrin
Date: Fri, 17 Mar 2023 22:34:42 -0700
Subject: [PATCH 2/2] Bugfix for list / large list encoding.
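
Writing more than one RecordBatch to a file produced corrupt list and
large list columns: write_list_array / write_large_list_array rebased
each batch's value offsets to zero with subtract_scalar, so the
positions of later batches pointed back into the first batch's values
instead of continuing from them. Build a single positions array
instead, carrying the last offset of each batch over as the starting
offset of the next, the same scheme as the BinaryEncoder fix in the
previous commit. The round-trip test now writes three batches, and the
test struct gains a LargeList child field.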
---
 rust/src/io/reader.rs | 52 +++++++++++++++++++++++++---------
 rust/src/io/writer.rs | 66 ++++++++++++++++++++++++++++---------------
 2 files changed, 81 insertions(+), 37 deletions(-)

diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs
index 9697ad6e66..3c6e0c6f8f 100644
--- a/rust/src/io/reader.rs
+++ b/rust/src/io/reader.rs
@@ -796,17 +796,25 @@ mod tests {
         let store = ObjectStore::memory();
         let path = Path::from("/null_strings");
 
-        let (arrow_schema, struct_array) = make_struct_of_list_array(10, 10);
+        let arrow_schema = make_schema_of_list_array();
         let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
-        let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap();
+
+        let batches = (0..3)
+            .map(|_| {
+                let struct_array = make_struct_of_list_array(10, 10);
+                RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap()
+            })
+            .collect::<Vec<_>>();
+        let batches_ref = batches.iter().collect::<Vec<_>>();
 
         let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
-        file_writer.write(&[&batch]).await.unwrap();
+        file_writer.write(batches_ref.as_slice()).await.unwrap();
         file_writer.finish().await.unwrap();
 
         let reader = FileReader::try_new(&store, &path).await.unwrap();
         let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
-        assert_eq!(batch, actual_batch);
+        let expected = concat_batches(&arrow_schema, batches_ref).unwrap();
+        assert_eq!(expected, actual_batch);
     }
 
     #[tokio::test]
@@ -814,7 +822,8 @@ mod tests {
         let store = ObjectStore::memory();
         let path = Path::from("/null_strings");
 
-        let (arrow_schema, struct_array) = make_struct_of_list_array(3, 10);
+        let arrow_schema = make_schema_of_list_array();
+        let struct_array = make_struct_of_list_array(3, 10);
         let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap();
         let batch =
             RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap();
@@ -848,11 +857,8 @@ mod tests {
         assert_eq!(expected_batch, slice_of_batch);
     }
 
-    fn make_struct_of_list_array(
-        rows: i32,
-        num_items: i32,
-    ) -> (Arc<ArrowSchema>, Arc<StructArray>) {
-        let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+    fn make_schema_of_list_array() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![ArrowField::new(
             "s",
             DataType::Struct(vec![
                 ArrowField::new(
@@ -865,23 +871,34 @@ mod tests {
                     DataType::List(Box::new(ArrowField::new("item", DataType::Utf8, true))),
                     true,
                 ),
+                ArrowField::new(
+                    "ll",
+                    DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
+                    false,
+                ),
             ]),
             true,
-        )]));
+        )]))
+    }
 
+    fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc<StructArray> {
         let mut li_builder = ListBuilder::new(Int32Builder::new());
         let mut ls_builder = ListBuilder::new(StringBuilder::new());
+        let ll_value_builder = Int32Builder::new();
+        let mut large_list_builder = LargeListBuilder::new(ll_value_builder);
         for i in 0..rows {
             for j in 0..num_items {
                 li_builder.values().append_value(i * 10 + j);
                 ls_builder
                     .values()
                     .append_value(format!("str-{}", i * 10 + j));
+                large_list_builder.values().append_value(i * 10 + j);
             }
             li_builder.append(true);
             ls_builder.append(true);
+            large_list_builder.append(true);
         }
-        let struct_array = Arc::new(StructArray::from(vec![
+        Arc::new(StructArray::from(vec![
             (
                 ArrowField::new(
                     "li",
@@ -898,8 +915,15 @@ mod tests {
                 ),
                 Arc::new(ls_builder.finish()) as ArrayRef,
             ),
-        ]));
-        (arrow_schema, struct_array)
+            (
+                ArrowField::new(
+                    "ll",
+                    DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
+                    false,
+                ),
+                Arc::new(large_list_builder.finish()) as ArrayRef,
+            ),
+        ]))
     }
 
     #[tokio::test]
diff --git a/rust/src/io/writer.rs b/rust/src/io/writer.rs
index 94d34f12f5..5588183cb1 100644
--- a/rust/src/io/writer.rs
+++ b/rust/src/io/writer.rs
@@ -12,12 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::ops::Deref;
 use std::sync::Arc;
 
-use arrow_arith::arithmetic::subtract_scalar;
+use arrow_array::builder::PrimitiveBuilder;
 use arrow_array::cast::{as_large_list_array, as_list_array, as_struct_array};
-use arrow_array::{Array, ArrayRef, Int32Array, Int64Array, RecordBatch, StructArray};
+use arrow_array::types::{Int32Type, Int64Type};
+use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
 use arrow_schema::DataType;
 use async_recursion::async_recursion;
 use object_store::path::Path;
@@ -271,43 +271,63 @@ impl<'a> FileWriter<'a> {
     }
 
     async fn write_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
+        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
         let mut list_arrs: Vec<&ArrayRef> = Vec::new();
-        let mut offsets: Vec<ArrayRef> = Vec::new();
+        let mut pos_builder: PrimitiveBuilder<Int32Type> =
+            PrimitiveBuilder::with_capacity(capacity);
 
-        for array in arrs {
+        let mut initial_offset: usize = 0;
+        for (a_idx, array) in arrs.iter().enumerate() {
             let list_arr = as_list_array(*array);
-            let value_offsets: Int32Array = list_arr.value_offsets().iter().copied().collect();
-            assert!(!value_offsets.is_empty());
-            let offsets_arr =
-                Arc::new(subtract_scalar(&value_offsets, value_offsets.value(0))?) as ArrayRef;
             list_arrs.push(list_arr.values());
-            offsets.push(offsets_arr);
+
+            let value_offsets = list_arr.value_offsets();
+            assert!(!value_offsets.is_empty());
+            let start_offset = value_offsets[0];
+            value_offsets.iter().enumerate().for_each(|(o_idx, o)| {
+                let new_offset = ((*o - start_offset) + initial_offset as i32) as i32;
+
+                if (value_offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
+                    pos_builder.append_value(new_offset);
+                } else {
+                    initial_offset = new_offset as usize;
+                }
+            });
         }
 
-        let offsets_ref = offsets.iter().map(|l| l.deref()).collect::<Vec<_>>();
-        self.write_fixed_stride_array(field, offsets_ref.as_slice())
-            .await?;
+        let positions: &dyn Array = &pos_builder.finish();
+        self.write_fixed_stride_array(field, &[positions]).await?;
         self.write_array(&field.children[0], list_arrs.as_slice())
             .await
     }
 
     async fn write_large_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
+        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
         let mut list_arrs: Vec<&ArrayRef> = Vec::new();
-        let mut offsets: Vec<ArrayRef> = Vec::new();
+        let mut pos_builder: PrimitiveBuilder<Int64Type> =
+            PrimitiveBuilder::with_capacity(capacity);
 
-        for array in arrs {
+        let mut initial_offset: usize = 0;
+        for (a_idx, array) in arrs.iter().enumerate() {
             let list_arr = as_large_list_array(*array);
-            let value_offsets: Int64Array = list_arr.value_offsets().iter().copied().collect();
-            assert!(!value_offsets.is_empty());
-            let offsets_arr =
-                Arc::new(subtract_scalar(&value_offsets, value_offsets.value(0))?) as ArrayRef;
             list_arrs.push(list_arr.values());
-            offsets.push(offsets_arr);
+
+            let value_offsets = list_arr.value_offsets();
+            assert!(!value_offsets.is_empty());
+            let start_offset = value_offsets[0];
+            value_offsets.iter().enumerate().for_each(|(o_idx, o)| {
+                let new_offset = (*o - start_offset) + initial_offset as i64;
+
+                if (value_offsets.len() != o_idx + 1) || (arrs.len() == a_idx + 1) {
+                    pos_builder.append_value(new_offset);
+                } else {
+                    initial_offset = new_offset as usize;
+                }
+            });
         }
 
-        let offsets_ref = offsets.iter().map(|l| l.deref()).collect::<Vec<_>>();
-        self.write_fixed_stride_array(field, offsets_ref.as_slice())
-            .await?;
+        let positions: &dyn Array = &pos_builder.finish();
+        self.write_fixed_stride_array(field, &[positions]).await?;
         self.write_array(&field.children[0], list_arrs.as_slice())
             .await
     }
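
For reviewers: the sketch below (standalone, not part of the patch; merge_offsets
and the sample values are illustrative) isolates the offset-chaining rule both
commits apply. An Arrow offsets buffer for an array of len values has len + 1
entries, so adjacent arrays share one boundary; the fix emits that boundary once
and carries it forward as the next array's starting position.

/// Merge the offsets buffers of several variable-length arrays into one
/// absolute position column. Each input is an Arrow-style offsets buffer
/// of length len + 1; the trailing entry of every buffer except the last
/// is not emitted, it becomes the starting offset of the next buffer.
fn merge_offsets(offset_arrays: &[Vec<i64>], mut value_offset: i64) -> Vec<i64> {
    let mut positions = Vec::new();
    for (a_idx, offsets) in offset_arrays.iter().enumerate() {
        let start = offsets[0];
        for (o_idx, o) in offsets.iter().enumerate() {
            let new_offset = (o - start) + value_offset;
            if o_idx + 1 != offsets.len() || a_idx + 1 == offset_arrays.len() {
                positions.push(new_offset);
            } else {
                // Carry the shared boundary into the next buffer instead
                // of emitting it twice.
                value_offset = new_offset;
            }
        }
    }
    positions
}

fn main() {
    // Two arrays of two values each, e.g. ["a", "bc"] and ["d", "ef"],
    // written starting at byte 10 of the file.
    let merged = merge_offsets(&[vec![0, 1, 3], vec![0, 1, 3]], 10);
    // Five positions for four values; the boundary at 13 appears once.
    assert_eq!(merged, vec![10, 11, 13, 14, 16]);
}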