Skip to content

Commit

Permalink
Move file options to the main options class
Browse files Browse the repository at this point in the history
  • Loading branch information
coufon committed Feb 3, 2024
1 parent 9d51277 commit 5f40859
Show file tree
Hide file tree
Showing 18 changed files with 58 additions and 61 deletions.
3 changes: 1 addition & 2 deletions python/src/space/core/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import pyarrow as pa
from substrait.algebra_pb2 import ReadRel, Rel

from space.core.ops.utils import FileOptions
from space.core.options import JoinOptions, ReadOptions
from space.core.options import FileOptions, JoinOptions, ReadOptions
from space.core.runners import LocalRunner
from space.core.storage import Storage
from space.core.transform.plans import LogicalPlanBuilder
Expand Down
15 changes: 0 additions & 15 deletions python/src/space/core/fs/array_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,11 @@
#
"""ArrayRecord file utilities."""

from dataclasses import dataclass
from typing import List, Optional

from space.core.utils.lazy_imports_utils import array_record_module as ar


# pylint: disable=line-too-long
@dataclass
class ArrayRecordOptions:
"""Options of ArrayRecord file writer."""
# Max uncompressed bytes per file.
max_uncompressed_file_bytes = 100 * 1024 * 1024
# ArrayRecord lib options.
# See https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83
# Group size 1 maximizes random read performance.
# Match the options of TFDS:
# https://github.com/tensorflow/datasets/blob/92ebd18102b62cf85557ba4b905c970203d8914d/tensorflow_datasets/core/sequential_writer.py#L108
options: str = "group_size:1"


