Skip to content

Commit

Permalink
Fix offsets for Binary/Lists/LargeLists (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsilvestrin authored Mar 24, 2023
1 parent 94c4fd6 commit 9200ed6
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 15 deletions.
11 changes: 10 additions & 1 deletion rust/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ impl<'a> BinaryEncoder<'a> {
};
self.writer.write_all(b).await?;

let start_offset = offsets[0].as_usize();
offsets
.iter()
.skip(1)
.map(|b| b.as_usize() + last_offset)
.map(|b| b.as_usize() - start_offset + last_offset)
.for_each(|o| pos_builder.append_value(o as i64));
last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
}
Expand Down Expand Up @@ -390,6 +391,7 @@ mod tests {
use super::*;
use arrow_select::concat::concat;

use arrow_array::cast::as_string_array;
use arrow_array::{
new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
OffsetSizeTrait, StringArray,
Expand Down Expand Up @@ -464,6 +466,13 @@ mod tests {
.await;
}

#[tokio::test]
async fn test_write_binary_data_with_offset() {
let slice = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
let array = as_string_array(slice.as_ref());
test_round_trips(&[array]).await;
}

#[tokio::test]
async fn test_range_query() {
let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
Expand Down
55 changes: 53 additions & 2 deletions rust/src/io/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,8 +577,8 @@ mod tests {
builder::{Int32Builder, ListBuilder, StringBuilder},
cast::{as_primitive_array, as_string_array, as_struct_array},
types::UInt8Type,
Array, DictionaryArray, Float32Array, Int64Array, NullArray, RecordBatchReader,
StringArray, StructArray, UInt32Array, UInt8Array,
Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, NullArray,
RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array,
};
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use futures::StreamExt;
Expand Down Expand Up @@ -1131,4 +1131,55 @@ mod tests {
&expected_large_list
);
}

#[tokio::test]
async fn test_list_array_with_offsets() {
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new(
"l",
DataType::List(Box::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
ArrowField::new(
"ll",
DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
false,
),
]);

let store = ObjectStore::memory();
let path = Path::from("/lists");

let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2)]),
Some(vec![Some(3), Some(4)]),
Some((0..2_000).map(|n| Some(n)).collect::<Vec<_>>()),
])
.slice(1, 1);
let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(10), Some(11)]),
Some(vec![Some(12), Some(13)]),
Some((0..2_000).map(|n| Some(n)).collect::<Vec<_>>()),
])
.slice(1, 1);

let batch = RecordBatch::try_new(
Arc::new(arrow_schema.clone()),
vec![Arc::new(list_array), Arc::new(large_list_array)],
)
.unwrap();

