Python api and docs
wjones127 committed Aug 14, 2023
1 parent 6bfc02b commit aaef7bb
Showing 12 changed files with 486 additions and 139 deletions.
2 changes: 2 additions & 0 deletions docs/conf.py
@@ -41,6 +41,8 @@ def setup(app):
napoleon_include_private_with_doc = False
napoleon_include_special_with_doc = False

autodoc_typehints = "signature"

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]

1 change: 1 addition & 0 deletions python/Cargo.toml
@@ -28,6 +28,7 @@ pyo3 = { version = "0.19", features = ["extension-module", "abi3-py38"] }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
uuid = "1.3.0"
serde_json = "1"
num_cpus = "1"

[build-dependencies]
prost-build = "0.11"
14 changes: 7 additions & 7 deletions python/python/lance/dataset.py
@@ -24,6 +24,7 @@
import numpy as np
import pyarrow as pa
import pyarrow.dataset
from lance.optimize import Compaction
from pyarrow import RecordBatch, Schema
from pyarrow._compute import Expression

@@ -32,7 +33,6 @@
from .lance import CompactionMetrics as CompactionMetrics
from .lance import __version__ as __version__
from .lance import _Dataset, _Scanner, _write_dataset
from .dataset.optimize import Compaction

try:
import pandas as pd
@@ -960,7 +960,7 @@ def compact_files(
max_rows_per_group: int = 1024,
materialize_deletions: bool = True,
materialize_deletions_threshold: float = 0.1,
num_concurrent_jobs: Optional[int] = None,
num_threads: Optional[int] = None,
) -> CompactionMetrics:
"""Compacts small files in the dataset, reducing total number of files.
@@ -985,8 +985,8 @@
materialize_deletions_threshold: float, default 0.1
The fraction of original rows that are soft deleted in a fragment
before the fragment is a candidate for compaction.
num_concurrent_jobs: int, optional
The number of concurrent jobs to use for compaction. If not
num_threads: int, optional
The number of threads to use when performing compaction. If not
specified, defaults to the number of cores on the machine.
Returns
@@ -996,16 +996,16 @@
See Also
--------
lance.dataset.optimize.Compaction
lance.optimize.Compaction
"""
opts = dict(
target_rows_per_fragment=target_rows_per_fragment,
max_rows_per_group=max_rows_per_group,
materialize_deletions=materialize_deletions,
materialize_deletions_threshold=materialize_deletions_threshold,
num_concurrent_jobs=num_concurrent_jobs,
num_threads=num_threads,
)
return Compaction.execute(self._dataset._ds, opts)
return Compaction.execute(self._dataset, opts)


def write_dataset(
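For reference, a minimal usage sketch of the renamed keyword; the dataset path and thread count here are hypothetical, any existing Lance dataset works:

import lance

# Hypothetical local dataset path.
ds = lance.dataset("./my_dataset.lance")
# num_threads replaces the old num_concurrent_jobs keyword.
metrics = ds.optimize.compact_files(
    target_rows_per_fragment=1024 * 1024,
    num_threads=4,
)
print(metrics.fragments_removed, metrics.fragments_added)
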
21 changes: 0 additions & 21 deletions python/python/lance/dataset/optimize.py

This file was deleted.

8 changes: 4 additions & 4 deletions python/python/lance/fragment.py
@@ -18,7 +18,7 @@

import json
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Optional, Union
from typing import TYPE_CHECKING, Callable, Iterator, Optional, Union

try:
import pandas as pd
@@ -54,12 +54,12 @@ def __eq__(self, other: object) -> bool:
return False
return self._metadata.__eq__(other._metadata)

def to_json(self) -> Dict[str, Any]:
def to_json(self) -> str:
"""Serialize :class:`FragmentMetadata` to a JSON blob"""
return json.loads(self._metadata.json())

@staticmethod
def from_json(json_data: Dict[str, Any]) -> FragmentMetadata:
def from_json(json_data: str) -> FragmentMetadata:
"""Reconstruct :class:`FragmentMetadata` from a JSON blob"""
return FragmentMetadata(json_data)

@@ -328,4 +328,4 @@ def metadata(self) -> FragmentMetadata:
FragmentMetadata
"""

return FragmentMetadata(self._fragment.metadata())
return FragmentMetadata(self._fragment.metadata().json())
5 changes: 0 additions & 5 deletions python/python/lance/lance.pyi

This file was deleted.

53 changes: 53 additions & 0 deletions python/python/lance/optimize.py
@@ -0,0 +1,53 @@
# Copyright (c) 2023. Lance Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional, TypedDict

# Re-exported from native module. See src/dataset/optimize.rs for implementation.
from .lance import Compaction as Compaction
from .lance import CompactionMetrics as CompactionMetrics
from .lance import CompactionTask as CompactionTask
from .lance import RewriteResult as RewriteResult


class CompactionOptions(TypedDict):
"""Options for compaction."""

target_rows_per_fragment: Optional[int]
"""
The target number of rows per fragment. This is the number of rows
that will be in each fragment after compaction. (default: 1024*1024)
"""
max_rows_per_group: Optional[int]
"""
Max number of rows per group. This does not affect which fragments
need compaction, but does affect how they are re-written if selected.
(default: 1024)
"""
materialize_deletions: Optional[bool]
"""
Whether to compact fragments with soft deleted rows so they are no
longer present in the file. (default: True)
"""
materialize_deletions_threshold: Optional[float]
"""
The fraction of original rows that are soft deleted in a fragment
before the fragment is a candidate for compaction.
(default: 0.1 = 10%)
"""
num_threads: Optional[int]
"""
The number of threads to use when performing compaction. If not
specified, defaults to the number of cores on the machine.
"""
41 changes: 41 additions & 0 deletions python/python/lance/optimize.pyi
@@ -0,0 +1,41 @@
from typing import List

from lance import Dataset
from lance.fragment import FragmentMetadata
from lance.optimize import CompactionOptions

class CompactionMetrics:
fragments_removed: int
fragments_added: int
files_removed: int
files_added: int

class RewriteResult:
read_version: int
metrics: CompactionMetrics
old_fragments: List["FragmentMetadata"]
new_fragments: List["FragmentMetadata"]

class CompactionTask:
read_version: int
fragments: List["FragmentMetadata"]

def execute(self, dataset: "Dataset") -> RewriteResult: ...

class CompactionPlan:
read_version: int
tasks: List[CompactionTask]

def num_tasks(self) -> int: ...

class Compaction:
@staticmethod
def execute(
dataset: "Dataset", options: CompactionOptions
) -> CompactionMetrics: ...
@staticmethod
def plan(dataset: "Dataset", options: CompactionOptions) -> CompactionPlan: ...
@staticmethod
def commit(
dataset: "Dataset", rewrites: List[RewriteResult]
) -> CompactionMetrics: ...
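
Taken together, these stubs describe a plan/execute/commit workflow for distributed compaction. A rough sketch of that flow, mirroring the test below (the pickle round-trip stands in for shipping a task to a remote worker; the dataset path is hypothetical):

import pickle

import lance
from lance.optimize import Compaction

# Hypothetical dataset path.
dataset = lance.dataset("./my_dataset.lance")

# Plan which fragments to rewrite without doing any work yet.
plan = Compaction.plan(dataset, options=dict(target_rows_per_fragment=1024 * 1024))

# Tasks are picklable, so they could be executed on separate workers.
results = []
for task in plan.tasks:
    task = pickle.loads(pickle.dumps(task))
    results.append(task.execute(dataset))

# Commit all rewrites back to the dataset as a single new version.
metrics = Compaction.commit(dataset, results)
print(metrics.fragments_removed, metrics.fragments_added)
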
43 changes: 38 additions & 5 deletions python/python/tests/test_dataset.py
@@ -33,6 +33,7 @@
import pyarrow.parquet as pq
import pytest
from lance.commit import CommitConflictError
from lance.lance import Compaction

# Various valid inputs for write_dataset
input_schema = pa.schema([pa.field("a", pa.float64()), pa.field("b", pa.int64())])
@@ -607,25 +608,57 @@ def test_dataset_optimize(tmp_path: Path):
data = pa.table({"a": range(1000), "b": range(1000)})

dataset = lance.write_dataset(data, base_dir, max_rows_per_file=100)
assert dataset.version == 1
assert len(dataset.get_fragments()) == 10

metrics = dataset.optimize.compact_files(
target_rows_per_fragment=1000,
materialize_deletions=False,
num_concurrent_jobs=1,
num_threads=1,
)

assert metrics.fragments_removed == 10
assert metrics.fragments_added == 1
assert metrics.files_removed == 10
assert metrics.files_added == 1

assert dataset.version == 2


def test_dataset_distributed_optimize(tmp_path: Path):
base_dir = tmp_path / "dataset"
data = pa.table({"a": range(1000), "b": range(1000)})
data = pa.table({"a": range(800), "b": range(800)})

dataset = lance.write_dataset(data, base_dir, max_rows_per_file=100)
assert len(dataset.get_fragments()) == 10
dataset = lance.write_dataset(data, base_dir, max_rows_per_file=200)
fragments = dataset.get_fragments()
assert len(fragments) == 4

plan = Compaction.plan(dataset)
plan = Compaction.plan(
dataset, options=dict(target_rows_per_fragment=400, num_threads=None)
)
assert plan.read_version == 1
assert plan.num_tasks() == 2
assert plan.tasks[0].fragments == [frag.metadata for frag in fragments[0:2]]
assert plan.tasks[1].fragments == [frag.metadata for frag in fragments[2:4]]
assert repr(plan) == "CompactionPlan(read_version=1, tasks=<2 compaction tasks>)"

pickled_task = pickle.dumps(plan.tasks[0])
task = pickle.loads(pickled_task)
assert task == plan.tasks[0]

result1 = plan.tasks[0].execute(dataset)
assert result1.metrics.fragments_removed == 2
assert result1.metrics.fragments_added == 1

pickled_result = pickle.dumps(result1)
result = pickle.loads(pickled_result)
assert result == result1
assert re.match(
r"RewriteResult\(read_version=1, new_fragments=\[.+\], old_fragments=\[.+\]\)",
repr(result),
)

metrics = Compaction.commit(dataset, [result1])
assert metrics.fragments_removed == 2
assert metrics.fragments_added == 1
assert dataset.version == 2