def read_record_file(file_path: str,
positions: Optional[List[int]] = None) -> List[bytes]:
"""Read records of an ArrayRecord file.
Expand Down
10 changes: 0 additions & 10 deletions python/src/space/core/fs/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,12 @@
#
"""Parquet file utilities."""

from dataclasses import dataclass
from typing import List

import pyarrow as pa
import pyarrow.parquet as pq


@dataclass
class ParquetWriterOptions:
"""Options of Parquet file writer."""
# Max uncompressed bytes per row group.
max_uncompressed_row_group_bytes = 100 * 1024
# Max uncompressed bytes per file.
max_uncompressed_file_bytes: int = 1 * 1024 * 1024


def write_parquet_file(file_path: str, schema: pa.Schema,
data: List[pa.Table]) -> pq.FileMetaData:
"""Materialize a single Parquet file."""
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/loaders/array_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.append import LocalAppendOp
from space.core.options import FileOptions
from space.core.schema import arrow
from space.core.serializers import DictSerializer
from space.core.utils.paths import StoragePathsMixin
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from space.core.manifests import IndexManifestWriter
from space.core.manifests import RecordManifestWriter
from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.base import BaseOp, InputData
from space.core.options import FileOptions
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.schema import arrow
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/ops/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from pyroaring import BitMap # type: ignore[import-not-found]

from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.append import LocalAppendOp
from space.core.ops.base import BaseOp
from space.core.options import FileOptions
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.utils import errors
Expand Down
3 changes: 1 addition & 2 deletions python/src/space/core/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
from space.core.ops.base import BaseOp, InputData
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.read import FileSetReadOp
from space.core.ops.utils import FileOptions
from space.core.options import ReadOptions
from space.core.options import FileOptions, ReadOptions
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as rt
from space.core.storage import Storage
Expand Down
12 changes: 0 additions & 12 deletions python/src/space/core/ops/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,18 @@
#
"""Utilities for operation classes."""

from dataclasses import dataclass, field as dataclass_field
from typing import List, Optional, Set

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

from space.core.fs.array_record import ArrayRecordOptions
from space.core.fs.parquet import ParquetWriterOptions
from space.core.schema import arrow
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.utils import errors


@dataclass
class FileOptions:
"""Options of file IO."""
parquet_options: ParquetWriterOptions = dataclass_field(
default_factory=ParquetWriterOptions)
array_record_options: ArrayRecordOptions = dataclass_field(
default_factory=ArrayRecordOptions)


def update_index_storage_stats(
base: meta.StorageStatistics,
update: meta.StorageStatistics,
Expand Down
44 changes: 43 additions & 1 deletion python/src/space/core/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
"""Options of Space core lib."""

from dataclasses import dataclass
from dataclasses import dataclass, field as dataclass_field
from typing import Any, Callable, List, Optional

import pyarrow.compute as pc
Expand Down Expand Up @@ -56,6 +56,48 @@ def __post_init__(self):
self.batch_size = self.batch_size or DEFAULT_READ_BATCH_SIZE


@dataclass
class ParquetWriterOptions:
"""Options of Parquet file writer."""
# Max uncompressed bytes per row group.
max_uncompressed_row_group_bytes: int = 100 * 1024

# Max uncompressed bytes per file.
max_uncompressed_file_bytes: int = 1 * 1024 * 1024


# pylint: disable=line-too-long
@dataclass
class ArrayRecordOptions:
"""Options of ArrayRecord file writer."""
# Max uncompressed bytes per file.
max_uncompressed_file_bytes: int = 100 * 1024 * 1024

# ArrayRecord lib options.
#
# See https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83
# Default group size 1 maximizes random read performance.
# It matches the options of TFDS:
# https://github.com/tensorflow/datasets/blob/92ebd18102b62cf85557ba4b905c970203d8914d/tensorflow_datasets/core/sequential_writer.py#L108
#
# A larger group size improves read throughput from Cloud Storage, because
# each RPC reads a larger chunk of data, which performs better on Cloud
# Storage.
options: str = "group_size:1"


@dataclass
class FileOptions:
"""Options of file IO."""
# Parquet file options.
parquet_options: ParquetWriterOptions = dataclass_field(
default_factory=ParquetWriterOptions)

# ArrayRecord file options.
array_record_options: ArrayRecordOptions = dataclass_field(
default_factory=ArrayRecordOptions)


@dataclass
class Range:
"""A range of a field."""
Expand Down
3 changes: 1 addition & 2 deletions python/src/space/core/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@
from space.core.loaders.array_record import LocalArrayRecordLoadOp
from space.core.loaders.parquet import LocalParquetLoadOp
from space.core.ops.append import LocalAppendOp
from space.core.ops.append import FileOptions
from space.core.ops.base import InputData, InputIteratorFn
from space.core.ops.change_data import ChangeData, read_change_data
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.insert import InsertOptions, LocalInsertOp
from space.core.ops.read import FileSetReadOp
from space.core.options import JoinOptions, ReadOptions
from space.core.options import FileOptions, JoinOptions, ReadOptions
import space.core.proto.runtime_pb2 as rt
from space.core.storage import Storage, Version
from space.core.utils import errors
Expand Down
4 changes: 1 addition & 3 deletions python/src/space/core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import pyarrow as pa
from substrait.algebra_pb2 import Rel

from space.core.options import JoinOptions
from space.core.fs.factory import create_fs
from space.core.ops.utils import FileOptions
from space.core.options import ReadOptions
from space.core.options import FileOptions, JoinOptions, ReadOptions
import space.core.proto.metadata_pb2 as meta
from space.core.schema import FieldIdManager
from space.core.storage import Storage
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import ray

from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.append import BaseAppendOp, LocalAppendOp
from space.core.ops.base import InputData, InputIteratorFn
from space.core.options import FileOptions
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.ray.options import RayOptions
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import ray

from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.delete import BaseDeleteOp, FileSetDeleteOp
from space.core.options import FileOptions
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.storage import Storage
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/ray/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from space.ray.ops.append import RayAppendOp
from space.core.ops.insert import InsertOptions, LocalInsertOp
from space.core.ops.insert import filter_matched
from space.core.ops.utils import FileOptions
from space.core.options import FileOptions
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as rt
from space.core.storage import Storage
Expand Down
3 changes: 1 addition & 2 deletions python/src/space/ray/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@
from space.core.runners import BaseReadOnlyRunner, BaseReadWriteRunner
from space.core.runners import StorageMixin
from space.core.ops import utils
from space.core.ops.utils import FileOptions
from space.core.ops.base import InputData, InputIteratorFn
from space.core.ops.change_data import ChangeData, ChangeType
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.insert import InsertOptions
from space.core.options import JoinOptions, ReadOptions
from space.core.options import FileOptions, JoinOptions, ReadOptions
import space.core.proto.runtime_pb2 as rt
from space.core.utils import errors
from space.ray.ops.append import RayAppendOp
Expand Down
5 changes: 2 additions & 3 deletions python/tests/core/ops/test_append.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import pyarrow.parquet as pq

from space.core.ops.append import LocalAppendOp
from space.core.ops.utils import FileOptions
from space.core.options import FileOptions
import space.core.proto.metadata_pb2 as meta
from space.core.storage import Storage

Expand Down Expand Up @@ -64,8 +64,7 @@ def test_write_pydict_all_types(self, tmp_path, all_types_schema,
assert patch.storage_statistics_update == meta.StorageStatistics(
num_rows=5, index_compressed_bytes=114, index_uncompressed_bytes=126)

def test_write_pydict_with_record_fields(self, tmp_path,
record_fields_schema,
def test_write_pydict_with_record_fields(self, tmp_path, record_fields_schema,
record_fields_input_data):
location = tmp_path / "dataset"
storage = Storage.create(location=str(location),
Expand Down
2 changes: 1 addition & 1 deletion python/tests/core/ops/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import pyarrow.compute as pc

from space.core.ops.append import LocalAppendOp
from space.core.ops.utils import FileOptions
from space.core.ops.delete import FileSetDeleteOp
from space.core.ops.read import FileSetReadOp
from space.core.options import FileOptions
from space.core.storage import Storage

_default_file_options = FileOptions()
Expand Down
3 changes: 1 addition & 2 deletions python/tests/core/ops/test_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
import pyarrow.compute as pc

from space.core.ops.append import LocalAppendOp
from space.core.ops.utils import FileOptions
from space.core.ops.read import FileSetReadOp
from space.core.options import ReadOptions
from space.core.options import FileOptions, ReadOptions
from space.core.storage import Storage

_default_file_options = FileOptions()
Expand Down

0 comments on commit 5f40859

Please sign in to comment.