Skip to content

Commit

Permalink
feat: add the repetition index to the miniblock write path (#3208)
Browse files Browse the repository at this point in the history
The repetition index is what will give us random access support when we
have list data. At a high level it stores the number of top-level rows
in each mini-block chunk. We can use this later to figure out which
chunks we need to read.

In reality things are a little more complicated because we don't mandate
that each chunk starts with a brand new row (e.g. a row can span
multiple mini-block chunks). This is useful because we eventually want
to support arbitrarily deep nested access. If we create not-so-mini
blocks in the presence of large lists then we introduce read
amplification we'd like to avoid.
  • Loading branch information
westonpace authored Dec 10, 2024
1 parent 5ff966d commit 10c31b3
Show file tree
Hide file tree
Showing 11 changed files with 484 additions and 59 deletions.
15 changes: 15 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ message MiniBlockLayout {
ArrayEncoding dictionary = 4;
// The meaning of each repdef layer, used to interpret repdef buffers correctly
repeated RepDefLayer layers = 5;
// The depth of the repetition index.
//
// If there is repetition then the depth must be at least 1. If there are many layers
// of repetition then deeper repetition indices will support deeper nested random access. For
// example, given 5 layers of repetition then the repetition index depth must be at least
// 3 to support access like rows[50][17][3].
//
// We require `repetition_index_depth + 1` u64 values per mini-block to store the repetition
// index if the `repetition_index_depth` is greater than 0. The +1 is because we need to store
// the number of "leftover items" at the end of the chunk. Otherwise, we wouldn't have any way
// to know if the final item in a chunk is valid or not.
uint32 repetition_index_depth = 6;
// The page already records how many rows are in the page. For mini-block we also need to know how
// many "items" are in the page. A row and an item are the same thing unless the page has lists.
uint64 num_items = 7;
}

/// A layout used for pages where the data is large
Expand Down
3 changes: 2 additions & 1 deletion rust/lance-encoding-datafusion/src/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ impl FieldEncoder for ZoneMapsFieldEncoder {
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
// TODO: If we do the zone map calculation as part of the encoding task then we can
// parallelize statistics gathering. Could be faster too since the encoding task is
Expand All @@ -619,7 +620,7 @@ impl FieldEncoder for ZoneMapsFieldEncoder {
// to improve write speed.
self.update(&array)?;
self.items_encoder
.maybe_encode(array, external_buffers, repdef, row_number)
.maybe_encode(array, external_buffers, repdef, row_number, num_rows)
}

fn flush(
Expand Down
14 changes: 13 additions & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ use crate::data::DataBlock;
use crate::encoder::{values_column_encoding, EncodedBatch};
use crate::encodings::logical::binary::BinaryFieldScheduler;
use crate::encodings::logical::blob::BlobFieldScheduler;
use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo};
use crate::encodings::logical::list::{
ListFieldScheduler, OffsetPageInfo, StructuralListScheduler,
};
use crate::encodings::logical::primitive::{
PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
};
Expand Down Expand Up @@ -777,6 +779,16 @@ impl CoreFieldDecoderStrategy {
column_infos.next_top_level();
Ok(scheduler)
}
DataType::List(_) | DataType::LargeList(_) => {
let child = field
.children
.first()
.expect("List field must have a child");
let child_scheduler =
self.create_structural_field_scheduler(child, column_infos)?;
Ok(Box::new(StructuralListScheduler::new(child_scheduler))
as Box<dyn StructuralFieldScheduler>)
}
_ => todo!(),
}
}
Expand Down
25 changes: 22 additions & 3 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::buffer::LanceBuffer;
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
Expand Down Expand Up @@ -370,12 +371,21 @@ pub trait FieldEncoder: Send {
/// than a single disk page.
///
/// It could also return an empty Vec if there is not enough data yet to encode any pages.
///
/// The `row_number` must be passed, which is the top-level row number currently being encoded.
/// This is stored in any pages produced by this call so that we can know the priority of the
/// page.
///
/// The `num_rows` is the number of top level rows. It is initially the same as `array.len()`
/// however it is passed separately because array will become flattened over time (if there is
/// repetition) and we need to know the original number of rows for various purposes.
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>>;
/// Flush any remaining data from the buffers into encoding tasks
///
Expand Down Expand Up @@ -1204,8 +1214,15 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
)?))
} else {
match data_type {
DataType::List(_child) | DataType::LargeList(_child) => {
todo!()
DataType::List(_) | DataType::LargeList(_) => {
let child = field.children.first().expect("List should have a child");
let child_encoder = self.create_field_encoder(
_encoding_strategy_root,
child,
column_index,
options,
)?;
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
}
DataType::Struct(_) => {
let field_metadata = &field.metadata;
Expand Down Expand Up @@ -1369,7 +1386,9 @@ pub async fn encode_batch(
OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
let repdef = RepDefBuilder::default();
let encoder = encoder.as_mut();
let mut tasks = encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0)?;
let num_rows = arr.len() as u64;
let mut tasks =
encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
tasks.extend(encoder.flush(&mut external_buffers)?);
for buffer in external_buffers.take_buffers() {
data_buffer.extend_from_slice(&buffer);
Expand Down
10 changes: 8 additions & 2 deletions rust/lance-encoding/src/encodings/logical/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,16 @@ impl FieldEncoder for BlobFieldEncoder {
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
let descriptions = Self::write_bins(array, external_buffers)?;
self.description_encoder
.maybe_encode(descriptions, external_buffers, repdef, row_number)
self.description_encoder.maybe_encode(
descriptions,
external_buffers,
repdef,
row_number,
num_rows,
)
}

// If there is any data left in the buffer then create an encode task from it
Expand Down
Loading

0 comments on commit 10c31b3

Please sign in to comment.