Skip to content

Commit

Permalink
feat: add version tags (#2482)
Browse files Browse the repository at this point in the history
Closes #588

This PR takes a different course from what was proposed in #588 and the
subsequent PR #605. Rather than using the existing `tag` field in the
manifest, this PR adds a new directory at the root of the dataset called
`_refs/tags`. This new directory contains files, where each filename
maps to a JSON that contains the version number. When a user wants to
checkout a specific tag, we do a lookup to find the version number, and
then simply call `checkout_version` under-the-hood. By considering tags
outside of the manifest files, we get the following benefits (all of
which are consistent with the more familiar git tag):

- Allow users to freely create and delete tags retroactively.
- Allow multiple tags to point to the same underlying version.
- Easily enforce that tag names are unique.
- Allow users to re-use a previously deleted tag (so long as the name is
unique among all other tags at the time of reuse).

---------

Co-authored-by: Will Jones <[email protected]>
  • Loading branch information
dsgibbons and wjones127 authored Jul 30, 2024
1 parent d75b16f commit 2044585
Show file tree
Hide file tree
Showing 11 changed files with 861 additions and 56 deletions.
30 changes: 15 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ exclude = ["python"]
resolver = "2"

[workspace.package]
version = "0.15.1"
version = "0.16.0"
edition = "2021"
authors = ["Lance Devs <[email protected]>"]
license = "Apache-2.0"
Expand All @@ -44,20 +44,20 @@ categories = [
rust-version = "1.78"

[workspace.dependencies]
lance = { version = "=0.15.1", path = "./rust/lance" }
lance-arrow = { version = "=0.15.1", path = "./rust/lance-arrow" }
lance-core = { version = "=0.15.1", path = "./rust/lance-core" }
lance-datafusion = { version = "=0.15.1", path = "./rust/lance-datafusion" }
lance-datagen = { version = "=0.15.1", path = "./rust/lance-datagen" }
lance-encoding = { version = "=0.15.1", path = "./rust/lance-encoding" }
lance-encoding-datafusion = { version = "=0.15.1", path = "./rust/lance-encoding-datafusion" }
lance-file = { version = "=0.15.1", path = "./rust/lance-file" }
lance-index = { version = "=0.15.1", path = "./rust/lance-index" }
lance-io = { version = "=0.15.1", path = "./rust/lance-io" }
lance-linalg = { version = "=0.15.1", path = "./rust/lance-linalg" }
lance-table = { version = "=0.15.1", path = "./rust/lance-table" }
lance-test-macros = { version = "=0.15.1", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.15.1", path = "./rust/lance-testing" }
lance = { version = "=0.16.0", path = "./rust/lance" }
lance-arrow = { version = "=0.16.0", path = "./rust/lance-arrow" }
lance-core = { version = "=0.16.0", path = "./rust/lance-core" }
lance-datafusion = { version = "=0.16.0", path = "./rust/lance-datafusion" }
lance-datagen = { version = "=0.16.0", path = "./rust/lance-datagen" }
lance-encoding = { version = "=0.16.0", path = "./rust/lance-encoding" }
lance-encoding-datafusion = { version = "=0.16.0", path = "./rust/lance-encoding-datafusion" }
lance-file = { version = "=0.16.0", path = "./rust/lance-file" }
lance-index = { version = "=0.16.0", path = "./rust/lance-index" }
lance-io = { version = "=0.16.0", path = "./rust/lance-io" }
lance-linalg = { version = "=0.16.0", path = "./rust/lance-linalg" }
lance-table = { version = "=0.16.0", path = "./rust/lance-table" }
lance-test-macros = { version = "=0.16.0", path = "./rust/lance-test-macros" }
lance-testing = { version = "=0.16.0", path = "./rust/lance-testing" }
approx = "0.5.1"
# Note that this one does not include pyarrow
arrow = { version = "52.2", optional = false, features = ["prettyprint"] }
Expand Down
1 change: 1 addition & 0 deletions benchmarks/sift/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# limitations under the License.

"""Microbenchmark for performance"""

import shutil
import time

Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pylance"
version = "0.15.1"
version = "0.16.0"
edition = "2021"
authors = ["Lance Devs <[email protected]>"]
rust-version = "1.65"
Expand Down
75 changes: 72 additions & 3 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ def uri(self) -> str:
"""
return self._uri

@property
def tags(self) -> Tags:
    """
    Tag manager for this dataset.

    A new :class:`Tags` wrapper around the underlying dataset handle is
    built on every access.
    """
    tag_manager = Tags(self._ds)
    return tag_manager

def list_indices(self) -> List[Dict[str, Any]]:
if getattr(self, "_list_indices_res", None) is None:
self._list_indices_res = self._ds.load_indices()
Expand Down Expand Up @@ -908,7 +912,7 @@ def drop_columns(self, columns: List[str]):
This is a metadata-only operation and does not remove the data from the
underlying storage. In order to remove the data, you must subsequently
call ``compact_files`` to rewrite the data without the removed columns and
then call ``cleanup_files`` to remove the old files.
then call ``cleanup_old_versions`` to remove the old files.
Examples
--------
Expand Down Expand Up @@ -1081,13 +1085,23 @@ def latest_version(self) -> int:
"""
return self._ds.latest_version()

def checkout_version(self, version) -> "LanceDataset":
def checkout_version(self, version: int | str) -> "LanceDataset":
"""
Load the given version of the dataset.
Unlike the :func:`dataset` constructor, this will re-use the
current cache.
This is a no-op if the dataset is already at the given version.
Parameters
----------
version: int | str,
The version to check out. A version number (`int`) or a tag
(`str`) can be provided.
Returns
-------
LanceDataset
"""
ds = copy.copy(self)
if version != ds.version:
Expand All @@ -1107,6 +1121,7 @@ def cleanup_old_versions(
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
error_if_tagged_old_versions: bool = True,
) -> CleanupStats:
"""
Cleans up old versions of the dataset.
Expand Down Expand Up @@ -1135,11 +1150,19 @@ def cleanup_old_versions(
This should only be set to True if you can guarantee that no other process
is currently working on this dataset. Otherwise the dataset could be put
into a corrupted state.
error_if_tagged_old_versions: bool, default True
Some versions may have tags associated with them. Tagged versions will
not be cleaned up, regardless of how old they are. If this argument
is set to `True` (the default), an exception will be raised if any
tagged versions match the parameters. Otherwise, tagged versions will
be ignored without any error and only untagged versions will be
cleaned up.
"""
if older_than is None:
older_than = timedelta(days=14)
return self._ds.cleanup_old_versions(
td_to_micros(older_than), delete_unverified
td_to_micros(older_than), delete_unverified, error_if_tagged_old_versions
)

def create_scalar_index(
Expand Down Expand Up @@ -2519,6 +2542,52 @@ def optimize_indices(self, **kwargs):
self._dataset._ds.optimize_indices(**kwargs)


class Tags:
    """
    Manager for the named tags of a dataset.

    Each tag is a name that refers to a single dataset version. Every
    operation is delegated to the wrapped dataset handle.
    """

    def __init__(self, dataset: _Dataset):
        self._ds = dataset

    def list(self) -> dict[str, int]:
        """
        Return every tag defined on the dataset.

        Returns
        -------
        dict[str, int]
            A dictionary mapping tag names to version numbers.
        """
        tag_to_version = self._ds.tags()
        return tag_to_version

    def create(self, tag: str, version: int) -> None:
        """
        Attach a new tag to a dataset version.

        Parameters
        ----------
        tag: str,
            The name of the tag to create. This name must be unique among
            all tag names for the dataset.
        version: int,
            The dataset version to tag.

        Raises
        ------
        ValueError
            If the tag name is already in use or the version does not exist.
        """
        self._ds.create_tag(tag, version)

    def delete(self, tag: str) -> None:
        """
        Remove a tag from the dataset.

        Parameters
        ----------
        tag: str,
            The name of the tag to delete.

        Raises
        ------
        ValueError
            If no tag with the given name exists.
        """
        self._ds.delete_tag(tag)


class DatasetStats(TypedDict):
num_deleted_rows: int
num_fragments: int
Expand Down
96 changes: 96 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,40 @@ def test_asof_checkout(tmp_path: Path):
assert len(ds.to_table()) == 9


def test_tag(tmp_path: Path):
    """Exercise tag creation, deletion, listing and checkout by tag name."""
    data = pa.Table.from_pydict({"colA": [1, 2, 3], "colB": [4, 5, 6]})
    dataset_dir = tmp_path / "test"

    # Initial write followed by an append.
    lance.write_dataset(data, dataset_dir)
    ds = lance.write_dataset(data, dataset_dir, mode="append")

    # No tags exist yet.
    assert len(ds.tags.list()) == 0

    # Tagging version 3 is rejected.
    with pytest.raises(ValueError):
        ds.tags.create("tag1", 3)

    # Deleting a tag that was never created is rejected.
    with pytest.raises(ValueError):
        ds.tags.delete("tag1")

    ds.tags.create("tag1", 1)
    assert len(ds.tags.list()) == 1

    # Tag names must be unique.
    with pytest.raises(ValueError):
        ds.tags.create("tag1", 1)

    # Once deleted, the name can be reused; two tags may share a version.
    ds.tags.delete("tag1")

    ds.tags.create("tag1", 1)
    ds.tags.create("tag2", 1)

    assert len(ds.tags.list()) == 2

    # Checking out an unknown tag fails.
    with pytest.raises(OSError):
        ds.checkout_version("tag3")

    # Checking out a known tag lands on the tagged version.
    assert ds.checkout_version("tag1").version == 1


def test_sample(tmp_path: Path):
table1 = pa.Table.from_pydict({"x": [0, 10, 20, 30, 40, 50], "y": range(6)})
base_dir = tmp_path / "test"
Expand Down Expand Up @@ -594,6 +628,68 @@ def test_cleanup_old_versions(tmp_path):
assert stats.old_versions == 1


def test_cleanup_error_when_tagged_old_versions(tmp_path):
    """Cleanup raises while any old version is tagged, then succeeds."""
    data = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    dataset_dir = tmp_path / "test"
    lance.write_dataset(data, dataset_dir)
    lance.write_dataset(data, dataset_dir, mode="overwrite")
    # Timestamp taken between the second and third writes.
    moment = datetime.now()
    lance.write_dataset(data, dataset_dir, mode="overwrite")

    dataset = lance.dataset(dataset_dir)
    dataset.tags.create("old-tag", 1)
    dataset.tags.create("another-old-tag", 2)

    def cleanup():
        # The age threshold is recomputed on each call, as it was inline.
        return dataset.cleanup_old_versions(older_than=(datetime.now() - moment))

    # Both old versions are tagged: cleanup errors and removes nothing.
    with pytest.raises(OSError):
        cleanup()
    assert len(dataset.versions()) == 3

    # One tagged old version is still enough to make cleanup error.
    dataset.tags.delete("old-tag")
    with pytest.raises(OSError):
        cleanup()
    assert len(dataset.versions()) == 3

    # With no tags left, both old versions are removed.
    dataset.tags.delete("another-old-tag")
    stats = cleanup()
    assert stats.bytes_removed > 0
    assert stats.old_versions == 2
    assert len(dataset.versions()) == 1


def test_cleanup_around_tagged_old_versions(tmp_path):
    """With errors disabled, cleanup skips tagged versions silently."""
    data = pa.Table.from_pydict({"a": range(100), "b": range(100)})
    dataset_dir = tmp_path / "test"
    lance.write_dataset(data, dataset_dir)
    lance.write_dataset(data, dataset_dir, mode="overwrite")
    # Timestamp taken between the second and third writes.
    moment = datetime.now()
    lance.write_dataset(data, dataset_dir, mode="overwrite")

    dataset = lance.dataset(dataset_dir)
    dataset.tags.create("old-tag", 1)
    dataset.tags.create("another-old-tag", 2)
    dataset.tags.create("tag-latest", 3)

    def cleanup_ignoring_tags():
        # The age threshold is recomputed on each call, as it was inline.
        return dataset.cleanup_old_versions(
            older_than=(datetime.now() - moment), error_if_tagged_old_versions=False
        )

    # Every old version is tagged, so nothing is removed.
    stats = cleanup_ignoring_tags()
    assert stats.bytes_removed == 0
    assert stats.old_versions == 0

    # Each deleted tag frees exactly one old version for cleanup.
    for tag_name in ("old-tag", "another-old-tag"):
        dataset.tags.delete(tag_name)
        stats = cleanup_ignoring_tags()
        assert stats.bytes_removed > 0
        assert stats.old_versions == 1


def test_create_from_commit(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
base_dir = tmp_path / "test"
Expand Down
Loading

0 comments on commit 2044585

Please sign in to comment.