feat: add file compaction #1095

Merged 4 commits on Sep 11, 2023
2 changes: 2 additions & 0 deletions docs/conf.py
@@ -41,6 +41,8 @@ def setup(app):
napoleon_include_private_with_doc = False
napoleon_include_special_with_doc = False

autodoc_typehints = "signature"

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]

40 changes: 40 additions & 0 deletions docs/read_and_write.rst
@@ -354,3 +354,43 @@
The ability to achieve fast random access to individual rows plays a crucial role
such as random sampling and shuffling in ML training.
Additionally, it empowers users to construct secondary indices,
enabling swift execution of queries for enhanced performance.


Table Maintenance
-----------------

Some operations over time will cause a Lance dataset to have a poor layout. For
example, many small appends will lead to a large number of small fragments. Or
deleting many rows will lead to slower queries due to the need to filter out
deleted rows.

To address this, Lance provides methods for optimizing dataset layout.

Compact data files
~~~~~~~~~~~~~~~~~~

Data files can be rewritten so there are fewer files. When passing a
``target_rows_per_fragment`` to :py:meth:`lance.dataset.DatasetOptimizer.compact_files`,
Lance will skip any fragments that are already above that row count, and rewrite
others. Fragments will be merged according to their fragment ids, so the inherent
ordering of the data will be preserved.

Contributor: This results in a new version, right? The old dataset still exists (and waits for a later GC to clean it up)?

Contributor (Author): Yes. Adding a clarification for this.

.. note::

   Compaction creates a new version of the table. It does not delete the old
   version of the table and the files referenced by it.

.. code-block:: python

   import lance

   dataset = lance.dataset("./alice_and_bob.lance")
   dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

During compaction, Lance can also remove deleted rows. Rewritten fragments will
not have deletion files. This can improve scan performance since the soft deleted
rows don't have to be skipped during the scan.
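
For example, a short sketch using the deletion-related options added in this
change (the threshold value here is only illustrative):

.. code-block:: python

   import lance

   dataset = lance.dataset("./alice_and_bob.lance")

   # Rewrite any fragment where at least 30% of its rows are soft deleted,
   # dropping the deletion files in the process.
   metrics = dataset.optimize.compact_files(
       materialize_deletions=True,
       materialize_deletions_threshold=0.3,
   )
   print(metrics.fragments_removed, metrics.fragments_added)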

When files are rewritten, the original row ids are invalidated. This means the
affected files are no longer part of any ANN index if they were before. Because
of this, it's recommended to rewrite files before re-building indices.

Contributor: Curious whether we should have a more thoughtful design for row ids.

Contributor (Author): I've been thinking about it but haven't come up with anything promising. I think the best route is finding ways to remap the row ids in the indices so we can quickly update them during compaction.
Contributor: Suggested change:

    - of this, it's recommended to rewrite files before re-building indices.
    + of this, it's recommended to rebuild indices after rewriting files.

I'm not sure if you are trying to say "after you rewrite you need to make sure and rebuild indices" or "if you're going to build indices, you should probably rewrite first so you don't lose the index data later", but I think the former is more essential to communicate (though both could also be said if you want).

Contributor (Author): I guess I say it this way because I don't think you have to rebuild the index after compaction. If the part that was compacted was data that wasn't indexed in the first place (because it was recently appended), you don't necessarily have to rebuild the index. But it is always a waste to build indices and then do compaction, if you are planning on doing both.
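
A sketch of the ordering recommended above (compact first, then build or rebuild
the index); the column name and create_index arguments are illustrative and not
part of this change:

    import lance

    dataset = lance.dataset("./alice_and_bob.lance")

    # Compact first, so the rewrite does not invalidate row ids that an
    # index was just built against.
    dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

    # Then build (or rebuild) the ANN index over the compacted fragments.
    dataset.create_index(
        "vector",
        index_type="IVF_PQ",
        num_partitions=256,
        num_sub_vectors=16,
    )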

18 changes: 18 additions & 0 deletions protos/transaction.proto
@@ -87,12 +87,30 @@ message Transaction {
  message Rewrite {
    // The old fragments that are being replaced
    //
    // DEPRECATED: use groups instead.
    //
    // These should all have existing fragment IDs.
    repeated DataFragment old_fragments = 1;
    // The new fragments
    //
    // DEPRECATED: use groups instead.
    //
    // These fragment IDs are not yet assigned.
    repeated DataFragment new_fragments = 2;

    // A group of rewrite files that are all part of the same rewrite.
    message RewriteGroup {
      // The old fragments that are being replaced
      //
      // These should all have existing fragment IDs.
      repeated DataFragment old_fragments = 1;
      // The new fragments
      //
      // These fragment IDs are not yet assigned.
      repeated DataFragment new_fragments = 2;
    }

    repeated RewriteGroup groups = 3;
  }

  // An operation that merges in a new column, altering the schema.
1 change: 1 addition & 0 deletions python/Cargo.toml
@@ -30,6 +30,7 @@ pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
num_cpus = "1"

[build-dependencies]
prost-build = "0.11"
104 changes: 96 additions & 8 deletions python/python/lance/dataset.py
@@ -27,11 +27,13 @@
import numpy as np
import pyarrow as pa
import pyarrow.dataset
from lance.optimize import Compaction
from pyarrow import RecordBatch, Schema
from pyarrow._compute import Expression

from .commit import CommitLock
from .fragment import FragmentMetadata, LanceFragment
from .lance import CompactionMetrics as CompactionMetrics
from .lance import __version__ as __version__
from .lance import _Dataset, _Operation, _Scanner, _write_dataset

@@ -749,6 +751,10 @@ def _commit(
        _Dataset.commit(base_uri, operation._to_inner(), read_version, commit_lock)
        return LanceDataset(base_uri)

    @property
    def optimize(self) -> "DatasetOptimizer":
        return DatasetOptimizer(self)


# LanceOperation is a namespace for operations that can be applied to a dataset.
class LanceOperation:
Expand Down Expand Up @@ -814,17 +820,35 @@ def _to_inner(self):

@dataclass
class Rewrite(BaseOperation):
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]
"""
Operation that rewrites fragments but does not change the data within them.

def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)
This is for rearranging the data.

The data are grouped, such that each group contains the old fragments
and the new fragments those are rewritten into.
"""

groups: Iterable[RewriteGroup]

@dataclass
class RewriteGroup:
old_fragments: Iterable[FragmentMetadata]
new_fragments: Iterable[FragmentMetadata]

def __post_init__(self):
LanceOperation._validate_fragments(self.old_fragments)
LanceOperation._validate_fragments(self.new_fragments)

def _to_inner(self):
raw_old_fragments = [f._metadata for f in self.old_fragments]
raw_new_fragments = [f._metadata for f in self.new_fragments]
return _Operation.rewrite(raw_old_fragments, raw_new_fragments)
groups = [
(
[f._metadata for f in g.old_fragments],
[f._metadata for f in g.new_fragments],
)
for g in self.groups
]
return _Operation.rewrite(groups)

@dataclass
class Merge(BaseOperation):
@@ -1081,6 +1105,70 @@ def count_rows(self):
        return self._scanner.count_rows()


class DatasetOptimizer:
    def __init__(self, dataset: LanceDataset):
        self._dataset = dataset

    def compact_files(
        self,
        *,
        target_rows_per_fragment: int = 1024 * 1024,
        max_rows_per_group: int = 1024,
        materialize_deletions: bool = True,
        materialize_deletions_threshold: float = 0.1,
        num_threads: Optional[int] = None,
    ) -> CompactionMetrics:
        """Compacts small files in the dataset, reducing total number of files.

        This does a few things:
        * Removes deleted rows from fragments
        * Removes dropped columns from fragments
        * Merges small fragments into larger ones

        This method preserves the insertion order of the dataset. This may mean
        it leaves small fragments in the dataset if they are not adjacent to
        other fragments that need compaction. For example, if you have fragments
        with row counts 5 million, 100, and 5 million, the middle fragment will
        not be compacted because the fragments it is adjacent to do not need
        compaction.

        Parameters
        ----------
        target_rows_per_fragment: int, default 1024*1024
            The target number of rows per fragment. This is the number of rows
            that will be in each fragment after compaction.
        max_rows_per_group: int, default 1024
            Max number of rows per group. This does not affect which fragments
            need compaction, but does affect how they are re-written if selected.
        materialize_deletions: bool, default True
            Whether to compact fragments with soft deleted rows so they are no
            longer present in the file.
        materialize_deletions_threshold: float, default 0.1
            The fraction of original rows that are soft deleted in a fragment
            before the fragment is a candidate for compaction.
        num_threads: int, optional
            The number of threads to use when performing compaction. If not
            specified, defaults to the number of cores on the machine.

        Returns
        -------
        CompactionMetrics
            Metrics about the compaction process

        See Also
        --------
        lance.optimize.Compaction
        """
        opts = dict(
            target_rows_per_fragment=target_rows_per_fragment,
            max_rows_per_group=max_rows_per_group,
            materialize_deletions=materialize_deletions,
            materialize_deletions_threshold=materialize_deletions_threshold,
            num_threads=num_threads,
        )
        return Compaction.execute(self._dataset, opts)


def write_dataset(
    data_obj: ReaderLike,
    uri: Union[str, Path],

Review comment on lines +1146 to +1148 (the materialize_deletions_threshold documentation):

Contributor: Hmm, I would think a simple limit (e.g. 100 soft deleted rows) would be fine here. Why go with a proportion?

Contributor (Author): I can't say I have great justification without having any benchmarks. I've heard engineers on similar projects say 10% was the threshold at which they saw deterioration in scan performance, so I guess here's a little cargo-culting :) I could set up a benchmark parametrized by scale and proportion deleted to see how scan performance is affected, and we can see whether a fixed value or a proportional value allows for the most broadly applicable default.
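
A usage sketch of the new optimize property and compact_files() method added
above (the dataset path is illustrative):

    import lance

    dataset = lance.dataset("./alice_and_bob.lance")

    # Merge small fragments (deletions are materialized per the defaults above),
    # then report what changed.
    metrics = dataset.optimize.compact_files(
        target_rows_per_fragment=1024 * 1024,
        num_threads=4,
    )
    print(metrics.fragments_removed, "fragments replaced by", metrics.fragments_added)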
6 changes: 3 additions & 3 deletions python/python/lance/fragment.py
@@ -18,7 +18,7 @@
 
 import json
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Optional, Union
+from typing import TYPE_CHECKING, Callable, Iterator, Optional, Union
 
 try:
     import pandas as pd
@@ -55,12 +55,12 @@ def __eq__(self, other: object) -> bool:
             return False
         return self._metadata.__eq__(other._metadata)
 
-    def to_json(self) -> Dict[str, Any]:
+    def to_json(self) -> str:
         """Serialize :class:`FragmentMetadata` to a JSON blob"""
         return json.loads(self._metadata.json())
 
     @staticmethod
-    def from_json(json_data: Dict[str, Any]) -> FragmentMetadata:
+    def from_json(json_data: str) -> FragmentMetadata:
         """Reconstruct :class:`FragmentMetadata` from a JSON blob"""
         return FragmentMetadata(json_data)
6 changes: 6 additions & 0 deletions python/python/lance/lance.pyi
@@ -8,3 +8,9 @@ def infer_tfrecord_schema(
    string_features: Optional[List[str]] = None,
) -> pa.Schema: ...
def read_tfrecord(uri: str, schema: pa.Schema) -> pa.RecordBatchReader: ...

class CompactionMetrics:
    fragments_removed: int
    fragments_added: int
    files_removed: int
    files_added: int
53 changes: 53 additions & 0 deletions python/python/lance/optimize.py
@@ -0,0 +1,53 @@
# Copyright (c) 2023. Lance Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, TypedDict

# Re-exported from native module. See src/dataset/optimize.rs for implementation.
from .lance import Compaction as Compaction
from .lance import CompactionMetrics as CompactionMetrics
from .lance import CompactionTask as CompactionTask
from .lance import RewriteResult as RewriteResult


class CompactionOptions(TypedDict):
    """Options for compaction."""

    target_rows_per_fragment: Optional[int]
    """
    The target number of rows per fragment. This is the number of rows
    that will be in each fragment after compaction. (default: 1024*1024)
    """
    max_rows_per_group: Optional[int]
    """
    Max number of rows per group. This does not affect which fragments
    need compaction, but does affect how they are re-written if selected.
    (default: 1024)
    """
    materialize_deletions: Optional[bool]
    """
    Whether to compact fragments with soft deleted rows so they are no
    longer present in the file. (default: True)
    """
    materialize_deletions_threshold: Optional[float]
    """
    The fraction of original rows that are soft deleted in a fragment
    before the fragment is a candidate for compaction.
    (default: 0.1 = 10%)
    """
    num_threads: Optional[int]
    """
    The number of threads to use when performing compaction. If not
    specified, defaults to the number of cores on the machine.
    """
41 changes: 41 additions & 0 deletions python/python/lance/optimize.pyi
@@ -0,0 +1,41 @@
from typing import List

from lance import Dataset
from lance.fragment import FragmentMetadata
from lance.optimize import CompactionOptions

class CompactionMetrics:
    fragments_removed: int
    fragments_added: int
    files_removed: int
    files_added: int

class RewriteResult:
    read_version: int
    metrics: CompactionMetrics
    old_fragments: List["FragmentMetadata"]
    new_fragments: List["FragmentMetadata"]

class CompactionTask:
    read_version: int
    fragments: List["FragmentMetadata"]

    def execute(self, dataset: "Dataset") -> RewriteResult: ...

class CompactionPlan:
    read_version: int
    tasks: List[CompactionTask]

    def num_tasks(self) -> int: ...

class Compaction:
    @staticmethod
    def execute(
        dataset: "Dataset", options: CompactionOptions
    ) -> CompactionMetrics: ...
    @staticmethod
    def plan(dataset: "Dataset", options: CompactionOptions) -> CompactionPlan: ...
    @staticmethod
    def commit(
        dataset: "Dataset", rewrites: List[RewriteResult]
    ) -> CompactionMetrics: ...
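
The stubs above also imply a plan / execute / commit flow that could split
compaction work across workers. A hedged sketch of that flow, run inline here for
brevity (the dataset path and options are illustrative):

    import lance
    from lance.optimize import Compaction

    dataset = lance.dataset("./alice_and_bob.lance")

    plan = Compaction.plan(dataset, {"target_rows_per_fragment": 1024 * 1024})

    # Each CompactionTask could run on a separate worker; every task rewrites
    # its own group of fragments and returns a RewriteResult.
    results = [task.execute(dataset) for task in plan.tasks]

    # Committing the collected results produces a single new dataset version.
    metrics = Compaction.commit(dataset, results)
    print(metrics.files_removed, "files removed,", metrics.files_added, "files added")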