Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the repetition index to the miniblock write path #3208

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ message MiniBlockLayout {
ArrayEncoding dictionary = 4;
// The meaning of each repdef layer, used to interpret repdef buffers correctly
repeated RepDefLayer layers = 5;
// The depth of the repetition index.
//
// If there is repetition then the depth must be at least 1. If there are many layers
// of repetition then deeper repetition indices will support deeper nested random access. For
// example, given 5 layers of repetition then the repetition index depth must be at least
// 3 to support access like rows[50][17][3].
//
// We require `repetition_index_depth + 1` u64 values per mini-block to store the repetition
// index if the `repetition_index_depth` is greater than 0. The +1 is because we need to store
// the number of "leftover items" at the end of the chunk. Otherwise, we wouldn't have any way
// to know if the final item in a chunk is valid or not.
uint32 repetition_index_depth = 6;
// The page already records how many rows are in the page. For mini-block we also need to know how
// many "items" are in the page. A row and an item are the same thing unless the page has lists.
uint64 num_items = 7;
}

/// A layout used for pages where the data is large
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-encoding-datafusion/src/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ impl FieldEncoder for ZoneMapsFieldEncoder {
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
// TODO: If we do the zone map calculation as part of the encoding task then we can
// parallelize statistics gathering. Could be faster too since the encoding task is
Expand All @@ -619,7 +620,7 @@ impl FieldEncoder for ZoneMapsFieldEncoder {
// to improve write speed.
self.update(&array)?;
self.items_encoder
.maybe_encode(array, external_buffers, repdef, row_number)
.maybe_encode(array, external_buffers, repdef, row_number, num_rows)
}

fn flush(
Expand Down
14 changes: 13 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ use crate::data::DataBlock;
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo};
use crate::encodings::logical::list::{
ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
Expand Down Expand Up @@ -777,6 +779,16 @@ impl CoreFieldDecoderStrategy {
column_infos.next_top_level();
Ok(scheduler)
}
DataType::List(_) | DataType::LargeList(_) => {
let child = field
.children
.first()
.expect("List field must have a child");
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
_ => todo!(),
}
}
Expand Down
25 changes: 22 additions & 3 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
Expand Down Expand Up @@ -370,12 +371,21 @@ pub trait FieldEncoder: Send {
/// than a single disk page.
///
/// It could also return an empty Vec if there is not enough data yet to encode any pages.
///
/// The `row_number` must be passed, which is the top-level row number currently being encoded.
/// This is stored in any pages produced by this call so that we can know the priority of the
/// page.
///
/// The `num_rows` is the number of top-level rows. It is initially the same as `array.len()`;
/// however, it is passed separately because the array will become flattened over time (if
/// there is repetition) and we need to know the original number of rows for various purposes.
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>>;
/// Flush any remaining data from the buffers into encoding tasks
///
Expand Down Expand Up @@ -1204,8 +1214,15 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
)?))
} else {
match data_type {
DataType::List(_child) | DataType::LargeList(_child) => {
todo!()
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect("List should have a child");
let child_encoder = self.create_field_encoder(
_encoding_strategy_root,
child,
column_index,
options,
)?;
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
}
DataType::Struct(_) => {
let field_metadata = &field.metadata;
Expand Down Expand Up @@ -1369,7 +1386,9 @@ pub async fn encode_batch(
OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
let repdef = RepDefBuilder::default();
let encoder = encoder.as_mut();
let mut tasks = encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0)?;
let num_rows = arr.len() as u64;
let mut tasks =
encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
tasks.extend(encoder.flush(&mut external_buffers)?);
for buffer in external_buffers.take_buffers() {
data_buffer.extend_from_slice(&buffer);
Expand Down
10 changes: 8 additions & 2 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,16 @@ impl FieldEncoder for BlobFieldEncoder {
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
let descriptions = Self::write_bins(array, external_buffers)?;
self.description_encoder
.maybe_encode(descriptions, external_buffers, repdef, row_number)
self.description_encoder.maybe_encode(
descriptions,
external_buffers,
repdef,
row_number,
num_rows,
)
}

// If there is any data left in the buffer then create an encode task from it
Expand Down
Loading
Loading