feat: add file compaction #1095
@@ -354,3 +354,43 @@ The ability to achieve fast random access to individual rows plays a crucial role
such as random sampling and shuffling in ML training.
Additionally, it empowers users to construct secondary indices,
enabling swift execution of queries for enhanced performance.

Table Maintenance
-----------------

Some operations over time will cause a Lance dataset to have a poor layout. For
example, many small appends will lead to a large number of small fragments. Or
deleting many rows will lead to slower queries due to the need to filter out
deleted rows.

To address this, Lance provides methods for optimizing dataset layout.

Compact data files
~~~~~~~~~~~~~~~~~~

Data files can be rewritten so there are fewer files. When passing a
``target_rows_per_fragment`` to :py:meth:`lance.dataset.DatasetOptimizer.compact_files`,
Lance will skip any fragments that are already above that row count, and rewrite
others. Fragments will be merged according to their fragment ids, so the inherent
ordering of the data will be preserved.

.. note::

   Compaction creates a new version of the table. It does not delete the old
   version of the table or the files referenced by it.

.. code-block:: python

   import lance

   dataset = lance.dataset("./alice_and_bob.lance")
   dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

During compaction, Lance can also remove deleted rows. Rewritten fragments will
not have deletion files. This can improve scan performance since the soft deleted
rows don't have to be skipped during the scan.
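
For example, a minimal sketch of materializing deletions during compaction; the
delete predicate below is illustrative and not part of this change:

.. code-block:: python

   import lance

   dataset = lance.dataset("./alice_and_bob.lance")

   # Soft delete some rows; they are masked by a deletion file until compaction.
   dataset.delete("age < 18")

   # Rewrite fragments whose deleted fraction exceeds the threshold, dropping
   # the soft deleted rows from the rewritten files.
   dataset.optimize.compact_files(
       materialize_deletions=True,
       materialize_deletions_threshold=0.1,
   )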

When files are rewritten, the original row ids are invalidated. This means the
affected files are no longer part of any ANN index if they were before. Because
of this, it's recommended to rewrite files before re-building indices.

Comment: Curious whether we should have a more thoughtful design for row ids.

Reply: I've been thinking about it but haven't come up with anything promising. I think the best route is finding ways to remap the row ids in the indices so we can quickly update them during compaction.

Comment: I'm not sure if you are trying to say "after you rewrite you need to make sure and rebuild indices" or "if you're going to build indices, you should probably rewrite first so you don't lose the index data later", but I think the former is more essential to communicate (though both could also be said if you want).

Reply: I guess I say it this way because I don't think you have to rebuild the index after compaction. If the part that was compacted was data that wasn't indexed in the first place (because it was recently appended), you don't necessarily have to rebuild the index. But it is always a waste to build indices and then do compaction, if you are planning on doing both.
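
As a hedged illustration of that ordering (compact first, then build or rebuild the
index), assuming a vector column named ``vector``; the column name and index
parameters are illustrative and not part of this change:

.. code-block:: python

   import lance

   dataset = lance.dataset("./alice_and_bob.lance")

   # Rewrite small fragments first so the index built next covers them.
   dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

   # Then build the ANN index over the compacted data.
   dataset.create_index(
       "vector",
       index_type="IVF_PQ",
       num_partitions=256,
       num_sub_vectors=16,
   )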
@@ -27,11 +27,13 @@
import numpy as np
import pyarrow as pa
import pyarrow.dataset
from lance.optimize import Compaction
from pyarrow import RecordBatch, Schema
from pyarrow._compute import Expression

from .commit import CommitLock
from .fragment import FragmentMetadata, LanceFragment
from .lance import CompactionMetrics as CompactionMetrics
from .lance import __version__ as __version__
from .lance import _Dataset, _Operation, _Scanner, _write_dataset


@@ -749,6 +751,10 @@ def _commit(
        _Dataset.commit(base_uri, operation._to_inner(), read_version, commit_lock)
        return LanceDataset(base_uri)

    @property
    def optimize(self) -> "DatasetOptimizer":
        return DatasetOptimizer(self)


# LanceOperation is a namespace for operations that can be applied to a dataset.
class LanceOperation:
@@ -814,17 +820,35 @@ def _to_inner(self):

@dataclass
class Rewrite(BaseOperation):
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]
"""
Operation that rewrites fragments but does not change the data within them.

def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)
This is for rearranging the data.

The data are grouped, such that each group contains the old fragments
and the new fragments they are rewritten into.
"""

groups: Iterable[RewriteGroup]

@dataclass
class RewriteGroup:
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)

def _to_inner(self):
raw_old_fragments = [f._metadata for f in self.old_fragments]
raw_new_fragments = [f._metadata for f in self.new_fragments]
return _Operation.rewrite(raw_old_fragments, raw_new_fragments)
groups = [
(
[f._metadata for f in g.old_fragments],
[f._metadata for f in g.new_fragments],
)
for g in self.groups
]
return _Operation.rewrite(groups)

@dataclass
class Merge(BaseOperation):
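
A rough, hedged sketch of how the reworked Rewrite operation above pairs old and new
fragments. It assumes both classes are exposed under the LanceOperation namespace, and
the fragment variables are hypothetical, not part of this diff:

# Hypothetical FragmentMetadata values; in practice they come from rewriting data files.
op = LanceOperation.Rewrite(
    groups=[
        LanceOperation.RewriteGroup(
            old_fragments=[frag_small_1, frag_small_2],  # fragments being replaced
            new_fragments=[frag_merged],  # the fragment they were rewritten into
        )
    ]
)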
@@ -1081,6 +1105,70 @@ def count_rows(self):
        return self._scanner.count_rows()


class DatasetOptimizer:
    def __init__(self, dataset: LanceDataset):
        self._dataset = dataset

    def compact_files(
        self,
        *,
        target_rows_per_fragment: int = 1024 * 1024,
        max_rows_per_group: int = 1024,
        materialize_deletions: bool = True,
        materialize_deletions_threshold: float = 0.1,
        num_threads: Optional[int] = None,
    ) -> CompactionMetrics:
        """Compacts small files in the dataset, reducing total number of files.

        This does a few things:

        * Removes deleted rows from fragments
        * Removes dropped columns from fragments
        * Merges small fragments into larger ones

        This method preserves the insertion order of the dataset. This may mean
        it leaves small fragments in the dataset if they are not adjacent to
        other fragments that need compaction. For example, if you have fragments
        with row counts 5 million, 100, and 5 million, the middle fragment will
        not be compacted because the fragments it is adjacent to do not need
        compaction.

        Parameters
        ----------
        target_rows_per_fragment: int, default 1024*1024
            The target number of rows per fragment. This is the number of rows
            that will be in each fragment after compaction.
        max_rows_per_group: int, default 1024
            Max number of rows per group. This does not affect which fragments
            need compaction, but does affect how they are re-written if selected.
        materialize_deletions: bool, default True
            Whether to compact fragments with soft deleted rows so they are no
            longer present in the file.
        materialize_deletions_threshold: float, default 0.1
            The fraction of original rows that are soft deleted in a fragment
            before the fragment is a candidate for compaction.
Comment on lines +1146 to +1148:

Comment: Hmm, I would think a simple limit (e.g. 100 soft deleted rows) would be fine here. Why go with proportion?

Reply: I can't say I have great justification, without having any benchmarks. I've heard engineers on similar projects say 10% was the threshold at which they saw deteriorations in scan performance, so I guess here's a little cargo-culting :) I guess I could set up a benchmark parametrized by scale and proportion deleted to see how scan performance is affected. And we can see if a fixed value or a proportional value allows for the most broadly applicable default value.
||
num_threads: int, optional | ||
The number of threads to use when performing compaction. If not | ||
specified, defaults to the number of cores on the machine. | ||
|
||
Returns | ||
------- | ||
CompactionMetrics | ||
Metrics about the compaction process | ||
|
||
See Also | ||
-------- | ||
lance.optimize.Compaction | ||
""" | ||
opts = dict( | ||
target_rows_per_fragment=target_rows_per_fragment, | ||
max_rows_per_group=max_rows_per_group, | ||
materialize_deletions=materialize_deletions, | ||
materialize_deletions_threshold=materialize_deletions_threshold, | ||
num_threads=num_threads, | ||
) | ||
return Compaction.execute(self._dataset, opts) | ||
|
||
|
||
def write_dataset( | ||
data_obj: ReaderLike, | ||
uri: Union[str, Path], | ||
|
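
A hedged usage sketch of the property and method added above; the dataset path is an
assumption for illustration:

import lance

ds = lance.dataset("./alice_and_bob.lance")  # illustrative path

# Merge small fragments and drop soft deleted rows; returns CompactionMetrics.
metrics = ds.optimize.compact_files(
    target_rows_per_fragment=1024 * 1024,
    materialize_deletions=True,
)
print(metrics.fragments_removed, metrics.fragments_added)
print(metrics.files_removed, metrics.files_added)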
@@ -0,0 +1,53 @@
# Copyright (c) 2023. Lance Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, TypedDict

# Re-exported from native module. See src/dataset/optimize.rs for implementation.
from .lance import Compaction as Compaction
from .lance import CompactionMetrics as CompactionMetrics
from .lance import CompactionTask as CompactionTask
from .lance import RewriteResult as RewriteResult


class CompactionOptions(TypedDict):
    """Options for compaction."""

    target_rows_per_fragment: Optional[int]
    """
    The target number of rows per fragment. This is the number of rows
    that will be in each fragment after compaction. (default: 1024*1024)
    """
    max_rows_per_group: Optional[int]
    """
    Max number of rows per group. This does not affect which fragments
    need compaction, but does affect how they are re-written if selected.
    (default: 1024)
    """
    materialize_deletions: Optional[bool]
    """
    Whether to compact fragments with soft deleted rows so they are no
    longer present in the file. (default: True)
    """
    materialize_deletions_threshold: Optional[float]
    """
    The fraction of original rows that are soft deleted in a fragment
    before the fragment is a candidate for compaction.
    (default: 0.1 = 10%)
    """
    num_threads: Optional[int]
    """
    The number of threads to use when performing compaction. If not
    specified, defaults to the number of cores on the machine.
    """
@@ -0,0 +1,41 @@
from typing import List

from lance import Dataset
from lance.fragment import FragmentMetadata
from lance.optimize import CompactionOptions

class CompactionMetrics:
    fragments_removed: int
    fragments_added: int
    files_removed: int
    files_added: int

class RewriteResult:
    read_version: int
    metrics: CompactionMetrics
    old_fragments: List["FragmentMetadata"]
    new_fragments: List["FragmentMetadata"]

class CompactionTask:
    read_version: int
    fragments: List["FragmentMetadata"]

    def execute(self, dataset: "Dataset") -> RewriteResult: ...

class CompactionPlan:
    read_version: int
    tasks: List[CompactionTask]

    def num_tasks(self) -> int: ...

class Compaction:
    @staticmethod
    def execute(
        dataset: "Dataset", options: CompactionOptions
    ) -> CompactionMetrics: ...
    @staticmethod
    def plan(dataset: "Dataset", options: CompactionOptions) -> CompactionPlan: ...
    @staticmethod
    def commit(
        dataset: "Dataset", rewrites: List[RewriteResult]
    ) -> CompactionMetrics: ...
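
Based on the stubs above, a hedged sketch of splitting compaction into tasks that could
run on separate workers and then committing the combined result; the dataset path and
option values are illustrative:

import lance
from lance.optimize import Compaction

ds = lance.dataset("./alice_and_bob.lance")  # illustrative path

# Plan the compaction against the current dataset version.
plan = Compaction.plan(ds, {"target_rows_per_fragment": 1024 * 1024})

# Each task could be executed on a different worker; run serially here.
rewrites = [task.execute(ds) for task in plan.tasks]

# Commit all rewrite results as a single new dataset version.
metrics = Compaction.commit(ds, rewrites)
print(metrics.fragments_removed, metrics.fragments_added)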
Comment: This results in a new version, right? The old dataset versions still exist (and wait for a later GC to clean them up).

Reply: Yes. Adding a clarification for this.