Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add file statistics #3232

Merged
merged 14 commits into from
Dec 13, 2024
Merged
8 changes: 8 additions & 0 deletions python/python/lance/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
LanceBufferDescriptor,
LanceColumnMetadata,
LanceFileMetadata,
LanceFileStatistics,
LancePageMetadata,
)
from .lance import (
Expand Down Expand Up @@ -146,6 +147,12 @@ def metadata(self) -> LanceFileMetadata:
"""
return self._reader.metadata()

def file_statistics(self) -> LanceFileStatistics:
    """
    Return file statistics of the file

    Returns
    -------
    LanceFileStatistics
        Summary statistics describing every column in the file
        (page counts and compressed on-disk sizes).
    """
    # Delegates to the native reader; statistics are derived from file
    # metadata, so this does not re-read column data.
    return self._reader.file_statistics()

def read_global_buffer(self, index: int) -> bytes:
"""
Read a global buffer from the file at a given index
Expand Down Expand Up @@ -289,4 +296,5 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"LanceColumnMetadata",
"LancePageMetadata",
"LanceBufferDescriptor",
"LanceFileStatistics",
]
7 changes: 7 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ class LanceFileMetadata:
global_buffers: List[LanceBufferDescriptor]
columns: List[LanceColumnMetadata]

class LanceFileStatistics:
    """Summary statistics for a Lance file."""
    # One statistics entry per column in the file
    columns: List[LanceColumnStatistics]

class LanceColumnStatistics:
    """Summary statistics describing a single column of a Lance file."""
    # Number of pages in the column
    num_pages: int
    # Total data & metadata bytes in the column (compressed on-disk size)
    size_bytes: int

class _Session:
def size_bytes(self) -> int: ...

Expand Down
29 changes: 29 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
Expand Down Expand Up @@ -214,6 +215,34 @@ def test_metadata(tmp_path):
assert len(page.encoding) > 0


def test_file_stat(tmp_path):
    # Verify LanceFileReader.file_statistics() reports per-column page counts
    # and on-disk sizes for a file with one int64 and one fixed-size-list column.
    path = tmp_path / "foo.lance"
    schema = pa.schema(
        [pa.field("a", pa.int64()), pa.field("b", pa.list_(pa.float64(), 8))]
    )

    num_rows = 1_000_000

    data1 = pa.array(range(num_rows))

    # Create a fixed-size list of float64 with dimension 8
    fixed_size_list = [np.random.rand(8).tolist() for _ in range(num_rows)]
    data2 = pa.array(fixed_size_list, type=pa.list_(pa.float64(), 8))

    with LanceFileWriter(str(path), schema) as writer:
        writer.write_batch(pa.table({"a": data1, "b": data2}))
    reader = LanceFileReader(str(path))
    file_stat = reader.file_statistics()

    # One statistics entry per column in the schema
    assert len(file_stat.columns) == 2

    # Column "a": 1M int64 values -> 8 bytes * 1_000_000 = 8 MB, one page
    assert file_stat.columns[0].num_pages == 1
    assert file_stat.columns[0].size_bytes == 8_000_000

    # Column "b": 1M lists of 8 float64 -> 64 bytes * 1_000_000 = 64 MB,
    # written as two pages (presumably split by the writer's maximum page
    # size — TODO confirm against the writer's configuration)
    assert file_stat.columns[1].num_pages == 2
    assert file_stat.columns[1].size_bytes == 64_000_000


def test_round_trip_parquet(tmp_path):
pq_path = tmp_path / "foo.parquet"
table = pa.table({"int": [1, 2], "list_str": [["x", "yz", "abc"], ["foo", "bar"]]})
Expand Down
123 changes: 122 additions & 1 deletion python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::{
v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions},
reader::{
BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics,
},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
Expand Down Expand Up @@ -113,6 +115,58 @@ impl LanceColumnMetadata {
}
}

/// Statistics summarize some of the file metadata for quick summary info
#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileStatistics {
broccoliSpicy marked this conversation as resolved.
Show resolved Hide resolved
/// Statistics about each of the columns in the file
columns: Vec<LanceColumnStatistics>,
}

#[pymethods]
impl LanceFileStatistics {
    /// Human-readable form: `FileStatistics(columns=[...])`, with one
    /// `ColumnStatistics(...)` entry per column.
    fn __repr__(&self) -> String {
        let mut rendered = Vec::with_capacity(self.columns.len());
        for column in &self.columns {
            rendered.push(column.__repr__());
        }
        format!("FileStatistics(columns=[{}])", rendered.join(", "))
    }
}

/// Summary information describing a column
///
/// Exposed to Python with read-only attribute access (`get_all`).
#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceColumnStatistics {
    /// The number of pages in the column
    num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    size_bytes: u64,
}

#[pymethods]
impl LanceColumnStatistics {
    /// Human-readable form: `ColumnStatistics(num_pages=N, size_bytes=M)`.
    fn __repr__(&self) -> String {
        let mut out = String::from("ColumnStatistics(num_pages=");
        out.push_str(&self.num_pages.to_string());
        out.push_str(", size_bytes=");
        out.push_str(&self.size_bytes.to_string());
        out.push(')');
        out
    }
}

