Skip to content

Commit

Permalink
Merge branch 'main' into feature/block-compress-level
Browse files Browse the repository at this point in the history
  • Loading branch information
broccoliSpicy authored Oct 25, 2024
2 parents 0346801 + b1abfff commit 8687239
Show file tree
Hide file tree
Showing 51 changed files with 4,378 additions and 1,886 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run_integtests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ runs:
working-directory: python
shell: bash
run: |
pip3 install $(ls target/wheels/pylance-*.whl)[tests]
pip3 install $(ls target/wheels/pylance-*.whl)[tests,ray]
- name: Run python tests
shell: bash
working-directory: python
Expand Down
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ repos:
rev: v1.0
hooks:
- id: fmt

- repo: https://github.com/crate-ci/typos
rev: v1.26.0
hooks:
- id: typos
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
rangemap = { version = "1.0" }
rayon = "1.10"
roaring = "0.10.1"
rstest = "0.19.0"
rustc_version = "0.4"
serde = { version = "^1" }
serde_json = { version = "1" }
Expand Down
5 changes: 5 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,13 @@ message MiniBlockLayout {
ArrayEncoding value_compression = 3;
}

message AllNullLayout {

}

message PageLayout {
oneof layout {
MiniBlockLayout mini_block_layout = 1;
AllNullLayout all_null_layout = 2;
}
}
4 changes: 2 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "pylance"
dependencies = ["pyarrow>=12", "numpy>=1.22,<2"]
dependencies = ["pyarrow>=12", "numpy>=1.22"]
description = "python wrapper for Lance columnar format"
authors = [{ name = "Lance Devs", email = "[email protected]" }]
license = { file = "LICENSE" }
Expand Down Expand Up @@ -62,7 +62,7 @@ benchmarks = ["pytest-benchmark"]
torch = ["torch"]
cuvs-cu11 = ["cuvs-cu11", "pylibraft-cu11"]
cuvs-cu12 = ["cuvs-cu12", "pylibraft-cu12"]
ray = ["ray[data]; python_version<'3.12'"]
ray = ["ray[data]<2.38; python_version<'3.12'"]

[tool.ruff]
lint.select = ["F", "E", "W", "I", "G", "TCH", "PERF", "B019"]
Expand Down
222 changes: 222 additions & 0 deletions python/python/benchmarks/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors
from pathlib import Path

import pyarrow as pa
import pytest
from lance.file import LanceFileReader, LanceFileWriter
from lance.tracing import trace_to_chrome

trace_to_chrome(level="debug", file="/tmp/trace.json")

NUM_ROWS = 10_000_000
ROWS_TO_SAMPLE = 10


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full scan of a single non-nullable-valued uint64 column."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    batch_rows = 10000

    def gen_data():
        # Emit NUM_ROWS ascending integers in fixed-size record batches.
        for start in range(0, NUM_ROWS, batch_rows):
            count = min(batch_rows, NUM_ROWS - start)
            values = pa.array(range(start, start + count))
            yield pa.table({"values": values}).to_batches()[0]

    path = str(tmp_path / "file.lance")
    with LanceFileWriter(path, schema, version=version) as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    def read_all():
        # Re-open the reader inside the timed callable, matching the
        # cost profile of a cold full-file scan.
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nullable_integer(tmp_path: Path, benchmark, version):
    """Benchmark a full scan of a uint64 column where every other value is null."""
    schema = pa.schema([pa.field("values", pa.uint64(), True)])
    batch_rows = 10000

    def gen_data():
        # Emit NUM_ROWS values in fixed-size batches; even indices are null.
        for start in range(0, NUM_ROWS, batch_rows):
            count = min(batch_rows, NUM_ROWS - start)
            values = pa.array(
                [None if i % 2 == 0 else i for i in range(start, start + count)]
            )
            yield pa.table({"values": values}).to_batches()[0]

    path = str(tmp_path / "file.lance")
    with LanceFileWriter(path, schema, version=version) as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    def read_all():
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.benchmark(group="scan_single_column")
def test_scan_nested_integer(tmp_path: Path, benchmark):
    """Benchmark a full scan of a doubly-nested struct column with nulls
    at every nesting level (2.1 format only)."""

    def get_val(i: int):
        # Cycle through the four possible null placements.
        variant = i % 4
        if variant == 0:
            return None
        if variant == 1:
            return {"outer": None}
        if variant == 2:
            return {"outer": {"inner": None}}
        return {"outer": {"inner": i}}

    dtype = pa.struct(
        [pa.field("outer", pa.struct([pa.field("inner", pa.uint64(), True)]), True)]
    )
    schema = pa.schema(
        [
            pa.field(
                "values",
                dtype,
                True,
            )
        ]
    )
    batch_rows = 10000

    def gen_data():
        for start in range(0, NUM_ROWS, batch_rows):
            count = min(batch_rows, NUM_ROWS - start)
            values = pa.array([get_val(i) for i in range(start, start + count)])
            yield pa.table({"values": values}).to_batches()[0]

    path = str(tmp_path / "file.lance")
    with LanceFileWriter(path, schema, version="2.1") as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    def read_all():
        return LanceFileReader(path).read_all(batch_size=16 * 1024).to_table()

    result = benchmark.pedantic(read_all, rounds=1, iterations=1)

    assert result.num_rows == NUM_ROWS


