Commit

Support reading a slice of an index file and its records as a Ray data block
coufon committed Jan 25, 2024
1 parent e26d79f commit 183c349
Showing 12 changed files with 151 additions and 54 deletions.
2 changes: 1 addition & 1 deletion python/src/space/core/ops/insert.py
@@ -68,7 +68,7 @@ def __init__(self, storage: Storage, options: InsertOptions,
self._storage = storage
self._metadata = self._storage.metadata

self._options = InsertOptions() if options is None else options
self._options = options or InsertOptions()
self._file_options = file_options

def write(self, data: InputData) -> Optional[rt.Patch]:
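A quick note on the `options or InsertOptions()` idiom introduced here (and repeated in read.py and runners.py below): it behaves the same as the old `is None` check as long as the options object is always truthy, which holds for these dataclasses. A minimal sketch with a hypothetical stand-in dataclass, not the real InsertOptions:

from dataclasses import dataclass


@dataclass
class _OptionsStub:
  # Stand-in for space.core.options.InsertOptions; the field is illustrative.
  mode: str = "append"


def _resolve(options):
  # `or` falls back to the default when options is None (or otherwise falsy).
  return options or _OptionsStub()


assert _resolve(None) == _OptionsStub()
assert _resolve(_OptionsStub(mode="upsert")).mode == "upsert"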
10 changes: 7 additions & 3 deletions python/src/space/core/ops/read.py
@@ -65,7 +65,7 @@ def __init__(self,
self._file_set = file_set

# TODO: to validate options, e.g., fields are valid.
self._options = ReadOptions() if options is None else options
self._options = options or ReadOptions()

record_fields = set(self._metadata.schema.record_fields)
self._physical_schema = arrow.arrow_schema(self._metadata.schema.fields,
@@ -90,13 +90,17 @@ def __init__(self,

def __iter__(self) -> Iterator[pa.Table]:
for file in self._file_set.index_files:
# TODO: to read row group by row group if needed, maybe not because index
# data is usually small.
# TODO: always loading the whole table is inefficient; load only the
# required row groups.
index_data = pq.read_table(
self.full_path(file.path),
columns=self._selected_fields,
filters=self._options.filter_) # type: ignore[arg-type]

if file.selected_rows.end > 0:
length = file.selected_rows.end - file.selected_rows.start
index_data = index_data.slice(file.selected_rows.start, length)

if self._options.reference_read:
yield index_data
continue
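For context on the new slicing branch above: `pyarrow.Table.slice(offset, length)` returns a zero-copy view, so the half-open [start, end) range from `selected_rows` is converted to an offset and a length. A standalone sketch with made-up values, not code from the commit:

import pyarrow as pa

# Hypothetical index data standing in for the table returned by pq.read_table.
index_data = pa.table({"id": list(range(10))})

# A [start, end) row range like DataFile.selected_rows; end > 0 means a slice
# was requested, end == 0 (the proto default) means read the whole file.
start, end = 3, 7
if end > 0:
  index_data = index_data.slice(start, end - start)

assert index_data.column("id").to_pylist() == [3, 4, 5, 6]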
19 changes: 17 additions & 2 deletions python/src/space/core/options.py
@@ -25,16 +25,28 @@ class ReadOptions:
"""Options of reading data."""
# Filters on index fields.
filter_: Optional[pc.Expression] = None

# When specified, only read the given fields instead of all fields.
fields: Optional[List[str]] = None

# The snapshot to read.
snapshot_id: Optional[int] = None

# If true, read the references (e.g., address) of read record fields instead
# of values.
reference_read: bool = False

# The max number of rows per batch in read result.
# TODO: currently a batch can be smaller than batch_size, to enforce size to
# be equal to batch_size.
#
# `None` does not enforce a batch size; data is read at row-group granularity.
# For a large row group size, loading all record fields may be expensive and
# slow; choosing a proper batch size helps.
#
# A batch size that is too small creates too many Ray blocks in the Ray runner
# and hurts performance.
#
# TODO: currently a batch can be smaller than batch_size (e.g., at row group
# boundaries); to enforce size to be equal to batch_size.
batch_size: Optional[int] = None


@@ -43,8 +55,10 @@ class Range:
"""A range of a field."""
# Always inclusive.
min_: Any

# Exclusive by default.
max_: Any

# Max is inclusive when true.
include_max: bool = False

@@ -54,6 +68,7 @@ class JoinOptions:
"""Options of joining data."""
# Partition the join key range into multiple ranges for parallel processing.
partition_fn: Optional[Callable[[Range], List[Range]]] = None

# TODO: to support ReadOptions for left and right views, e.g., filter_,
# snapshot_id
# TODO: to support join type in PyArrow, only `inner` is supported now.
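A small usage sketch of ReadOptions above; the filter and field names are illustrative and not taken from the commit:

import pyarrow.compute as pc

from space.core.options import ReadOptions

# Read two index fields of the rows matching a filter, in batches of at most
# 1024 rows; batch_size=None would instead read at row-group granularity.
read_options = ReadOptions(filter_=pc.field("label") == "cat",
                           fields=["id", "label"],
                           batch_size=1024)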
13 changes: 12 additions & 1 deletion python/src/space/core/proto/runtime.proto
@@ -19,7 +19,7 @@ import "space/core/proto/metadata.proto";
package space.proto;

// Information of a data file.
// NEXT_ID: 4
// NEXT_ID: 5
message DataFile {
// Data file path.
string path = 1;
@@ -29,6 +29,17 @@ message DataFile {

// Locally assigned manifest file IDs.
int64 manifest_file_id = 3;

message Range {
// Inclusive.
int64 start = 1;
// Exclusive.
int64 end = 2;
}

// A range of selected rows in the data file.
// Used for partially reading an index file and its records.
Range selected_rows = 4;
}

// A set of associated data and manifest files.
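A minimal sketch of how the new field is populated from Python, using a hypothetical file path; `end == 0` (the proto default) keeps the existing whole-file behavior:

import space.core.proto.runtime_pb2 as rt

index_file = rt.DataFile(path="data/index_00001.parquet")

# Select rows [0, 1000): start is inclusive, end is exclusive.
index_file.selected_rows.start = 0
index_file.selected_rows.end = 1000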
20 changes: 11 additions & 9 deletions python/src/space/core/proto/runtime_pb2.py

Some generated files are not rendered by default.

31 changes: 28 additions & 3 deletions python/src/space/core/proto/runtime_pb2.pyi
@@ -33,30 +33,55 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
@typing_extensions.final
class DataFile(google.protobuf.message.Message):
"""Information of a data file.
NEXT_ID: 4
NEXT_ID: 5
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

@typing_extensions.final
class Range(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

START_FIELD_NUMBER: builtins.int
END_FIELD_NUMBER: builtins.int
start: builtins.int
"""Inclusive."""
end: builtins.int
"""Exclusive."""
def __init__(
self,
*,
start: builtins.int = ...,
end: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["end", b"end", "start", b"start"]) -> None: ...

PATH_FIELD_NUMBER: builtins.int
STORAGE_STATISTICS_FIELD_NUMBER: builtins.int
MANIFEST_FILE_ID_FIELD_NUMBER: builtins.int
SELECTED_ROWS_FIELD_NUMBER: builtins.int
path: builtins.str
"""Data file path."""
@property
def storage_statistics(self) -> space.core.proto.metadata_pb2.StorageStatistics:
"""Storage statistics of data in the file."""
manifest_file_id: builtins.int
"""Locally assigned manifest file IDs."""
@property
def selected_rows(self) -> global___DataFile.Range:
"""A range of selected rows in the data file.
Used for partially reading an index file and its records.
"""
def __init__(
self,
*,
path: builtins.str = ...,
storage_statistics: space.core.proto.metadata_pb2.StorageStatistics | None = ...,
manifest_file_id: builtins.int = ...,
selected_rows: global___DataFile.Range | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "storage_statistics", b"storage_statistics"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> None: ...

global___DataFile = DataFile

2 changes: 1 addition & 1 deletion python/src/space/core/runners.py
@@ -129,7 +129,7 @@ def __init__(self,
storage: Storage,
file_options: Optional[FileOptions] = None):
StorageMixin.__init__(self, storage)
self._file_options = FileOptions() if file_options is None else file_options
self._file_options = file_options or FileOptions()

@abstractmethod
def append(self, data: InputData) -> JobResult:
9 changes: 5 additions & 4 deletions python/src/space/core/storage.py
@@ -16,6 +16,7 @@

from __future__ import annotations
from datetime import datetime
import math
from os import path
from typing import Collection, Dict, Iterator, List, Optional, Union
from typing_extensions import TypeAlias
@@ -370,13 +371,13 @@ def ray_dataset(self, ray_options: RayOptions,
"""Return a Ray dataset for a Space storage."""
ds = ray.data.read_datasource(ray_data_sources.SpaceDataSource(),
storage=self,
ray_options=ray_options,
read_options=read_options,
parallelism=ray_options.max_parallelism)

if read_options.batch_size is not None:
num_rows = ds.count()
assert num_rows >= 0 and read_options.batch_size > 0
return ds.repartition(num_rows // read_options.batch_size)
if (not ray_options.enable_index_file_row_range_block and
read_options.batch_size):
return ds.repartition(math.ceil(ds.count() / read_options.batch_size))

return ds

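The switch from floor to ceiling division above changes the partition count at the edges; a standalone sketch of the arithmetic with illustrative numbers:

import math

num_rows, batch_size = 2_500, 1_000

# Old behavior: floor division; fewer than batch_size rows would give zero
# partitions, and remainder rows inflate the last blocks.
assert num_rows // batch_size == 2

# New behavior: ceiling division always yields at least one partition and
# keeps every block at or below batch_size rows.
assert math.ceil(num_rows / batch_size) == 3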
4 changes: 2 additions & 2 deletions python/src/space/core/views.py
@@ -272,12 +272,12 @@ def ray(
return ray_runners.RayMaterializedViewRunner(self, ray_options,
file_options)

def local(self) -> LocalRunner:
def local(self, file_options: Optional[FileOptions] = None) -> LocalRunner:
"""Get a runner that runs operations locally.
TODO: should use a read-only local runner.
"""
return LocalRunner(self._storage)
return LocalRunner(self._storage, file_options)

@classmethod
def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
74 changes: 50 additions & 24 deletions python/src/space/ray/data_sources.py
@@ -16,7 +16,9 @@

from __future__ import annotations
from functools import partial
from typing import Any, Dict, List, Optional, TYPE_CHECKING
import math
from typing import (Any, Callable, Dict, Iterator, List, Optional,
TYPE_CHECKING)

from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import Datasource, Reader, ReadTask
@@ -26,6 +28,7 @@
from space.core.ops.read import FileSetReadOp
from space.core.options import ReadOptions
import space.core.proto.runtime_pb2 as rt
from space.ray.options import RayOptions

if TYPE_CHECKING:
from space.core.storage import Storage
@@ -36,8 +39,9 @@ class SpaceDataSource(Datasource):

# pylint: disable=arguments-differ,too-many-arguments
def create_reader( # type: ignore[override]
self, storage: Storage, read_options: ReadOptions) -> Reader:
return _SpaceDataSourceReader(storage, read_options)
self, storage: Storage, ray_options: RayOptions,
read_options: ReadOptions) -> Reader:
return _SpaceDataSourceReader(storage, ray_options, read_options)

def do_write(self, blocks: List[ObjectRef[Block]],
metadata: List[BlockMetadata],
@@ -53,8 +57,10 @@ def on_write_complete( # type: ignore[override]

class _SpaceDataSourceReader(Reader):

def __init__(self, storage: Storage, read_options: ReadOptions):
def __init__(self, storage: Storage, ray_options: RayOptions,
read_options: ReadOptions):
self._storage = storage
self._ray_options = ray_options
self._read_options = read_options

def estimate_inmemory_data_size(self) -> Optional[int]:
@@ -73,25 +79,45 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
self._read_options.snapshot_id)

for index_file in file_set.index_files:
stats = index_file.storage_statistics
task_file_set = rt.FileSet(index_files=[index_file])

# The metadata about the block that we know prior to actually executing
# the read task.
# TODO: to populate the storage values.
block_metadata = BlockMetadata(
num_rows=stats.num_rows,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)

# TODO: A single index file (with record files) is a single block. To
# check whether row group granularity is needed.
read_fn = partial(FileSetReadOp, self._storage.location,
self._storage.metadata, task_file_set,
self._read_options)
read_tasks.append(ReadTask(read_fn, block_metadata))
num_rows = index_file.storage_statistics.num_rows

if (self._ray_options.enable_index_file_row_range_block and
self._read_options.batch_size):
batch_size = self._read_options.batch_size
num_blocks = math.ceil(num_rows / batch_size)

for i in range(num_blocks):
index_file_slice = rt.DataFile()
index_file_slice.CopyFrom(index_file)

rows = index_file_slice.selected_rows
rows.start = i * batch_size
rows.end = min((i + 1) * batch_size, num_rows)

read_tasks.append(
ReadTask(self._read_fn(index_file_slice),
_block_metadata(rows.end - rows.start)))
else:
read_tasks.append(
ReadTask(self._read_fn(index_file), _block_metadata(num_rows)))

return read_tasks

def _read_fn(self, index_file: rt.DataFile) -> Callable[..., Iterator[Block]]:
return partial(FileSetReadOp, self._storage.location,
self._storage.metadata, rt.FileSet(index_files=[index_file]),
self._read_options) # type: ignore[return-value]


def _block_metadata(num_rows: int) -> BlockMetadata:
"""The metadata about the block that we know prior to actually executing the
read task.
"""
# TODO: to populate the storage values.
return BlockMetadata(
num_rows=num_rows,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
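The per-file range computation in get_read_tasks can be isolated as the helper below; this is a standalone sketch for illustration, not a function in the commit:

import math
from typing import Iterator, Tuple


def split_row_ranges(num_rows: int,
                     batch_size: int) -> Iterator[Tuple[int, int]]:
  """Yield [start, end) ranges covering num_rows, each at most batch_size rows."""
  for i in range(math.ceil(num_rows / batch_size)):
    yield i * batch_size, min((i + 1) * batch_size, num_rows)


# A hypothetical 2,500-row index file with batch_size=1,000 becomes 3 blocks.
assert list(split_row_ranges(2_500, 1_000)) == [(0, 1000), (1000, 2000),
                                                (2000, 2500)]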
13 changes: 13 additions & 0 deletions python/src/space/ray/options.py
@@ -22,3 +22,16 @@ class RayOptions:
"""Options of Ray runners."""
# The max parallelism of computing resources to use in a Ray cluster.
max_parallelism: int = 8

# Enable using a row range of an index file as a Ray data block in the Ray
# datasource.
#
# When disabled, the minimal Ray block is the data from one index file plus
# the records it references. The read batch size is then achieved by
# repartitioning the dataset. For an index Parquet file with 1 million rows,
# loading such a block requires reading all 1 million records, which is too
# expensive.
#
# When enabled, the Ray block size is capped by the provided read batch size.
# The cost is possibly duplicated reads of index files; it should be disabled
# when most data is stored in index files.
enable_index_file_row_range_block: bool = True
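A hedged end-to-end sketch of how the flag combines with the read batch size; `storage` is assumed to be an existing `space.core.storage.Storage` instance, which this diff does not show how to construct:

from space.core.options import ReadOptions
from space.ray.options import RayOptions

# With the flag on (the default), each Ray block is a row-range slice of an
# index file capped at batch_size rows, so no repartition pass is needed.
ray_options = RayOptions(max_parallelism=8,
                         enable_index_file_row_range_block=True)
read_options = ReadOptions(batch_size=1024)

ds = storage.ray_dataset(ray_options, read_options)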