impl LanceFileStatistics {
    /// Build the Python-facing statistics object from the reader's native
    /// `FileStatistics`, copying each column's summary values.
    fn new(inner: &FileStatistics) -> Self {
        let mut columns = Vec::with_capacity(inner.columns.len());
        for stat in &inner.columns {
            columns.push(LanceColumnStatistics {
                num_pages: stat.num_pages,
                size_bytes: stat.size_bytes,
            });
        }
        Self { columns }
    }
}

#[pyclass(get_all)]
#[derive(Clone, Debug, Serialize)]
pub struct LanceFileMetadata {
Expand Down Expand Up @@ -445,6 +499,11 @@ impl LanceFileReader {
LanceFileMetadata::new(inner_meta, py)
}

/// Return summary statistics (page counts and compressed byte sizes) for
/// each column in the file, converted into the Python-facing type.
pub fn file_statistics(&self) -> LanceFileStatistics {
    let inner_stat = self.inner.file_statistics();
    LanceFileStatistics::new(&inner_stat)
}

pub fn read_global_buffer(&mut self, index: u32) -> PyResult<Vec<u8>> {
let buffer_bytes = RT
.runtime
Expand All @@ -453,3 +512,65 @@ impl LanceFileReader {
Ok(buffer_bytes.to_vec())
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for the column statistics used by these tests.
    fn column(num_pages: usize, size_bytes: u64) -> LanceColumnStatistics {
        LanceColumnStatistics {
            num_pages,
            size_bytes,
        }
    }

    #[test]
    fn test_lance_file_statistics_repr_empty() {
        let stats = LanceFileStatistics {
            columns: Vec::new(),
        };

        assert_eq!(stats.__repr__(), "FileStatistics(columns=[])");
    }

    #[test]
    fn test_lance_file_statistics_repr_single_column() {
        let stats = LanceFileStatistics {
            columns: vec![column(5, 1024)],
        };

        assert_eq!(
            stats.__repr__(),
            "FileStatistics(columns=[ColumnStatistics(num_pages=5, size_bytes=1024)])"
        );
    }

    #[test]
    fn test_lance_file_statistics_repr_multiple_columns() {
        let stats = LanceFileStatistics {
            columns: vec![column(5, 1024), column(3, 512)],
        };

        assert_eq!(
            stats.__repr__(),
            "FileStatistics(columns=[ColumnStatistics(num_pages=5, size_bytes=1024), ColumnStatistics(num_pages=3, size_bytes=512)])"
        );
    }

    #[test]
    fn test_lance_column_statistics_repr() {
        assert_eq!(
            column(10, 2048).__repr__(),
            "ColumnStatistics(num_pages=10, size_bytes=2048)"
        );
    }
}
3 changes: 2 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use dataset::MergeInsertBuilder;
use env_logger::Env;
use file::{
LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader,
LanceFileWriter, LancePageMetadata,
LanceFileStatistics, LanceFileWriter, LancePageMetadata,
};
use futures::StreamExt;
use lance_index::DatasetIndexExt;
Expand Down Expand Up @@ -120,6 +120,7 @@ fn lance(py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<LanceFileReader>()?;
m.add_class::<LanceFileWriter>()?;
m.add_class::<LanceFileMetadata>()?;
m.add_class::<LanceFileStatistics>()?;
m.add_class::<LanceColumnMetadata>()?;
m.add_class::<LancePageMetadata>()?;
m.add_class::<LanceBufferDescriptor>()?;
Expand Down
42 changes: 42 additions & 0 deletions rust/lance-file/src/v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ pub struct BufferDescriptor {
pub size: u64,
}

/// Statistics summarize some of the file metadata for quick summary info
// Plain-data summary type: derive Clone so callers can retain a copy of the
// statistics independently of the reader.
#[derive(Clone, Debug)]
pub struct FileStatistics {
    /// Statistics about each of the columns in the file
    pub columns: Vec<ColumnStatistics>,
}

/// Summary information describing a column
// Plain-data summary type: derive Clone/PartialEq/Eq so callers can copy and
// compare statistics (e.g. in tests) without field-by-field comparison.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ColumnStatistics {
    /// The number of pages in the column
    pub num_pages: usize,
    /// The total number of data & metadata bytes in the column
    ///
    /// This is the compressed on-disk size
    pub size_bytes: u64,
}

// TODO: Caching
#[derive(Debug)]
pub struct CachedFileMetadata {
Expand Down Expand Up @@ -313,6 +331,30 @@ impl FileReader {
&self.metadata
}

/// Compute summary statistics for each column in the file.
///
/// Derived entirely from the cached file metadata, so no I/O is performed.
pub fn file_statistics(&self) -> FileStatistics {
    let column_metadatas = &self.metadata().column_metadatas;

    let column_stats = column_metadatas
        .iter()
        .map(|col_metadata| {
            let num_pages = col_metadata.pages.len();
            // Sum the sizes of every buffer in every page — the on-disk
            // footprint of the column's page data.
            // NOTE(review): only page buffer sizes are counted here; the
            // ColumnStatistics doc says "data & metadata bytes" — confirm
            // whether column metadata bytes should be included.
            let size_bytes = col_metadata
                .pages
                .iter()
                .map(|page| page.buffer_sizes.iter().sum::<u64>())
                .sum::<u64>();
            ColumnStatistics {
                num_pages,
                size_bytes,
            }
        })
        .collect();

    FileStatistics {
        columns: column_stats,
    }
}

pub async fn read_global_buffer(&self, index: u32) -> Result<Bytes> {
let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?;
self.scheduler
Expand Down
Loading