Skip to content

Commit

Permalink
fix: fix v2 error that can happen when writing list<struct<...>> with…
Browse files Browse the repository at this point in the history
… many empty lists (lancedb#2762)

If the list offsets are bigger than the struct items (easy if there are
many lists) and there are multiple pages of offsets then the list
encoding needs to force a page from the items page. This causes `flush`
to be called multiple times which the struct encoder wasn't expecting.
This would lead the struct header column to have multiple pages which,
while not technically a problem, causes an assertion to fail on read.

We could relax the assertion but it is simpler to just not write that
kind of file in the first place.
  • Loading branch information
westonpace authored Aug 30, 2024
1 parent cf0e6cb commit 99f554f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 23 deletions.
23 changes: 22 additions & 1 deletion rust/lance-encoding/src/encodings/logical/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ mod tests {
use arrow::array::StringBuilder;
use arrow_array::{
builder::{Int32Builder, ListBuilder},
ArrayRef, BooleanArray, ListArray,
Array, ArrayRef, BooleanArray, ListArray, StructArray, UInt64Array,
};
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Fields};
Expand Down Expand Up @@ -1260,6 +1260,27 @@ mod tests {
check_round_trip_encoding_random(field, HashMap::new()).await;
}

#[test_log::test(tokio::test)]
async fn test_list_struct_empty() {
let fields = Fields::from(vec![Field::new("inner", DataType::UInt64, true)]);
let items = UInt64Array::from(Vec::<u64>::new());
let structs = StructArray::new(fields, vec![Arc::new(items)], None);
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0; 2 * 1024 * 1024 + 1]));
let lists = ListArray::new(
Arc::new(Field::new("item", structs.data_type().clone(), true)),
offsets,
Arc::new(structs),
None,
);

check_round_trip_encoding_of_data(
vec![Arc::new(lists)],
&TestCases::default(),
HashMap::new(),
)
.await;
}

#[test_log::test(tokio::test)]
async fn test_simple_list() {
let items_builder = Int32Builder::new();
Expand Down
38 changes: 16 additions & 22 deletions rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,27 +550,7 @@ impl FieldEncoder for StructFieldEncoder {
.iter_mut()
.map(|encoder| encoder.flush())
.collect::<Result<Vec<_>>>()?;
let mut child_tasks = child_tasks.into_iter().flatten().collect::<Vec<_>>();
let num_rows_seen = self.num_rows_seen;
let column_index = self.column_index;
// In this "simple struct / no nulls" case we emit a single header page at
// the very end which covers the entire struct.
child_tasks.push(
std::future::ready(Ok(EncodedPage {
array: EncodedArray {
buffers: vec![],
encoding: pb::ArrayEncoding {
array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
pb::SimpleStruct {},
)),
},
},
num_rows: num_rows_seen,
column_idx: column_index,
}))
.boxed(),
);
Ok(child_tasks)
Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
}

fn num_columns(&self) -> u32 {
Expand All @@ -585,7 +565,21 @@ impl FieldEncoder for StructFieldEncoder {
async move {
let mut columns = Vec::new();
// Add a column for the struct header
columns.push(EncodedColumn::default());
let mut header = EncodedColumn::default();
header.final_pages.push(EncodedPage {
array: EncodedArray {
buffers: vec![],
encoding: pb::ArrayEncoding {
array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
pb::SimpleStruct {},
)),
},
},
num_rows: self.num_rows_seen,
column_idx: self.column_index,
});
columns.push(header);
// Now run finish on the children
for child in self.children.iter_mut() {
columns.extend(child.finish().await?);
}
Expand Down

0 comments on commit 99f554f

Please sign in to comment.