
shuffle around buffers in v2 format to allow for read-on-demand statistics
westonpace committed May 27, 2024
1 parent 68b45c3 commit 4a5b79b
Showing 3 changed files with 144 additions and 93 deletions.
95 changes: 59 additions & 36 deletions protos/file2.proto
@@ -6,6 +6,7 @@ syntax = "proto3";
package lance.file.v2;

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

// # Lance v0.2 File Format
//
@@ -27,13 +28,17 @@ import "google/protobuf/any.proto";
//
// Data buffers can be placed in three different places:
//
// * Global data buffers are placed in the file footer. These should contain
// data that is common across the file (such as the file schema or a common
// dictionary)
// * Column metadata buffers are placed in the column metadata block. These
// should contain data that is common across all pages within the column.
// * Page data buffers are written in the data pages section of the file. It
// * Page data buffers are the first things written to the file. It
// is expected that these will contain the vast majority of the file's data.
// These are written throughout the writing process as column encoders fill
// up and need to flush data.
// * Column metadata buffers are placed after page buffers. These typically
// contain data that is common across all pages within the column. For
// example, column statistics or column dictionaries. These are written when
// the file is finished.
// * Global data buffers are placed after the column metadata buffers. These
// should contain data that is common across the file (such as the file schema
// or a common dictionary). These are written when the file is finished. (A
// sketch of the resulting write order follows this list.)
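To make that ordering concrete, here is a minimal, self-contained sketch of the write order this layout implies. The `append` helper and the section payloads are illustrative only, not the lance-file writer API:

```rust
// Append one section to an in-memory "file" and return its start offset.
fn append(file: &mut Vec<u8>, payload: &[u8]) -> u64 {
    let start = file.len() as u64;
    file.extend_from_slice(payload);
    start
}

fn main() {
    let mut file = Vec::new();
    append(&mut file, b"<page data buffers>"); // streamed as encoders flush
    let a = append(&mut file, b"<column meta buffers>"); // stats, dictionaries
    let b = append(&mut file, b"<global buffers>"); // e.g. the file schema
    let c = append(&mut file, b"<column metadatas>"); // per-column encodings
    let d = append(&mut file, b"<CMO table>");
    let e = append(&mut file, b"<GBO table>");
    // The fixed-size footer records offsets A..E as little-endian u64s so a
    // reader can seek straight to any section.
    for offset in [a, b, c, d, e] {
        file.extend_from_slice(&offset.to_le_bytes());
    }
    println!("wrote {} bytes", file.len());
}
```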
//
// ## File Layout
//
@@ -63,39 +68,45 @@ import "google/protobuf/any.proto";
// | ... |
// | Page PN, Buffer PN_N* |
// ├──────────────────────────────────┤
// | Column Metadatas |
// | |A| Column 0 Metadata* |
// | Column 0 Meta Buffer 0* |
// | Column Meta Buffers |
// | |A| Column 0, Buffer 0* |
// | ... |
// | Column 0 Meta Buffer C0_N* |
// | Column 0, Buffer C0_N* |
// | Column 1, Buffer 0* |
// | ... |
// | Column CN, Buffer CN_N* |
// ├──────────────────────────────────┤
// | Global Buffers |
// | |B| Global Meta Buffer 0* |
// | ... |
// | Global Meta Buffer GN* |
// ├──────────────────────────────────┤
// | Column Metadatas |
// | |C| Column 0 Metadata* |
// | Column 1 Metadata* |
// | ... |
// | Column CN Meta Buffer CN_N* |
// | Column CN Metadata* |
// ├──────────────────────────────────┤
// | Column Metadata Offset Table |
// | |B| Column 0 Metadata Position* |
// | |D| Column 0 Metadata Position* |
// | Column 0 Metadata Size |
// | ... |
// | Column CN Metadata Position |
// | Column CN Metadata Size |
// ├──────────────────────────────────┤
// | Global Buffers |
// | |C| Global Meta Buffer 0* |
// | ... |
// | Global Meta Buffer GN* |
// ├──────────────────────────────────┤
// | Global Buffers Offset Table |
// | |D| Global Buffer 0 Position* |
// | |E| Global Buffer 0 Position* |
// | Global Buffer 0 Size |
// | ... |
// | Global Buffer GN Position |
// | Global Buffer GN Size |
// ├──────────────────────────────────┤
// | Footer |
// | A u64: Offset to C0 metadata |
// | B u64: Offset to CMO table |
// | C u64: Offset to global buf 0 |
// | D u64: Offset to GBO table |
// | A u64: Offset to col meta buf 0 |
// | B u64: Offset to global buf 0 |
// | C u64: Offset to column meta 0 |
// | D u64: Offset to CMO table |
// | E u64: Offset to GBO table |
// | u32: Number of global bufs |
// | u32: Number of columns |
// | u16: Major version |
@@ -119,31 +130,39 @@ import "google/protobuf/any.proto";
message DeferredEncoding {
// Location of the buffer containing the encoding.
//
// This buffer will contain a protobuf encoded `DirectEncoding` message.
//
// * If sharing encodings across columns then this will be in a global buffer
// * If sharing encodings across pages within a column this could be in a
// column metadata buffer.
// * This format doesn't prevent this from being a page buffer. However, that
// would mean the reader needs to do an extra IOP before it can even begin
// decoding the page and we don't imagine this being useful.
// * This could also be a page buffer if the encoding is not shared, needs
// to be written before the file ends, and the encoding is too large to load
// unless we first determine the page needs to be read. This combination
// seems unusual.
uint64 buffer_location = 1;
uint64 buffer_length = 2;
}
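Resolving a deferred encoding costs the reader one extra IOP: read `buffer_length` bytes at `buffer_location`, then decode them as the `DirectEncoding` described above. A minimal sketch, assuming the prost-generated `pbfile` module used elsewhere in this diff and an in-memory byte source:

```rust
use prost::Message;

// Fetch and decode a DeferredEncoding's target buffer. `file_bytes` stands in
// for whatever random-access source the reader actually uses.
fn resolve_deferred(
    file_bytes: &[u8],
    deferred: &pbfile::DeferredEncoding,
) -> Result<pbfile::DirectEncoding, prost::DecodeError> {
    let start = deferred.buffer_location as usize;
    let end = start + deferred.buffer_length as usize;
    pbfile::DirectEncoding::decode(&file_bytes[start..end])
}
```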

// Specific encodings are not part of the minimal file format. They are stored
// as google.protobuf.Any and should be provided by extensions.
// The encoding is placed directly in the metadata section
message DirectEncoding {
// The encoding
google.protobuf.Any encoding = 1;
// The bytes that make up the encoding embedded directly in the metadata
//
// This is the most common approach.
bytes encoding = 1;
}
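Since the field is now raw bytes rather than a typed `Any`, the writer packs the `Any` itself. A hedged sketch of the encode path (the mirror image of `fetch_encoding` in reader.rs below), again assuming prost-generated `pbfile`/`pbenc` modules whose types implement `prost::Name`:

```rust
use prost::Message;

// Wrap an ArrayEncoding in an Any, then serialize the Any into the bytes field.
fn to_direct_encoding(
    encoding: &pbenc::ArrayEncoding,
) -> Result<pbfile::DirectEncoding, prost::EncodeError> {
    let any = prost_types::Any::from_msg(encoding)?;
    Ok(pbfile::DirectEncoding {
        encoding: any.encode_to_vec(),
    })
}
```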

// An encoding stores the information needed to decode a column or page
//
// For example, it could describe whether the page is bit-packed and how many
// bits there are in each individual value.
//
// At the column level it can be used to wrap columns with dictionaries or statistics.
message Encoding {
oneof style {
oneof location {
// The encoding is stored elsewhere and not part of this protobuf message
DeferredEncoding deferred = 1;
DeferredEncoding indirect = 1;
// The encoding is stored within this protobuf message
DirectEncoding direct = 2;
// There is no encoding information
google.protobuf.Empty none = 3;
}
}

@@ -171,17 +190,21 @@ message ColumnMetadata {
// The encoding used to encode the page
Encoding encoding = 4;
}
// Encoding information about the column itself. This typically describes
// how to interpret the column metadata buffers. For example, it could
// describe how statistics or dictionaries are stored in the column metadata.
Encoding encoding = 1;
// The pages in the column
repeated Page pages = 1;
repeated Page pages = 2;
// The file offsets of each of the column metadata buffers
//
// There may be zero buffers.
repeated uint64 buffer_offsets = 2;
repeated uint64 buffer_offsets = 3;
// The size (in bytes) of each of the column metadata buffers
//
// This field will have the same length as `buffer_offsets` and
// may be empty.
repeated uint64 buffer_sizes = 3;
repeated uint64 buffer_sizes = 4;
}
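With `buffer_offsets` and `buffer_sizes` as parallel arrays, a reader can fetch a single column metadata buffer (for example, just the statistics) on demand instead of reading them all up front, which is the point of this commit. A minimal sketch, assuming the prost-generated `pbfile` module:

```rust
// Byte range of each column metadata buffer, in declaration order.
fn column_buffer_ranges(meta: &pbfile::ColumnMetadata) -> Vec<std::ops::Range<u64>> {
    meta.buffer_offsets
        .iter()
        .zip(meta.buffer_sizes.iter())
        .map(|(&offset, &size)| offset..offset + size)
        .collect()
}
```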

// ## Where is the rest?
99 changes: 56 additions & 43 deletions rust/lance-file/src/v2/reader.rs
@@ -58,7 +58,9 @@ pub struct CachedFileMetadata {
pub file_buffers: Vec<BufferDescriptor>,
/// The number of bytes contained in the data page section of the file
pub num_data_bytes: u64,
/// The number of bytes contained in the column metadata section of the file
/// The number of bytes contained in column metadata buffers (page statistics, dictionaries, etc.)
pub num_column_metadata_buf_bytes: u64,
/// The number of bytes contained in the column metadata (encoding descriptions, etc.)
pub num_column_metadata_bytes: u64,
/// The number of bytes contained in the global buffer section of the file
pub num_global_buffer_bytes: u64,
@@ -100,6 +102,8 @@ pub struct FileReader {
}

struct Footer {
#[allow(dead_code)]
column_meta_bufs_start: u64,
column_meta_start: u64,
// We don't use this today because we always load metadata for every column
// and don't yet support "metadata projection"
@@ -113,7 +117,7 @@ struct Footer {
minor_version: u16,
}

const FOOTER_LEN: usize = 48;
const FOOTER_LEN: usize = 56;
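The footer grows from 48 to 56 bytes because the new layout adds a fifth u64 offset. A worked byte count, assuming the 4-byte trailing magic number implied by the previous 48-byte footer (4 × u64 + 2 × u32 + 2 × u16 + 4):

```rust
// 5 x u64 section offsets (A..E)          = 40 bytes
// 2 x u32 (num global bufs, num columns)  =  8 bytes
// 2 x u16 (major, minor version)          =  4 bytes
// 1 x 4-byte magic number                 =  4 bytes
const FOOTER_LEN: usize = 5 * 8 + 2 * 4 + 2 * 2 + 4; // = 56
```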

impl FileReader {
pub fn metadata(&self) -> &Arc<CachedFileMetadata> {
@@ -146,9 +150,10 @@ impl FileReader {
}
let mut cursor = Cursor::new(footer_bytes.slice(len - FOOTER_LEN..));

let column_meta_bufs_start = cursor.read_u64::<LittleEndian>()?;
let global_buff_start = cursor.read_u64::<LittleEndian>()?;
let column_meta_start = cursor.read_u64::<LittleEndian>()?;
let column_meta_offsets_start = cursor.read_u64::<LittleEndian>()?;
let global_buff_start = cursor.read_u64::<LittleEndian>()?;
let global_buff_offsets_start = cursor.read_u64::<LittleEndian>()?;
let num_global_buffers = cursor.read_u32::<LittleEndian>()?;
let num_columns = cursor.read_u32::<LittleEndian>()?;
Expand Down Expand Up @@ -176,6 +181,7 @@ impl FileReader {
));
}
Ok(Footer {
column_meta_bufs_start,
column_meta_start,
column_meta_offsets_start,
global_buff_start,
Expand All @@ -189,16 +195,10 @@ impl FileReader {

// TODO: Once we have coalesced I/O we should only read the column metadatas that we need
async fn read_all_column_metadata(
scheduler: &FileScheduler,
column_metadata_bytes: Bytes,
footer: &Footer,
) -> Result<Vec<pbfile::ColumnMetadata>> {
let column_metadata_start = footer.column_meta_start;
// This range includes both the offsets table and all of the column metadata
// We can't just grab col_meta_start..cmo_table_start because there may be padding
// between the last column and the start of the cmo table.
let column_metadata_range = column_metadata_start..footer.global_buff_start;
let column_metadata_bytes = scheduler.submit_single(column_metadata_range, 0).await?;

// cmo == column_metadata_offsets
let cmo_table_size = 16 * footer.num_columns as usize;
let cmo_table = column_metadata_bytes.slice(column_metadata_bytes.len() - cmo_table_size..);
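Each CMO (column metadata offset) entry is a pair of little-endian u64s, position then size, which is where the `16 * footer.num_columns` above comes from. A hedged sketch of walking the table once it has been sliced out:

```rust
use std::io::Cursor;
use byteorder::{LittleEndian, ReadBytesExt};

// Decode the CMO table into (position, size) pairs, one per column.
fn read_cmo_entries(cmo_table: &[u8], num_columns: u32) -> std::io::Result<Vec<(u64, u64)>> {
    let mut cursor = Cursor::new(cmo_table);
    let mut entries = Vec::with_capacity(num_columns as usize);
    for _ in 0..num_columns {
        let position = cursor.read_u64::<LittleEndian>()?;
        let size = cursor.read_u64::<LittleEndian>()?;
        entries.push((position, size));
    }
    Ok(entries)
}
```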
@@ -223,14 +223,14 @@ impl FileReader {
scheduler: &FileScheduler,
footer: &Footer,
) -> Result<Bytes> {
let num_bytes_needed = (file_len - footer.column_meta_start) as usize;
let num_bytes_needed = (file_len - footer.global_buff_start) as usize;
if tail_bytes.len() >= num_bytes_needed {
Ok(tail_bytes.slice(tail_bytes.len() - num_bytes_needed..))
} else {
let num_bytes_missing = (num_bytes_needed - tail_bytes.len()) as u64;
let missing_bytes = scheduler
.submit_single(
footer.column_meta_start..footer.column_meta_start + num_bytes_missing,
footer.global_buff_start..footer.global_buff_start + num_bytes_missing,
0,
)
.await;
@@ -272,9 +272,12 @@ impl FileReader {
let (tail_bytes, file_len) = Self::read_tail(scheduler).await?;
let footer = Self::decode_footer(&tail_bytes)?;

// By default we read all global buffers and all column metadatas. We do NOT read
// the column metadata buffers at this point. We only want to read the column metadata
// for columns we are actually loading.
let all_metadata_bytes =
Self::get_all_meta_bytes(tail_bytes, file_len, scheduler, &footer).await?;
let meta_offset = footer.column_meta_start;
let meta_offset = footer.global_buff_start;

// 2. read any global buffers (just the schema right now)
let global_bufs_table_nbytes = footer.num_global_buffers as usize * 16;
@@ -290,12 +293,21 @@ impl FileReader {
let (num_rows, schema) = Self::decode_schema(schema_bytes)?;

// Next, read the metadata for the columns
let column_metadatas = Self::read_all_column_metadata(scheduler, &footer).await?;
// This is both the column metadata and the CMO table
let column_metadata_start = (footer.column_meta_start - meta_offset) as usize;
let column_metadata_end = (footer.global_buff_offsets_start - meta_offset) as usize;
let column_metadata_bytes =
all_metadata_bytes.slice(column_metadata_start..column_metadata_end);
let column_metadatas =
Self::read_all_column_metadata(column_metadata_bytes, &footer).await?;

let footer_start = file_len - FOOTER_LEN as u64;
let num_data_bytes = footer.column_meta_start;
let num_column_metadata_bytes = footer.global_buff_start - footer.column_meta_start;
let num_global_buffer_bytes = footer_start - footer.global_buff_start;
let num_data_bytes = footer.column_meta_bufs_start;
let num_column_metadata_buf_bytes =
footer.global_buff_start - footer.column_meta_bufs_start;
let num_global_buffer_bytes = footer.column_meta_start - footer.global_buff_start
+ (footer_start - footer.global_buff_offsets_start);
let num_column_metadata_bytes = footer.global_buff_offsets_start - footer.column_meta_start;

let global_bufs_table_nbytes = footer.num_global_buffers as usize * 16;
let global_bufs_table_start = (footer.global_buff_offsets_start - meta_offset) as usize;
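The size accounting above is subtle because two of the logical sections are discontiguous on disk: the global buffers (B..C) are counted together with their offset table (E..footer start), and the column metadatas are counted together with the CMO table (C..E). A worked example with hypothetical offsets:

```rust
fn main() {
    // Hypothetical section start offsets, in file order (bytes):
    let column_meta_bufs_start = 1_000_u64; // A
    let global_buff_start = 1_400_u64; // B
    let column_meta_start = 1_900_u64; // C
    let _column_meta_offsets_start = 2_300_u64; // D (CMO table)
    let global_buff_offsets_start = 2_400_u64; // E (GBO table)
    let footer_start = 2_440_u64;

    let num_data_bytes = column_meta_bufs_start; // 1000
    let num_column_metadata_buf_bytes = global_buff_start - column_meta_bufs_start; // 400
    let num_global_buffer_bytes = (column_meta_start - global_buff_start)
        + (footer_start - global_buff_offsets_start); // 500 + 40 = 540
    let num_column_metadata_bytes = global_buff_offsets_start - column_meta_start; // 500
    println!(
        "data={num_data_bytes} col_meta_bufs={num_column_metadata_buf_bytes} \
         global={num_global_buffer_bytes} col_meta={num_column_metadata_bytes}"
    );
}
```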
@@ -323,6 +335,7 @@ impl FileReader {
num_rows,
num_data_bytes,
num_column_metadata_bytes,
num_column_metadata_buf_bytes,
num_global_buffer_bytes,
file_buffers: global_buffers,
major_version: footer.major_version,
@@ -393,14 +406,14 @@ impl FileReader {
}

fn fetch_encoding(encoding: &pbfile::Encoding) -> pbenc::ArrayEncoding {
match &encoding.style {
Some(pbfile::encoding::Style::Deferred(_)) => todo!(),
Some(pbfile::encoding::Style::Direct(encoding)) => encoding
.encoding
.as_ref()
.unwrap()
.to_msg::<pbenc::ArrayEncoding>()
.unwrap(),
match &encoding.location {
Some(pbfile::encoding::Location::Indirect(_)) => todo!(),
Some(pbfile::encoding::Location::Direct(encoding)) => {
let encoding_buf = Bytes::from(encoding.encoding.clone());
let encoding_any = prost_types::Any::decode(encoding_buf).unwrap();
encoding_any.to_msg::<pbenc::ArrayEncoding>().unwrap()
}
Some(pbfile::encoding::Location::None(_)) => panic!(),
None => panic!(),
}
}
@@ -782,33 +795,33 @@ impl FileReader {
/// Inspects a page and returns a String describing the page's encoding
pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String {
if let Some(encoding) = &page.encoding {
if let Some(style) = &encoding.style {
if let Some(style) = &encoding.location {
match style {
pbfile::encoding::Style::Deferred(deferred) => {
pbfile::encoding::Location::Indirect(indirect) => {
format!(
"DeferredEncoding(pos={},size={})",
deferred.buffer_location, deferred.buffer_length
"IndirectEncoding(pos={},size={})",
indirect.buffer_location, indirect.buffer_length
)
}
pbfile::encoding::Style::Direct(direct) => {
if let Some(encoding) = &direct.encoding {
if encoding.type_url == "/lance.encodings.ArrayEncoding" {
let encoding = encoding.to_msg::<pbenc::ArrayEncoding>();
match encoding {
Ok(encoding) => {
format!("{:#?}", encoding)
}
Err(err) => {
format!("Unsupported(decode_err={})", err)
}
pbfile::encoding::Location::Direct(direct) => {
let encoding_any =
prost_types::Any::decode(Bytes::from(direct.encoding.clone()))
.expect("failed to deserialize encoding as protobuf");
if encoding_any.type_url == "/lance.encodings.ArrayEncoding" {
let encoding = encoding_any.to_msg::<pbenc::ArrayEncoding>();
match encoding {
Ok(encoding) => {
format!("{:#?}", encoding)
}
Err(err) => {
format!("Unsupported(decode_err={})", err)
}
} else {
format!("Unrecognized(type_url={})", encoding.type_url)
}
} else {
"MISSING DIRECT VALUE".to_string()
format!("Unrecognized(type_url={})", encoding_any.type_url)
}
}
pbfile::encoding::Location::None(_) => "NoEncodingDescription".to_string(),
}
} else {
"MISSING STYLE".to_string()