Fix offsets for Binary/Lists/LargeLists #727

Merged · 7 commits · Mar 24, 2023
11 changes: 10 additions & 1 deletion rust/src/encodings/binary.rs
@@ -79,10 +79,11 @@ impl<'a> BinaryEncoder<'a> {
 };
 self.writer.write_all(b).await?;

+let start_offset = offsets[0].as_usize();
 offsets
     .iter()
     .skip(1)
-    .map(|b| b.as_usize() + last_offset)
+    .map(|b| b.as_usize() - start_offset + last_offset)
     .for_each(|o| pos_builder.append_value(o as i64));
 last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
 }
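
Aside (not part of the PR): the root cause is that slicing an Arrow string/binary array does not rewrite its offsets buffer; the slice only views a window of the parent's offsets, so the first offset need not be zero. A minimal sketch of the rebasing arithmetic the fix applies, assuming the arrow_array crate of roughly this era (where Array::slice returns an ArrayRef, as the PR's own tests rely on):

use arrow_array::{cast::as_string_array, Array, StringArray};

fn main() {
    // The parent's offsets buffer is [0, 1, 2, 3].
    let parent = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

    // Slicing shares the parent's buffers; the slice's offsets window
    // is [1, 2, 3] and therefore does not start at zero.
    let sliced = parent.slice(1, 2);
    let sliced = as_string_array(sliced.as_ref());
    assert_eq!(sliced.value_offsets().to_vec(), vec![1, 2, 3]);

    // The fix: rebase each offset by the window's first offset, then
    // accumulate `last_offset` across batches, so the positions written
    // to the file address only the bytes actually written.
    let start_offset = sliced.value_offsets()[0];
    let last_offset = 0i32; // running total from previously written arrays
    let rebased: Vec<i32> = sliced
        .value_offsets()
        .iter()
        .skip(1)
        .map(|o| o - start_offset + last_offset)
        .collect();
    assert_eq!(rebased, vec![1, 2]);
}

Without the subtraction, the slice above would produce positions [2, 3] on top of data that starts at byte 0 of the file, pointing one byte past the values that were written.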
@@ -390,6 +391,7 @@ mod tests {
 use super::*;
 use arrow_select::concat::concat;

+use arrow_array::cast::as_string_array;
 use arrow_array::{
     new_empty_array, types::GenericStringType, GenericStringArray, LargeStringArray,
     OffsetSizeTrait, StringArray,
@@ -464,6 +466,13 @@
         .await;
 }

+#[tokio::test]
+async fn test_write_binary_data_with_offset() {
+    let slice = StringArray::from(vec![Some("d"), Some("e")]).slice(1, 1);
+    let array = as_string_array(slice.as_ref());
+    test_round_trips(&[array]).await;
+}
+
 #[tokio::test]
 async fn test_range_query() {
     let data = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]);
55 changes: 53 additions & 2 deletions rust/src/io/reader.rs
@@ -577,8 +577,8 @@ mod tests {
     builder::{Int32Builder, ListBuilder, StringBuilder},
     cast::{as_primitive_array, as_string_array, as_struct_array},
     types::UInt8Type,
-    Array, DictionaryArray, Float32Array, Int64Array, NullArray, RecordBatchReader,
-    StringArray, StructArray, UInt32Array, UInt8Array,
+    Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, NullArray,
+    RecordBatchReader, StringArray, StructArray, UInt32Array, UInt8Array,
 };
 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
 use futures::StreamExt;
@@ -1131,4 +1131,55 @@ mod tests {
         &expected_large_list
     );
 }
+
+#[tokio::test]
+async fn test_list_array_with_offsets() {
+    let arrow_schema = ArrowSchema::new(vec![
+        ArrowField::new(
+            "l",
+            DataType::List(Box::new(ArrowField::new("item", DataType::Int32, true))),
+            false,
+        ),
+        ArrowField::new(
+            "ll",
+            DataType::LargeList(Box::new(ArrowField::new("item", DataType::Int32, true))),
+            false,
+        ),
+    ]);
+
+    let store = ObjectStore::memory();
+    let path = Path::from("/lists");
+
+    let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+        Some(vec![Some(1), Some(2)]),
+        Some(vec![Some(3), Some(4)]),
+        Some((0..1_000_000).map(|n| Some(n)).collect::<Vec<_>>()),
+    ])
+    .slice(1, 1);
+    let large_list_array = LargeListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+        Some(vec![Some(10), Some(11)]),
+        Some(vec![Some(12), Some(13)]),
+        Some((0..1_000_000).map(|n| Some(n)).collect::<Vec<_>>()),

[Inline review comment]
Contributor: Do we have a test that actually writes multiple arrays? I just want to make sure the accumulation of offsets is correct.
Author: Not directly, but test_read_struct_of_list_arrays creates multiple structs that contain lists, and write_struct_array should group them together.

+    ])
+    .slice(1, 1);
+
+    let batch = RecordBatch::try_new(
+        Arc::new(arrow_schema.clone()),
+        vec![Arc::new(list_array), Arc::new(large_list_array)],
+    )
+    .unwrap();
+
+    let schema: Schema = (&arrow_schema).try_into().unwrap();
+    let mut file_writer = FileWriter::try_new(&store, &path, &schema).await.unwrap();
+    file_writer.write(&[&batch.clone()]).await.unwrap();
+    file_writer.finish().await.unwrap();
+
+    // Make sure the big array was not written to the file
+    let file_size_bytes = store.size(&path).await.unwrap();
+    assert!(file_size_bytes < 1_000);

[Inline review comment]
Contributor: Seems we just need to make 2,000 values instead of 1M?
Author: I'm just writing an array that is so big that it will be obvious if the assert doesn't match (I think the actual file size is ~250 bytes).
Contributor: Yeah, I mean 2,000 elements is good enough for the test, and it is faster in CI.

+
+    let reader = FileReader::try_new(&store, &path).await.unwrap();
+    let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap();
+    assert_eq!(batch, actual_batch);
+}
 }
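
For context on why the file-size assertion above is meaningful: slicing a ListArray moves only the offsets window, while the child values array is still the parent's full values buffer. A minimal sketch under the same arrow_array assumptions as before (Array::slice returning an ArrayRef):

use arrow_array::{types::Int32Type, Array, ListArray};

fn main() {
    // Three lists; the third one is huge.
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![Some(3), Some(4)]),
        Some((0..1_000_000).map(Some).collect::<Vec<_>>()),
    ]);

    // Slicing to the middle list leaves the child values untouched:
    // all 1_000_004 elements are still reachable through values().
    let sliced = list.slice(1, 1);
    let sliced = sliced.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(sliced.value_offsets().to_vec(), vec![2, 4]);
    assert_eq!(sliced.values().len(), 1_000_004);

    // The offsets window [2, 4) neither starts at zero nor ends at the
    // child length, which is why a naive writer that dumps values()
    // wholesale would emit the unreferenced million-element list.
}

This is exactly what the test guards against: if the writer serialized the full child values, the file would be megabytes, not under 1,000 bytes.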
37 changes: 25 additions & 12 deletions rust/src/io/writer.rs
@@ -273,58 +273,71 @@ impl<'a> FileWriter<'a> {

 async fn write_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
     let capacity: usize = arrs.iter().map(|a| a.len()).sum();
-    let mut list_arrs: Vec<&ArrayRef> = Vec::new();
+    let mut list_arrs: Vec<ArrayRef> = Vec::new();
     let mut pos_builder: PrimitiveBuilder<Int32Type> =
         PrimitiveBuilder::with_capacity(capacity);

     let mut last_offset: usize = 0;
     pos_builder.append_value(last_offset as i32);
     for array in arrs.iter() {
         let list_arr = as_list_array(*array);
-        list_arrs.push(list_arr.values());

         let offsets = list_arr.value_offsets();

+        assert!(!offsets.is_empty());
+        let start_offset = offsets[0].as_usize();
+        let end_offset = offsets[offsets.len() - 1].as_usize();
+
+        let list_values = list_arr.values();
+        let sliced_values = list_values.slice(start_offset, end_offset - start_offset);
+        list_arrs.push(sliced_values);
+
         offsets
             .iter()
             .skip(1)
-            .map(|b| b.as_usize() + last_offset)
+            .map(|b| b.as_usize() - start_offset + last_offset)
             .for_each(|o| pos_builder.append_value(o as i32));
         last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
     }

     let positions: &dyn Array = &pos_builder.finish();
     self.write_fixed_stride_array(field, &[positions]).await?;
-    self.write_array(&field.children[0], list_arrs.as_slice())
-        .await
+    let arrs = list_arrs.iter().collect::<Vec<_>>();
+    self.write_array(&field.children[0], arrs.as_slice()).await
 }

 async fn write_large_list_array(&mut self, field: &Field, arrs: &[&dyn Array]) -> Result<()> {
     let capacity: usize = arrs.iter().map(|a| a.len()).sum();
-    let mut list_arrs: Vec<&ArrayRef> = Vec::new();
+    let mut list_arrs: Vec<ArrayRef> = Vec::new();
     let mut pos_builder: PrimitiveBuilder<Int64Type> =
         PrimitiveBuilder::with_capacity(capacity);

     let mut last_offset: usize = 0;
     pos_builder.append_value(last_offset as i64);
     for array in arrs.iter() {
         let list_arr = as_large_list_array(*array);
-        list_arrs.push(list_arr.values());

         let offsets = list_arr.value_offsets();

+        assert!(!offsets.is_empty());
+        let start_offset = offsets[0].as_usize();
+        let end_offset = offsets[offsets.len() - 1].as_usize();
+
+        let sliced_values = list_arr
+            .values()
+            .slice(start_offset, end_offset - start_offset);
+        list_arrs.push(sliced_values);
+
         offsets
             .iter()
             .skip(1)
-            .map(|b| b.as_usize() + last_offset)
+            .map(|b| b.as_usize() - start_offset + last_offset)
             .for_each(|o| pos_builder.append_value(o as i64));
         last_offset = pos_builder.values_slice()[pos_builder.len() - 1 as usize] as usize;
     }

     let positions: &dyn Array = &pos_builder.finish();
     self.write_fixed_stride_array(field, &[positions]).await?;
-    self.write_array(&field.children[0], list_arrs.as_slice())
-        .await
+    let arrs = list_arrs.iter().collect::<Vec<_>>();
+    self.write_array(&field.children[0], arrs.as_slice()).await
 }

 async fn write_footer(&mut self) -> Result<()> {
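
The two writer functions above share one technique: rebase each list offset against the window's first offset, and slice the child values down to exactly the referenced window before writing. A standalone recap sketch (the helper rebased_positions_and_values is hypothetical, not Lance's actual API; same arrow_array assumptions as above):

use arrow_array::{types::Int32Type, Array, ArrayRef, ListArray};

// Hypothetical helper mirroring the diff's logic; `last_offset` is the
// running total from arrays already written to this column.
fn rebased_positions_and_values(list_arr: &ListArray, last_offset: i32) -> (Vec<i32>, ArrayRef) {
    let offsets = list_arr.value_offsets();
    assert!(!offsets.is_empty());
    let start_offset = offsets[0];
    let end_offset = offsets[offsets.len() - 1];

    // Offsets rebased onto the data that will actually be written.
    let positions: Vec<i32> = offsets
        .iter()
        .skip(1)
        .map(|o| o - start_offset + last_offset)
        .collect();

    // Only the window [start_offset, end_offset) of the child values is
    // referenced by this (possibly sliced) list array.
    let values = list_arr
        .values()
        .slice(start_offset as usize, (end_offset - start_offset) as usize);
    (positions, values)
}

fn main() {
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        Some(vec![Some(3), Some(4)]),
    ]);
    let sliced = list.slice(1, 1);
    let sliced = sliced.as_any().downcast_ref::<ListArray>().unwrap();

    let (positions, values) = rebased_positions_and_values(sliced, 0);
    assert_eq!(positions, vec![2]); // one list, ending at rebased offset 2
    assert_eq!(values.len(), 2); // only [3, 4] would be written
}

The same shape applies to write_large_list_array with i64 offsets; the binary encoder differs only in that its offsets index bytes rather than child elements.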