@pytest.mark.parametrize(
    "version",
    ["2.0", "2.1"],
    ids=["2_0", "2_1"],
)
@pytest.mark.benchmark(group="sample_single_column")
def test_sample_integer(tmp_path: Path, benchmark, version):
    """Benchmark random-access reads (``take_rows``) on a uint64 column.

    Writes NUM_ROWS ascending integers, then repeatedly samples
    ROWS_TO_SAMPLE evenly-spaced row indices.
    """
    schema = pa.schema([pa.field("values", pa.uint64(), True)])

    def gen_data():
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array(range(offset, offset + to_take))
            batch = pa.table({"values": values}).to_batches()[0]
            yield batch
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(
        str(tmp_path / "file.lance"), schema, version=version
    ) as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # BUG FIX: take_rows returns only the sampled rows, not the whole file.
    # The previous assertion (result.num_rows == NUM_ROWS) could never hold.
    assert result.num_rows == len(indices)


@pytest.mark.benchmark(group="sample_single_column")
def test_sample_nested_integer(tmp_path: Path, benchmark):
    """Benchmark random-access reads (``take_rows``) on a doubly-nested
    struct column with nulls at every nesting level (2.1 format only)."""

    def get_val(i: int):
        # Cycle through the four possible null placements.
        if i % 4 == 0:
            return None
        elif i % 4 == 1:
            return {"outer": None}
        elif i % 4 == 2:
            return {"outer": {"inner": None}}
        else:
            return {"outer": {"inner": i}}

    dtype = pa.struct(
        [pa.field("outer", pa.struct([pa.field("inner", pa.uint64(), True)]), True)]
    )
    schema = pa.schema(
        [
            pa.field(
                "values",
                dtype,
                True,
            )
        ]
    )

    def gen_data():
        remaining = NUM_ROWS
        offset = 0
        while remaining > 0:
            to_take = min(remaining, 10000)
            values = pa.array([get_val(i) for i in range(offset, offset + to_take)])
            batch = pa.table({"values": values}).to_batches()[0]
            yield batch
            remaining -= to_take
            offset += to_take

    with LanceFileWriter(str(tmp_path / "file.lance"), schema, version="2.1") as writer:
        for batch in gen_data():
            writer.write_batch(batch)

    reader = LanceFileReader(str(tmp_path / "file.lance"))
    indices = list(range(0, NUM_ROWS, NUM_ROWS // ROWS_TO_SAMPLE))

    def sample():
        return reader.take_rows(indices).to_table()

    result = benchmark.pedantic(sample, rounds=30, iterations=1)

    # BUG FIX: take_rows returns only the sampled rows, not the whole file.
    # The previous assertion (result.num_rows == NUM_ROWS) could never hold.
    assert result.num_rows == len(indices)
15 changes: 13 additions & 2 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1992,6 +1992,7 @@ def commit(
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
enable_v2_manifest_paths: Optional[bool] = None,
detached: Optional[bool] = False,
) -> LanceDataset:
"""Create a new version of dataset
Expand Down Expand Up @@ -2037,6 +2038,13 @@ def commit(
:meth:`migrate_manifest_paths_v2` method. Default is False. WARNING:
turning this on will make the dataset unreadable for older versions
of Lance (prior to 0.17.0).
detached : bool, optional
If True, then the commit will not be part of the dataset lineage. It will
never show up as the latest dataset and the only way to check it out in the
future will be to specifically check it out by version. The version will be
a random version that is only unique amongst detached commits. The caller
should store this somewhere as there will be no other way to obtain it in
the future.
Returns
-------
Expand Down Expand Up @@ -2074,15 +2082,18 @@ def commit(
f"commit_lock must be a function, got {type(commit_lock)}"
)

_Dataset.commit(
new_ds = _Dataset.commit(
base_uri,
operation._to_inner(),
read_version,
commit_lock,
storage_options=storage_options,
enable_v2_manifest_paths=enable_v2_manifest_paths,
detached=detached,
)
return LanceDataset(
base_uri, version=new_ds.version(), storage_options=storage_options
)
return LanceDataset(base_uri, storage_options=storage_options)

def validate(self):
"""
Expand Down
51 changes: 51 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2594,3 +2594,54 @@ def test_default_storage_version(tmp_path: Path):
sample_file = frag.to_json()["files"][0]
assert sample_file["file_major_version"] == EXPECTED_MAJOR_VERSION
assert sample_file["file_minor_version"] == EXPECTED_MINOR_VERSION


def test_no_detached_v1(tmp_path: Path):
    """Detached commits require v2 manifest paths; a v1-manifest dataset
    must refuse them."""
    dataset = lance.write_dataset(pa.table({"x": [0]}), tmp_path)

    # Stage an append fragment, then attempt to commit it detached
    # against the v1-manifest dataset.
    fragment = lance.LanceFragment.create(dataset.uri, pa.table({"x": [1]}))
    append_op = lance.LanceOperation.Append([fragment])
    with pytest.raises(OSError, match="v1 manifest paths"):
        dataset.commit(
            dataset.uri, append_op, read_version=dataset.version, detached=True
        )


def test_detached_commits(tmp_path: Path):
    """Detached commits are readable by version but never become the
    latest version, and may themselves serve as a read version."""
    dataset = lance.write_dataset(
        pa.table({"x": [0]}), tmp_path, enable_v2_manifest_paths=True
    )

    # Commit an append in detached mode.
    fragment = lance.LanceFragment.create(dataset.uri, pa.table({"x": [1]}))
    append_op = lance.LanceOperation.Append([fragment])
    detached = dataset.commit(
        dataset.uri, append_op, read_version=dataset.version, detached=True
    )
    # Detached versions are tagged via the high bit.
    assert (detached.version & 0x8000000000000000) != 0

    assert detached.to_table() == pa.table({"x": [0, 1]})
    # Re-opening the dataset must not surface the detached commit.
    dataset = lance.dataset(tmp_path)
    assert dataset.to_table() == pa.table({"x": [0]})

    # Further attached commits are unaffected by the detached one.
    dataset = lance.write_dataset(pa.table({"x": [2]}), tmp_path, mode="append")
    assert dataset.to_table() == pa.table({"x": [0, 2]})

    # The detached commit remains reachable by explicit version checkout.
    detached = dataset.checkout_version(detached.version)
    assert detached.to_table() == pa.table({"x": [0, 1]})

    # A detached commit can itself be the read version of another
    # detached commit, forming a detached lineage.
    fragment = lance.LanceFragment.create(detached.uri, pa.table({"x": [3]}))
    append_op = lance.LanceOperation.Append([fragment])

    detached2 = dataset.commit(
        dataset.uri, append_op, read_version=detached.version, detached=True
    )

    assert detached2.to_table() == pa.table({"x": [0, 1, 3]})
Loading

0 comments on commit 8687239

Please sign in to comment.