Skip to content

Commit

Permalink
fix transaction api for rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Sep 11, 2023
1 parent 3b1781d commit 6b14c40
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 24 deletions.
42 changes: 30 additions & 12 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,10 @@ def _commit(
_Dataset.commit(base_uri, operation._to_inner(), read_version, commit_lock)
return LanceDataset(base_uri)

@property
def optimize(self) -> "DatasetOptimizer":
return DatasetOptimizer(self)


# LanceOperation is a namespace for operations that can be applied to a dataset.
class LanceOperation:
Expand Down Expand Up @@ -816,17 +820,35 @@ def _to_inner(self):

@dataclass
class Rewrite(BaseOperation):
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]
"""
Operation that rewrites fragments but does not change the data within them.
def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)
This is for rearranging the data.
The data are grouped, such that each group contains the old fragments
and the new fragments those are rewritten into.
"""

groups: Iterable[RewriteGroup]

@dataclass
class RewriteGroup:
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)

def _to_inner(self):
raw_old_fragments = [f._metadata for f in self.old_fragments]
raw_new_fragments = [f._metadata for f in self.new_fragments]
return _Operation.rewrite(raw_old_fragments, raw_new_fragments)
groups = [
(
[f._metadata for f in g.old_fragments],
[f._metadata for f in g.new_fragments],
)
for g in self.groups
]
return _Operation.rewrite(groups)

@dataclass
class Merge(BaseOperation):
Expand All @@ -847,10 +869,6 @@ class Restore(BaseOperation):
def _to_inner(self):
return _Operation.restore(self.version)

@property
def optimize(self) -> "DatasetOptimizer":
return DatasetOptimizer(self)


class ScannerBuilder:
def __init__(self, ds: LanceDataset):
Expand Down
3 changes: 2 additions & 1 deletion python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,8 @@ def test_rewrite_with_commit(tmp_path: Path):
to_be_rewrote = [lf.metadata for lf in lance.dataset(base_dir).get_fragments()]

fragment = lance.fragment.LanceFragment.create(base_dir, combined)
rewrite = lance.LanceOperation.Rewrite(to_be_rewrote, [fragment])
group = lance.LanceOperation.Rewrite.RewriteGroup(to_be_rewrote, [fragment])
rewrite = lance.LanceOperation.Rewrite([group])

dataset = lance.LanceDataset._commit(base_dir, rewrite, read_version=1)

Expand Down
21 changes: 10 additions & 11 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow_array::{Float32Array, RecordBatch};
use arrow_data::ArrayData;
use arrow_schema::Schema as ArrowSchema;
use lance::arrow::as_fixed_size_list_array;

use lance::dataset::transaction::RewriteGroup;
use lance::dataset::{
fragment::FileFragment as LanceFileFragment, scanner::Scanner as LanceScanner,
transaction::Operation as LanceOperation, Dataset as LanceDataset, ReadParams, Version,
Expand Down Expand Up @@ -116,16 +116,15 @@ impl Operation {
}

#[staticmethod]
fn rewrite(
old_fragments: Vec<FragmentMetadata>,
new_fragments: Vec<FragmentMetadata>,
) -> PyResult<Self> {
let old_fragments = into_fragments(old_fragments);
let new_fragments = into_fragments(new_fragments);
let op = LanceOperation::Rewrite {
old_fragments,
new_fragments,
};
fn rewrite(groups: Vec<(Vec<FragmentMetadata>, Vec<FragmentMetadata>)>) -> PyResult<Self> {
let groups = groups
.into_iter()
.map(|(old_fragments, new_fragments)| RewriteGroup {
old_fragments: into_fragments(old_fragments),
new_fragments: into_fragments(new_fragments),
})
.collect::<Vec<_>>();
let op = LanceOperation::Rewrite { groups };
Ok(Self(op))
}

Expand Down

0 comments on commit 6b14c40

Please sign in to comment.