let schema: Schema = (&arrow_schema).try_into().unwrap();
let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
file_writer.write(&[&batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();

// Make sure the big array was not written to the file
let file_size_bytes = store.size(&path).await.unwrap();
assert!(file_size_bytes < 1_000);

let reader = FileReader::try_new(&store, &path).await.unwrap();
let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
assert_eq!(batch, actual_batch);
}
}
37 changes: 25 additions & 12 deletions rust/src/io/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,58 +273,71 @@ impl<'a> FileWriter<'a> {

    /// Write a `List<T>` column: rebuild one running positions (offsets)
    /// array across all chunks, then write only the child values each
    /// chunk's offsets actually cover.
    ///
    /// NOTE(review): this span is a commit-diff rendering, so some adjacent
    /// lines are the pre-change and post-change versions of the same
    /// statement (the two `list_arrs` declarations, the two `map` lines, the
    /// two `write_array` tails); the later of each pair is the committed code.
    async fn write_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
        // Total list-row count across chunks; each row adds one offset entry
        // (plus the initial 0 appended below).
        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
        let mut list_arrs: Vec<&ArrayRef> = Vec::new();
        let mut list_arrs: Vec<ArrayRef> = Vec::new();
        let mut pos_builder: PrimitiveBuilder<Int32Type> =
            PrimitiveBuilder::with_capacity(capacity);

        // Running end-offset of everything emitted so far, so each chunk's
        // offsets continue where the previous chunk stopped.
        let mut last_offset: usize = 0;
        pos_builder.append_value(last_offset as i32);
        for array in arrs.iter() {
            let list_arr = as_list_array(*array);
            list_arrs.push(list_arr.values());

            let offsets = list_arr.value_offsets();

            // An arrow list array carries len()+1 offsets, so this should
            // only fire on a corrupted array.
            assert!(!offsets.is_empty());
            // For a sliced array offsets[0] may be non-zero: child values
            // before start_offset (or after end_offset) are outside the
            // slice and must not be written.
            let start_offset = offsets[0].as_usize();
            let end_offset = offsets[offsets.len() - 1].as_usize();

            let list_values = list_arr.values();
            let sliced_values = list_values.slice(start_offset, end_offset - start_offset);
            list_arrs.push(sliced_values);

            offsets
                .iter()
                .skip(1)
                .map(|b| b.as_usize() + last_offset)
                // Rebase each offset to the slice start, then shift by the
                // running total from previous chunks.
                .map(|b| b.as_usize() - start_offset + last_offset)
                .for_each(|o| pos_builder.append_value(o as i32));
            // `1 as usize` binds tighter than `-`, so this is just len() - 1:
            // the last appended position seeds the next chunk's rebase.
            last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
        }

        // Positions go out as their own fixed-stride array, followed by the
        // sliced child value arrays for the single child field.
        let positions: &dyn Array = &pos_builder.finish();
        self.write_fixed_stride_array(field, &[positions]).await?;
        self.write_array(&field.children[0], list_arrs.as_slice())
            .await
        let arrs = list_arrs.iter().collect::<Vec<_>>();
        self.write_array(&field.children[0], arrs.as_slice()).await
    }

    /// Write a `LargeList<T>` column. Mirrors `write_list_array` but uses
    /// 64-bit offsets (`Int64Type` positions).
    ///
    /// NOTE(review): this span is a commit-diff rendering, so some adjacent
    /// lines are the pre-change and post-change versions of the same
    /// statement; the later of each pair is the committed code.
    async fn write_large_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
        // Total list-row count across chunks; each row adds one offset entry
        // (plus the initial 0 appended below).
        let capacity: usize = arrs.iter().map(|a| a.len()).sum();
        let mut list_arrs: Vec<&ArrayRef> = Vec::new();
        let mut list_arrs: Vec<ArrayRef> = Vec::new();
        let mut pos_builder: PrimitiveBuilder<Int64Type> =
            PrimitiveBuilder::with_capacity(capacity);

        // Running end-offset so each chunk's offsets continue where the
        // previous chunk stopped.
        let mut last_offset: usize = 0;
        pos_builder.append_value(last_offset as i64);
        for array in arrs.iter() {
            let list_arr = as_large_list_array(*array);
            list_arrs.push(list_arr.values());

            let offsets = list_arr.value_offsets();

            // len()+1 offsets always exist; guards against corruption only.
            assert!(!offsets.is_empty());
            // Sliced arrays can start at a non-zero offset; values outside
            // [start_offset, end_offset) must not be written.
            let start_offset = offsets[0].as_usize();
            let end_offset = offsets[offsets.len() - 1].as_usize();

            let sliced_values = list_arr
                .values()
                .slice(start_offset, end_offset - start_offset);
            list_arrs.push(sliced_values);

            offsets
                .iter()
                .skip(1)
                .map(|b| b.as_usize() + last_offset)
                // Rebase to the slice start, then shift by the running total.
                .map(|b| b.as_usize() - start_offset + last_offset)
                .for_each(|o| pos_builder.append_value(o as i64));
            // `1 as usize` binds tighter than `-`, so this is len() - 1: the
            // last appended position seeds the next chunk's rebase.
            last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
        }

        // Positions first (fixed-stride), then the sliced child values.
        let positions: &dyn Array = &pos_builder.finish();
        self.write_fixed_stride_array(field, &[positions]).await?;
        self.write_array(&field.children[0], list_arrs.as_slice())
            .await
        let arrs = list_arrs.iter().collect::<Vec<_>>();
        self.write_array(&field.children[0], arrs.as_slice()).await
    }

async fn write_footer(&mut self) -> Result<()> {
Expand Down

0 comments on commit 9200ed6

Please sign in to comment.