diff --git a/rust/src/encodings/binary.rs b/rust/src/encodings/binary.rs
index 4c6ff15cf0..c68e949bfc 100644
--- a/rust/src/encodings/binary.rs
+++ b/rust/src/encodings/binary.rs
@@ -79,10 +79,11 @@ impl<'a> BinaryEncoder<'a> {
             };
             self.writer.write_all(b).await?;
 
+            let start_offset = offsets[0].as_usize();
             offsets
                 .iter()
                 .skip(1)
-                .map(|b| b.as_usize() + last_offset)
+                .map(|b| b.as_usize() - start_offset + last_offset)
                 .for_each(|o| pos_builder.append_value(o as i64));
             last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
         }
@@ -390,6 +391,7 @@ mod tests {
     use super::*;
 
     use arrow_select::concat::concat;
+    use arrow_array::cast::as_string_array;
     use arrow_array::{
         new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
         OffsetSizeTrait, StringArray,
@@ -464,6 +466,13 @@ mod tests {
         .await;
     }
 
+    #[tokio::test]
+    async fn test_write_binary_data_with_offset() {
+        let slice = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
+        let array = as_string_array(slice.as_ref());
+        test_round_trips(&[array]).await;
+    }
+
     #[tokio::test]
     async fn test_range_query() {
         let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
diff --git a/rust/src/io/reader.rs b/rust/src/io/reader.rs
index 3c6e0c6f8f..2bf52604cb 100644
--- a/rust/src/io/reader.rs
+++ b/rust/src/io/reader.rs
@@ -577,8 +577,8 @@ mod tests {
         builder::{Int32Builder, ListBuilder, StringBuilder},
         cast::{as_primitive_array, as_string_array, as_struct_array},
         types::UInt8Type,
-        Array, DictionaryArray, Float32Array, Int64Array, NullArray, RecordBatchReader,
-        StringArray, StructArray, UInt32Array, UInt8Array,
+        Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, NullArray,
+        RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array,
     };
     use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
     use futures::StreamExt;
@@ -1131,4 +1131,55 @@ mod tests {
             &expected_large_list
         );
     }
+
+    #[tokio::test]
+    async fn test_list_array_with_offsets() {
+        let arrow_schema = ArrowSchema::new(vec![
+            ArrowField::new(
+                "l",
+                DataType::List(Box::new(ArrowField::new("item", DataType::Int32, true))),
+                false,
+            ),
+            ArrowField::new(
+                "ll",
+                DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
+                false,
+            ),
+        ]);
+
+        let store = ObjectStore::memory();
+        let path = Path::from("/lists");
+
+        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            Some(vec![Some(3), Some(4)]),
+            Some((0..2_000).map(|n| Some(n)).collect::<Vec<_>>()),
+        ])
+        .slice(1, 1);
+        let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(10), Some(11)]),
+            Some(vec![Some(12), Some(13)]),
+            Some((0..2_000).map(|n| Some(n)).collect::<Vec<_>>()),
+        ])
+        .slice(1, 1);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(arrow_schema.clone()),
+            vec![Arc::new(list_array), Arc::new(large_list_array)],
+        )
+        .unwrap();
+
+        let schema: Schema = (&arrow_schema).try_into().unwrap();
+        let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
+        file_writer.write(&[&batch.clone()]).await.unwrap();
+        file_writer.finish().await.unwrap();
+
+        // Make sure the big array was not written to the file
+        let file_size_bytes = store.size(&path).await.unwrap();
+        assert!(file_size_bytes < 1_000);
+
+        let reader = FileReader::try_new(&store, &path).await.unwrap();
+        let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
+        assert_eq!(batch, actual_batch);
+    }
 }
diff --git a/rust/src/io/writer.rs b/rust/src/io/writer.rs
index 6600caa9f1..d1c26cacbc 100644
--- a/rust/src/io/writer.rs
+++ b/rust/src/io/writer.rs
@@ -273,7 +273,7 @@ impl<'a> FileWriter<'a> {
 
     async fn write_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
         let capacity: usize = arrs.iter().map(|a| a.len()).sum();
-        let mut list_arrs: Vec<&ArrayRef> = Vec::new();
+        let mut list_arrs: Vec<ArrayRef> = Vec::new();
         let mut pos_builder: PrimitiveBuilder<Int32Type> =
             PrimitiveBuilder::with_capacity(capacity);
 
@@ -281,27 +281,33 @@ impl<'a> FileWriter<'a> {
         pos_builder.append_value(last_offset as i32);
         for array in arrs.iter() {
             let list_arr = as_list_array(*array);
-            list_arrs.push(list_arr.values());
-
             let offsets = list_arr.value_offsets();
+            assert!(!offsets.is_empty());
+            let start_offset = offsets[0].as_usize();
+            let end_offset = offsets[offsets.len() - 1].as_usize();
+
+            let list_values = list_arr.values();
+            let sliced_values = list_values.slice(start_offset, end_offset - start_offset);
+            list_arrs.push(sliced_values);
+
             offsets
                 .iter()
                 .skip(1)
-                .map(|b| b.as_usize() + last_offset)
+                .map(|b| b.as_usize() - start_offset + last_offset)
                 .for_each(|o| pos_builder.append_value(o as i32));
             last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
         }
 
         let positions: &dyn Array = &pos_builder.finish();
         self.write_fixed_stride_array(field, &[positions]).await?;
 
-        self.write_array(&field.children[0], list_arrs.as_slice())
-            .await
+        let arrs = list_arrs.iter().collect::<Vec<_>>();
+        self.write_array(&field.children[0], arrs.as_slice()).await
     }
 
     async fn write_large_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
         let capacity: usize = arrs.iter().map(|a| a.len()).sum();
-        let mut list_arrs: Vec<&ArrayRef> = Vec::new();
+        let mut list_arrs: Vec<ArrayRef> = Vec::new();
         let mut pos_builder: PrimitiveBuilder<Int64Type> =
             PrimitiveBuilder::with_capacity(capacity);
 
@@ -309,22 +315,29 @@ impl<'a> FileWriter<'a> {
         pos_builder.append_value(last_offset as i64);
         for array in arrs.iter() {
             let list_arr = as_large_list_array(*array);
-            list_arrs.push(list_arr.values());
-
             let offsets = list_arr.value_offsets();
+            assert!(!offsets.is_empty());
+            let start_offset = offsets[0].as_usize();
+            let end_offset = offsets[offsets.len() - 1].as_usize();
+
+            let sliced_values = list_arr
+                .values()
+                .slice(start_offset, end_offset - start_offset);
+            list_arrs.push(sliced_values);
+
             offsets
                 .iter()
                 .skip(1)
-                .map(|b| b.as_usize() + last_offset)
+                .map(|b| b.as_usize() - start_offset + last_offset)
                 .for_each(|o| pos_builder.append_value(o as i64));
             last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
         }
 
        let positions: &dyn Array = &pos_builder.finish();
        self.write_fixed_stride_array(field, &[positions]).await?;
 
-        self.write_array(&field.children[0], list_arrs.as_slice())
-            .await
+        let arrs = list_arrs.iter().collect::<Vec<_>>();
+        self.write_array(&field.children[0], arrs.as_slice()).await
     }
 
     async fn write_footer(&mut self) -> Result<()> {
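
Note on the fix: the offset arithmetic in `BinaryEncoder` and in `write_list_array`/`write_large_list_array` assumed that `value_offsets()` starts at zero, which only holds for unsliced arrays. For a sliced array the first offset points into the middle of the child values, so positions were shifted, and the list writers additionally pushed the entire unsliced child array into the file (which the new `file_size_bytes < 1_000` assertion guards against). The patch subtracts `offsets[0]` from every offset and slices the child values to `offsets[0]..offsets[last]`, so the positions column stays contiguous across arrays while only the visible values are written. The snippet below is a minimal, dependency-free sketch of that rebasing arithmetic; the function name `rebase_offsets` and the plain `usize` offsets are illustrative, while the real code operates on Arrow offset buffers and a `PrimitiveBuilder`.

```rust
/// Illustrative helper (not part of the patch): rebase the value offsets of one
/// possibly-sliced variable-length array before appending them to the file-level
/// positions column.
///
/// * `offsets`     - raw offsets of the incoming array, e.g. `[2, 4]` for a
///                   one-row slice whose child values live at indices 2..4.
/// * `last_offset` - last position already written for this column.
///
/// Returns the rebased offsets to append plus the updated `last_offset`.
fn rebase_offsets(offsets: &[usize], last_offset: usize) -> (Vec<usize>, usize) {
    assert!(!offsets.is_empty());
    // A sliced array does not start at child index 0, so anchor on its first offset.
    let start_offset = offsets[0];
    let rebased: Vec<usize> = offsets
        .iter()
        .skip(1)
        .map(|o| o - start_offset + last_offset)
        .collect();
    let new_last = rebased.last().copied().unwrap_or(last_offset);
    (rebased, new_last)
}

fn main() {
    // Second row of a list array, sliced out: its two child values sit at
    // offsets 2..4 of the original child array, but at indices 0..2 of the
    // sliced child values that actually get written.
    let (rebased, last) = rebase_offsets(&[2, 4], 0);
    assert_eq!((rebased, last), (vec![2], 2));

    // A following unsliced array with rows of length 3 and 2 stays contiguous
    // with what was already written.
    let (rebased, last) = rebase_offsets(&[0, 3, 5], last);
    assert_eq!((rebased, last), (vec![5, 7], 7));

    // With the pre-patch arithmetic (`o + last_offset`, and no slicing of the
    // child values) the first call would have produced [4] while the whole
    // unsliced child array was written out.
    println!("offset rebasing ok: last_offset = {last}");
}
```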