Skip to content

Commit

Permalink
feat: add 2.1 read path (#2968)
Browse files Browse the repository at this point in the history
Unlike the write path, we were not able to get away with subtle changes
to the existing traits. Most of the read traits needed to be duplicated.
On the bright side, there is very little impact on the existing reader
code though :)
  • Loading branch information
westonpace authored Oct 25, 2024
1 parent f17d88d commit b1abfff
Show file tree
Hide file tree
Showing 40 changed files with 3,996 additions and 1,805 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
rangemap = { version = "1.0" }
rayon = "1.10"
roaring = "0.10.1"
rstest = "0.19.0"
rustc_version = "0.4"
serde = { version = "^1" }
serde_json = { version = "1" }
Expand Down
5 changes: 5 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,13 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

// Layout for a page that contains only null values.  It carries no
// fields; presumably the reader synthesizes nulls without reading any
// data buffers — TODO confirm against the 2.1 read path.
message AllNullLayout {

}

// Describes how a page of encoded data is laid out on disk.
message PageLayout {
// Exactly one layout variant applies to a given page.
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
}
}
222 changes: 222 additions & 0 deletions python/python/benchmarks/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
from pathlib import Path

import pyarrow as pa
import pytest
from lance.file import LanceFileReader, LanceFileWriter
from lance.tracing import trace_to_chrome

# Emit a Chrome-tracing-format profile of the benchmark run so hot spots
# can be inspected in chrome://tracing (written unconditionally on import).
trace_to_chrome(level="debug", file="/tmp/trace.json")

# Total number of rows written to each benchmark file.
NUM_ROWS = 10_000_000
# Number of evenly spaced rows fetched by the take_rows benchmarks.
ROWS_TO_SAMPLE = 10


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full sequential scan of a single non-null uint64 column."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    path = str(tmp_path / "file.lance")

    def batches():
        # Emit the range [0, NUM_ROWS) in chunks of at most 10k rows.
        for start in range(0, NUM_ROWS, 10000):
            stop = min(start + 10000, NUM_ROWS)
            arr = pa.array(range(start, stop))
            yield pa.table({"values": arr}).to_batches()[0]

    with LanceFileWriter(path, schema, version=version) as writer:
        for batch in batches():
            writer.write_batch(batch)

    def read_all():
        # Re-open the reader each round so open cost is part of the measurement.
        reader = LanceFileReader(path)
        return reader.read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nullable_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full scan of a uint64 column where every other value is null."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    path = str(tmp_path / "file.lance")

    def batches():
        # Even indices are null; odd indices carry their own value.
        for start in range(0, NUM_ROWS, 10000):
            stop = min(start + 10000, NUM_ROWS)
            arr = pa.array([None if i % 2 == 0 else i for i in range(start, stop)])
            yield pa.table({"values": arr}).to_batches()[0]

    with LanceFileWriter(path, schema, version=version) as writer:
        for batch in batches():
            writer.write_batch(batch)

    def read_all():
        # Re-open the reader each round so open cost is part of the measurement.
        reader = LanceFileReader(path)
        return reader.read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nested_integer(tmp_path: Path, benchmark):
    """Benchmark a full scan of a doubly nested struct column (2.1 only)."""

    def get_val(i: int):
        # Rotate through four patterns: null at each nesting level, then a value.
        rem = i % 4
        if rem == 0:
            return None
        if rem == 1:
            return {"outer": None}
        if rem == 2:
            return {"outer": {"inner": None}}
        return {"outer": {"inner": i}}

    inner = pa.field("inner", pa.uint64(), True)
    outer = pa.field("outer", pa.struct([inner]), True)
    schema = pa.schema([pa.field("values", pa.struct([outer]), True)])
    path = str(tmp_path / "file.lance")

    def batches():
        # Emit NUM_ROWS values in chunks of at most 10k rows.
        for start in range(0, NUM_ROWS, 10000):
            stop = min(start + 10000, NUM_ROWS)
            arr = pa.array([get_val(i) for i in range(start, stop)])
            yield pa.table({"values": arr}).to_batches()[0]

    with LanceFileWriter(path, schema, version="2.1") as writer:
        for batch in batches():
            writer.write_batch(batch)

    def read_all():
        # Re-open the reader each round so open cost is part of the measurement.
        reader = LanceFileReader(path)
        return reader.read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="sample_single_column")
