Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add file statistics #3232

Merged
merged 14 commits into from
Dec 13, 2024
Merged
5 changes: 5 additions & 0 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
LanceBufferDescriptor,
LanceColumnMetadata,
LanceFileMetadata,
LanceFileStatistics,
LancePageMetadata,
)
from .lance import (
Expand Down Expand Up @@ -146,6 +147,9 @@ def metadata(self) -> LanceFileMetadata:
"""
return self._reader.metadata()

def file_statistics(self) -> LanceFileStatistics:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a docstring for this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

return self._reader.file_statistics()

def read_global_buffer(self, index: int) -> bytes:
"""
Read a global buffer from the file at a given index
Expand Down Expand Up @@ -289,4 +293,5 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"LanceColumnMetadata",
"LancePageMetadata",
"LanceBufferDescriptor",
"LanceFileStatistics",
]
7 changes: 7 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class LanceFileMetadata:
global_buffers: List[LanceBufferDescriptor]
columns: List[LanceColumnMetadata]

class LanceFileStatistics:
    """Statistics of a Lance file, aggregated per column."""

    # One entry per column in the file's schema.
    columns: List[LanceColumnStatistics]

class LanceColumnStatistics:
    """Statistics of a single column in a Lance file."""

    # Number of pages used to store this column's data.
    num_pages: int
    # Total size in bytes of the column's page buffers.
    size_bytes: int

class _Session:
def size_bytes(self) -> int: ...

Expand Down
29 changes: 29 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
Expand Down Expand Up @@ -214,6 +215,34 @@ def test_metadata(tmp_path):
assert len(page.encoding) > 0


def test_file_stat(tmp_path):
    """Verify that file_statistics() reports per-column page counts and sizes."""
    path = tmp_path / "foo.lance"
    schema = pa.schema(
        [pa.field("a", pa.int64()), pa.field("b", pa.list_(pa.float64(), 8))]
    )

    num_rows = 1_000_000

    data1 = pa.array(range(num_rows))

    # Create a fixed-size list of float64 with dimension 8.
    # Vectorized: generate all values at once rather than building one
    # Python list per row (1M list allocations).
    values = pa.array(np.random.rand(num_rows * 8))
    data2 = pa.FixedSizeListArray.from_arrays(values, 8)

    with LanceFileWriter(str(path), schema) as writer:
        writer.write_batch(pa.table({"a": data1, "b": data2}))
    reader = LanceFileReader(str(path))
    file_stat = reader.file_statistics()

    assert len(file_stat.columns) == 2

    # int64 column: 1M rows * 8 bytes in a single page.
    assert file_stat.columns[0].num_pages == 1
    assert file_stat.columns[0].size_bytes == 8_000_000

    # fixed-size-list column: 8M float64 values (64 MB) split over two pages.
    assert file_stat.columns[1].num_pages == 2
    assert file_stat.columns[1].size_bytes == 64_000_000


def test_round_trip_parquet(tmp_path):
pq_path = tmp_path / "foo.parquet"
table = pa.table({"int": [1, 2], "list_str": [["x", "yz", "abc"], ["foo", "bar"]]})
Expand Down
34 changes: 33 additions & 1 deletion python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::{
v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions},
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
Expand Down Expand Up @@ -113,6 +113,33 @@ impl LanceColumnMetadata {
}
}

#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileStatistics {
broccoliSpicy marked this conversation as resolved.
Show resolved Hide resolved
columns: Vec<LanceColumnStatistics>,
wjones127 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Per-column statistics exposed to Python.
#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceColumnStatistics {
    /// Number of pages used to store this column's data.
    num_pages: usize,
    /// Total size in bytes of the column's page buffers.
    size_bytes: usize,
}