def test_sample_integer(tmp_path: Path, benchmark, version):
    """Benchmark random access over a uint64 column.

    Writes NUM_ROWS sequential values, then benchmarks ``take_rows`` on
    ROWS_TO_SAMPLE evenly spaced indices.
    """
    schema = pa.schema([pa.field("values", pa.uint64(), True)])

    def gen_data():
        # Emit the range [0, NUM_ROWS) in batches of at most 10k rows.
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array(range(offset, offset + to_take))
            batch = pa.table({"values": values}).to_batches()[0]
            yield batch
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(
        str(tmp_path / "file.lance"), schema, version=version
    ) as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # Fix: take_rows returns one row per requested index, not the whole
    # file, so the result has len(indices) (== ROWS_TO_SAMPLE) rows —
    # asserting NUM_ROWS could never pass.
    assert result.num_rows == len(indices)


@pytest.mark.benchmark(group="sample_single_column")
def test_sample_nested_integer(tmp_path: Path, benchmark):
    """Benchmark random access over a doubly nested struct column (2.1 only).

    Writes NUM_ROWS structs cycling through nulls at each nesting level,
    then benchmarks ``take_rows`` on ROWS_TO_SAMPLE evenly spaced indices.
    """

    def get_val(i: int):
        # Rotate through four patterns: null at each nesting level, then a value.
        if i % 4 == 0:
            return None
        elif i % 4 == 1:
            return {"outer": None}
        elif i % 4 == 2:
            return {"outer": {"inner": None}}
        else:
            return {"outer": {"inner": i}}

    dtype = pa.struct(
        [pa.field("outer", pa.struct([pa.field("inner", pa.uint64(), True)]), True)]
    )
    schema = pa.schema(
        [
            pa.field(
                "values",
                dtype,
                True,
            )
        ]
    )

    def gen_data():
        # Emit NUM_ROWS values in batches of at most 10k rows.
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array([get_val(i) for i in range(offset, offset + to_take)])
            batch = pa.table({"values": values}).to_batches()[0]
            yield batch
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(str(tmp_path / "file.lance"), schema, version="2.1") as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # Fix: take_rows returns one row per requested index, not the whole
    # file, so the result has len(indices) (== ROWS_TO_SAMPLE) rows —
    # asserting NUM_ROWS could never pass.
    assert result.num_rows == len(indices)
16 changes: 7 additions & 9 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,13 @@ def test_version(tmp_path):
assert metadata.major_version == 0
assert metadata.minor_version == 3

# TODO: Temporarily disabled until read path for 2.1 is added
#
# path = tmp_path / "foo2.lance"
# with LanceFileWriter(str(path), schema, version="2.1") as writer:
# writer.write_batch(pa.table({"a": [1, 2, 3]}))
# reader = LanceFileReader(str(path))
# metadata = reader.metadata()
# assert metadata.major_version == 2
# assert metadata.minor_version == 1
path = tmp_path / "foo2.lance"
with LanceFileWriter(str(path), schema, version="2.1") as writer:
writer.write_batch(pa.table({"a": [1, 2, 3]}))
reader = LanceFileReader(str(path))
metadata = reader.metadata()
assert metadata.major_version == 2
assert metadata.minor_version == 1


def test_take(tmp_path):
Expand Down
7 changes: 4 additions & 3 deletions python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use bytes::Bytes;
use futures::stream::StreamExt;
use lance::io::{ObjectStore, RecordBatchStream};
use lance_core::cache::FileMetadataCache;
use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::{
v2::{
reader::{BufferDescriptor, CachedFileMetadata, FileReader},
reader::{BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions},
writer::{FileWriter, FileWriterOptions},
},
version::LanceFileVersion,
Expand Down Expand Up @@ -335,8 +335,9 @@ impl LanceFileReader {
let inner = FileReader::try_open(
file,
None,
Arc::<DecoderMiddlewareChain>::default(),
Arc::<DecoderPlugins>::default(),
&FileMetadataCache::no_cache(),
FileReaderOptions::default(),
)
.await
.infer_error()?;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-core/src/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ lazy_static::lazy_static! {
]);
pub static ref BLOB_DESC_FIELD: ArrowField =
ArrowField::new("description", DataType::Struct(BLOB_DESC_FIELDS.clone()), false);
pub static ref BLOB_DESC_LANCE_FIELD: Field = Field::try_from(&*BLOB_DESC_FIELD).unwrap();
}

/// LogicalType is a string presentation of arrow type.
Expand Down
Loading

0 comments on commit b1abfff

Please sign in to comment.