impl LanceFileStatistics {
    /// Convert the reader's internal `FileStatistics` into the Python-facing type.
    fn new(inner: &FileStatistics) -> Self {
        let mut columns = Vec::with_capacity(inner.columns.len());
        for stat in &inner.columns {
            columns.push(LanceColumnStatistics {
                num_pages: stat.num_pages,
                size_bytes: stat.size_bytes,
            });
        }
        Self { columns }
    }
}

#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileMetadata {
Expand Down Expand Up @@ -445,6 +472,11 @@ impl LanceFileReader {
LanceFileMetadata::new(inner_meta, py)
}

/// Return per-column statistics (page count and byte size) for this file.
pub fn file_statistics(&self) -> LanceFileStatistics {
    let inner_stat = self.inner.file_statistics();
    LanceFileStatistics::new(inner_stat)
}

pub fn read_global_buffer(&mut self, index: u32) -> PyResult<Vec<u8>> {
let buffer_bytes = RT
.runtime
Expand Down
3 changes: 2 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use dataset::optimize::{
use dataset::MergeInsertBuilder;
use env_logger::Env;
use file::{
LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader,
LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, LanceFileStatistics,
LanceFileWriter, LancePageMetadata,
};
use futures::StreamExt;
Expand Down Expand Up @@ -120,6 +120,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceFileReader>()?;
m.add_class::<LanceFileWriter>()?;
m.add_class::<LanceFileMetadata>()?;
m.add_class::<LanceFileStatistics>()?;
m.add_class::<LanceColumnMetadata>()?;
m.add_class::<LancePageMetadata>()?;
m.add_class::<LanceBufferDescriptor>()?;
Expand Down
40 changes: 40 additions & 0 deletions rust/lance-file/src/v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ pub struct BufferDescriptor {
pub size: u64,
}

/// Statistics summarizing the layout of a Lance file.
#[derive(Debug)]
pub struct FileStatistics {
    /// Statistics for each column in the file, in schema order.
    pub columns: Vec<ColumnStatistics>,
}

/// Statistics summarizing a single column in a Lance file.
///
/// Currently we report the `num_pages` and `size_bytes` of each column.
#[derive(Debug)]
pub struct ColumnStatistics {
    /// Number of pages used to store this column's data.
    pub num_pages: usize,
    /// Total size in bytes of the column's page buffers.
    pub size_bytes: usize,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
Expand All @@ -79,6 +91,9 @@ pub struct CachedFileMetadata {
pub num_footer_bytes: u64,
pub major_version: u16,
pub minor_version: u16,

// The file statistics.
pub file_statistics: Arc<FileStatistics>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably calculate these on demand. I think 90% of the time users won't be accessing these and the cost to calculate them is trivial.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to calculate on demand

}

impl DeepSizeOf for CachedFileMetadata {
Expand Down Expand Up @@ -313,6 +328,10 @@ impl FileReader {
&self.metadata
}

/// Return the file statistics computed when the file metadata was read.
pub fn file_statistics(&self) -> &Arc<FileStatistics> {
    &self.metadata().file_statistics
}

pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
self.scheduler
Expand Down Expand Up @@ -545,6 +564,26 @@ impl FileReader {

let column_infos = Self::meta_to_col_infos(column_metadatas.as_slice(), file_version);

let column_stats = column_metadatas
.iter()
.map(|col_metadata| {
let num_pages = col_metadata.pages.len();
let size_bytes = col_metadata
.pages
.iter()
.map(|page| page.buffer_sizes.iter().sum::<u64>())
.sum::<u64>();
ColumnStatistics {
num_pages,
size_bytes: size_bytes as usize,
}
})
.collect();

let file_stat = FileStatistics {
columns: column_stats,
};

Ok(CachedFileMetadata {
file_schema: Arc::new(schema),
column_metadatas,
Expand All @@ -557,6 +596,7 @@ impl FileReader {
file_buffers: gbo_table,
major_version: footer.major_version,
minor_version: footer.minor_version,
file_statistics: Arc::new(file_stat),
})
}

Expand Down
Loading