diff --git a/python/src/space/core/datasets.py b/python/src/space/core/datasets.py index 546252c..6912354 100644 --- a/python/src/space/core/datasets.py +++ b/python/src/space/core/datasets.py @@ -25,7 +25,7 @@ from space.core.runners import LocalRunner from space.core.storage import Storage from space.core.transform.plans import LogicalPlanBuilder -from space.core.utils.lazy_imports_utils import ray, ray_runners +from space.core.utils.lazy_imports_utils import ray, ray_runners # pylint: disable=unused-import from space.core.views import View from space.ray.options import RayOptions @@ -107,16 +107,16 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel: return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]), base_schema=self._storage.metadata.schema.fields)) - def process_source(self, data: pa.Table) -> ray.Dataset: + def process_source(self, data: ray.data.Dataset) -> ray.data.Dataset: # Dataset is the source, there is no transform, so simply return the data. - return ray.data.from_arrow(data) + return data def ray_dataset( self, ray_options: RayOptions, read_options: ReadOptions = ReadOptions(), join_options: JoinOptions = JoinOptions() - ) -> ray.Dataset: + ) -> ray.data.Dataset: """Return a Ray dataset for a Space dataset.""" return self._storage.ray_dataset(ray_options, read_options) diff --git a/python/src/space/core/ops/append.py b/python/src/space/core/ops/append.py index ca92393..0c39c5d 100644 --- a/python/src/space/core/ops/append.py +++ b/python/src/space/core/ops/append.py @@ -238,7 +238,7 @@ def _finish_index_writer(self) -> None: update=stats) self._patch.change_log.added_rows.append( - meta.RowBitmap(file=file_path, all_rows=True)) + meta.RowBitmap(file=file_path, all_rows=True, num_rows=stats.num_rows)) self._index_writer_info = None self._cached_index_file_bytes = 0 diff --git a/python/src/space/core/ops/change_data.py b/python/src/space/core/ops/change_data.py index 56839c3..1ccdefb 100644 --- a/python/src/space/core/ops/change_data.py +++ b/python/src/space/core/ops/change_data.py @@ -16,11 +16,10 @@ from dataclasses import dataclass from enum import Enum -from typing import Iterator, List +from typing import Any, Iterable, Iterator, List import pyarrow as pa -from space.core.fs.base import BaseFileSystem from space.core.fs.factory import create_fs from space.core.ops.read import FileSetReadOp from space.core.options import ReadOptions @@ -51,14 +50,14 @@ class ChangeData: # The change type. type_: ChangeType - # The change data. - data: pa.Table + # The change data (pa.Table or ray.data.Dataset). + # NOTE: type annotation not used, because of Ray lazy import. + data: Any -def read_change_data(storage: Storage, start_snapshot_id: int, - end_snapshot_id: int, - read_options: ReadOptions) -> Iterator[ChangeData]: - """Read change data from a start to an end snapshot. +def ordered_snapshot_ids(storage: Storage, start_snapshot_id: int, + end_snapshot_id: int) -> List[int]: + """Return a list of ordered snapshot IDs between two snapshots. start_snapshot_id is excluded; end_snapshot_id is included. """ @@ -81,13 +80,23 @@ def read_change_data(storage: Storage, start_snapshot_id: int, f"Start snapshot {start_snapshot_id} is not the ancestor of end " f"snapshot {end_snapshot_id}") - for snapshot_id in all_snapshot_ids[1:]: - for result in iter( - _LocalChangeDataReadOp(storage, snapshot_id, read_options)): - yield result + return all_snapshot_ids[1:] + + +def read_change_data(storage: Storage, start_snapshot_id: int, + end_snapshot_id: int, + read_options: ReadOptions) -> Iterator[ChangeData]: + """Read change data from a start to an end snapshot. + + start_snapshot_id is excluded; end_snapshot_id is included. + """ + for snapshot_id in ordered_snapshot_ids(storage, start_snapshot_id, + end_snapshot_id): + for change in LocalChangeDataReadOp(storage, snapshot_id, read_options): + yield change -class _LocalChangeDataReadOp(StoragePathsMixin): +class LocalChangeDataReadOp(StoragePathsMixin): """Read changes of data from a given snapshot of a dataset.""" def __init__(self, storage: Storage, snapshot_id: int, @@ -103,39 +112,45 @@ def __init__(self, storage: Storage, snapshot_id: int, raise errors.VersionNotFoundError( f"Change data read can't find snapshot ID {snapshot_id}") - snapshot = self._metadata.snapshots[snapshot_id] - - fs = create_fs(self._location) - change_log_file = self._storage.full_path(snapshot.change_log_file) - self._change_log = _read_change_log_proto(fs, change_log_file) + self._snapshot = self._metadata.snapshots[snapshot_id] + self._change_log = _read_change_log_proto( + self._storage.full_path(self._snapshot.change_log_file)) def __iter__(self) -> Iterator[ChangeData]: # Must return deletion first, otherwise when the upstream re-apply # deletions and additions, it may delete newly added data. # TODO: to enforce this check upstream, or merge deletion+addition as a # update. - for bitmap in self._change_log.deleted_rows: - for change in self._read_bitmap_rows(ChangeType.DELETE, bitmap): - yield change - - for bitmap in self._change_log.added_rows: - for change in self._read_bitmap_rows(ChangeType.ADD, bitmap): - yield change - - def _read_bitmap_rows(self, change_type: ChangeType, - bitmap: meta.RowBitmap) -> Iterator[ChangeData]: - file_set = rt.FileSet(index_files=[rt.DataFile(path=bitmap.file)]) - read_op = FileSetReadOp( - self._storage.location, - self._metadata, - file_set, - row_bitmap=(None if bitmap.all_rows else bitmap.roaring_bitmap), - options=self._read_options) - - for data in read_op: - yield ChangeData(self._snapshot_id, change_type, data) - - -def _read_change_log_proto(fs: BaseFileSystem, - file_path: str) -> meta.ChangeLog: + for data in self._read_op(self._change_log.deleted_rows): + yield ChangeData(self._snapshot_id, ChangeType.DELETE, data) + + for data in self._read_op(self._change_log.added_rows): + yield ChangeData(self._snapshot_id, ChangeType.ADD, data) + + def _read_op(self, bitmaps: Iterable[meta.RowBitmap]) -> Iterator[pa.Table]: + return iter( + FileSetReadOp(self._storage.location, + self._metadata, + self._bitmaps_to_file_set(bitmaps), + options=self._read_options)) + + @classmethod + def _bitmaps_to_file_set(cls, + bitmaps: Iterable[meta.RowBitmap]) -> rt.FileSet: + return rt.FileSet( + index_files=[_bitmap_to_index_file(bitmap) for bitmap in bitmaps]) + + +def _bitmap_to_index_file(bitmap: meta.RowBitmap) -> rt.DataFile: + index_file = rt.DataFile( + path=bitmap.file, + storage_statistics=meta.StorageStatistics(num_rows=bitmap.num_rows)) + if not bitmap.all_rows: + index_file.row_bitmap.CopyFrom(bitmap) + + return index_file + + +def _read_change_log_proto(file_path: str) -> meta.ChangeLog: + fs = create_fs(file_path) return fs.read_proto(file_path, meta.ChangeLog()) diff --git a/python/src/space/core/ops/delete.py b/python/src/space/core/ops/delete.py index a4f0179..0f18e04 100644 --- a/python/src/space/core/ops/delete.py +++ b/python/src/space/core/ops/delete.py @@ -161,12 +161,12 @@ def delete(self) -> Optional[rt.Patch]: # Compute storage statistics update. survivor_stats = _read_index_statistics(survivor_index_manifests) - reinsert_stats = (reinsert_patch.storage_statistics_update if - reinsert_patch is not None else meta.StorageStatistics()) + reinsert_stats = (reinsert_patch.storage_statistics_update if reinsert_patch + is not None else meta.StorageStatistics()) deleted_compressed_bytes = (reinsert_stats.index_compressed_bytes + survivor_stats.index_compressed_bytes - ) - stats_before_delete.index_compressed_bytes + ) - stats_before_delete.index_compressed_bytes deleted_uncompressed_bytes = ( reinsert_stats.index_uncompressed_bytes + survivor_stats.index_uncompressed_bytes @@ -205,8 +205,8 @@ def _validate_files(file_set: rt.FileSet) -> bool: deletion. """ for f in file_set.index_files: - if (not f.path or f.storage_statistics.num_rows == 0 - or f.manifest_file_id == 0): + if (not f.path or f.storage_statistics.num_rows == 0 or + f.manifest_file_id == 0): return False return len(file_set.index_manifest_files) > 0 @@ -214,7 +214,8 @@ def _validate_files(file_set: rt.FileSet) -> bool: def _build_bitmap(file: rt.DataFile, index_data: pa.Table, all_deleted: bool) -> meta.RowBitmap: - row_bitmap = meta.RowBitmap(file=file.path) + row_bitmap = meta.RowBitmap(file=file.path, + num_rows=file.storage_statistics.num_rows) if all_deleted: row_bitmap.all_rows = True else: diff --git a/python/src/space/core/ops/read.py b/python/src/space/core/ops/read.py index 4675f0f..9094b91 100644 --- a/python/src/space/core/ops/read.py +++ b/python/src/space/core/ops/read.py @@ -31,7 +31,6 @@ from space.core.schema import arrow from space.core.schema.constants import FILE_PATH_FIELD, ROW_ID_FIELD from space.core.schema import utils as schema_utils -from space.core.utils import errors from space.core.utils.paths import StoragePathsMixin _RECORD_KEY_FIELD = "__RECORD_KEY" @@ -59,7 +58,6 @@ def __init__(self, location: str, metadata: meta.StorageMetadata, file_set: rt.FileSet, - row_bitmap: Optional[bytes] = None, options: Optional[ReadOptions] = None): StoragePathsMixin.__init__(self, location) @@ -67,7 +65,6 @@ def __init__(self, self._metadata = metadata self._file_set = file_set - self._row_bitmap = row_bitmap # TODO: to validate options, e.g., fields are valid. self._options = options or ReadOptions() @@ -95,13 +92,7 @@ def __init__(self, def __iter__(self) -> Iterator[pa.Table]: for file in self._file_set.index_files: - row_range_read = file.selected_rows.end > 0 - - # row_range_read is used by Ray SpaceDataSource. row_bitmap is used by Ray - # diff/refresh, which does not use Ray SpaceDataSource. - if row_range_read and self._row_bitmap is not None: - raise errors.SpaceRuntimeError( - "Row mask is not supported when row range read is enabled") + row_range_read = file.HasField("row_slice") # TODO: always loading the whole table is inefficient, to only load the # required row groups. @@ -110,13 +101,13 @@ def __iter__(self) -> Iterator[pa.Table]: columns=self._selected_fields, filters=self._options.filter_) # type: ignore[arg-type] - if self._row_bitmap is not None: - index_data = index_data.filter( - mask=_bitmap_mask(self._row_bitmap, index_data.num_rows)) - - if row_range_read: - length = file.selected_rows.end - file.selected_rows.start - index_data = index_data.slice(file.selected_rows.start, length) + if file.HasField("row_bitmap") and not file.row_bitmap.all_rows: + index_data = index_data.filter(mask=_bitmap_mask( + file.row_bitmap.roaring_bitmap, index_data.num_rows, + file.row_slice if row_range_read else None)) + elif row_range_read: + length = file.row_slice.end - file.row_slice.start + index_data = index_data.slice(file.row_slice.start, length) if self._options.reference_read: yield index_data @@ -133,8 +124,8 @@ def __iter__(self) -> Iterator[pa.Table]: (column_id, arrow.binary_field(self._record_fields_dict[field_id]))) - # The batch size enforcement is applied as row range. if row_range_read: + # The batch size is already applied via row slice range. yield self._read_index_and_record(index_data, index_column_ids, record_columns) else: @@ -214,11 +205,13 @@ def read_record_column(paths: StoragePathsMixin, return pa.array(sorted_values, pa.binary()) # type: ignore[return-value] -def _bitmap_mask(serialized_bitmap: bytes, num_rows: int) -> List[bool]: +def _bitmap_mask(serialized_bitmap: bytes, num_rows: int, + row_slice: Optional[rt.DataFile.Range]) -> List[bool]: bitmap = BitMap.deserialize(serialized_bitmap) mask = [False] * num_rows for row_id in bitmap.to_array(): - mask[row_id] = True + if row_slice is None or row_slice.start <= row_id < row_slice.end: + mask[row_id] = True return mask diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto index 54d34ec..8460dc2 100644 --- a/python/src/space/core/proto/metadata.proto +++ b/python/src/space/core/proto/metadata.proto @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +// // Proto messages used by Space metadata persistence. syntax = "proto3"; @@ -160,6 +160,9 @@ message StorageStatistics { // Change log stores changes made by a snapshot. // NEXT_ID: 3 message ChangeLog { + // TODO: to replace RowBitmap list by runtime.FileSet (not backward + // compatible). + // Rows deleted in this snapshot. repeated RowBitmap deleted_rows = 1; @@ -169,7 +172,8 @@ message ChangeLog { // Mark rows in a file by bitmap. -// NEXT_ID: 4 +// TODO: to replace it by runtime.DataFile (not backward compatible). +// NEXT_ID: 5 message RowBitmap { // File path that the bit map applies to. string file = 1; @@ -181,6 +185,9 @@ message RowBitmap { // Roaring bitmap. bytes roaring_bitmap = 3; } + + // Total number of rows in the file. + int64 num_rows = 4; } // Store the logical plan of a transform. diff --git a/python/src/space/core/proto/metadata_pb2.py b/python/src/space/core/proto/metadata_pb2.py index 041f361..11ea287 100644 --- a/python/src/space/core/proto/metadata_pb2.py +++ b/python/src/space/core/proto/metadata_pb2.py @@ -16,7 +16,7 @@ from substrait import type_pb2 as substrait_dot_type__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/plan.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xe9\x04\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12.\n\x0clogical_plan\x18\x07 \x01(\x0b\x32\x18.space.proto.LogicalPlan\x12\x34\n\x04refs\x18\x08 \x03(\x0b\x32&.space.proto.StorageMetadata.RefsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\x1aK\n\tRefsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12-\n\x05value\x18\x02 \x01(\x0b\x32\x1e.space.proto.SnapshotReference:\x02\x38\x01\"@\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\x12\x15\n\x11MATERIALIZED_VIEW\x10\x02\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xa0\x02\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\t\x12\x1f\n\x12parent_snapshot_id\x18\x06 \x01(\x03H\x01\x88\x01\x01\x42\x0b\n\tdata_infoB\x15\n\x13_parent_snapshot_id\"\xb8\x01\n\x11SnapshotReference\x12\x16\n\x0ereference_name\x18\x01 \x01(\t\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x03\x12:\n\x04type\x18\x03 \x01(\x0e\x32,.space.proto.SnapshotReference.ReferenceType\":\n\rReferenceType\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x07\n\x03TAG\x10\x01\x12\n\n\x06\x42RANCH\x10\x02\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmap\"\x93\x01\n\x0bLogicalPlan\x12%\n\x0clogical_plan\x18\x01 \x01(\x0b\x32\x0f.substrait.Plan\x12\x30\n\x04udfs\x18\x02 \x03(\x0b\x32\".space.proto.LogicalPlan.UdfsEntry\x1a+\n\tUdfsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1d\n\x08\x46ileType\x12\x11\n\tdirectory\x18\x01 \x01(\tb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/plan.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xe9\x04\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12.\n\x0clogical_plan\x18\x07 \x01(\x0b\x32\x18.space.proto.LogicalPlan\x12\x34\n\x04refs\x18\x08 \x03(\x0b\x32&.space.proto.StorageMetadata.RefsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\x1aK\n\tRefsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12-\n\x05value\x18\x02 \x01(\x0b\x32\x1e.space.proto.SnapshotReference:\x02\x38\x01\"@\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\x12\x15\n\x11MATERIALIZED_VIEW\x10\x02\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xa0\x02\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\t\x12\x1f\n\x12parent_snapshot_id\x18\x06 \x01(\x03H\x01\x88\x01\x01\x42\x0b\n\tdata_infoB\x15\n\x13_parent_snapshot_id\"\xb8\x01\n\x11SnapshotReference\x12\x16\n\x0ereference_name\x18\x01 \x01(\t\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x03\x12:\n\x04type\x18\x03 \x01(\x0e\x32,.space.proto.SnapshotReference.ReferenceType\":\n\rReferenceType\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x07\n\x03TAG\x10\x01\x12\n\n\x06\x42RANCH\x10\x02\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"a\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x12\x10\n\x08num_rows\x18\x04 \x01(\x03\x42\x08\n\x06\x62itmap\"\x93\x01\n\x0bLogicalPlan\x12%\n\x0clogical_plan\x18\x01 \x01(\x0b\x32\x0f.substrait.Plan\x12\x30\n\x04udfs\x18\x02 \x03(\x0b\x32\".space.proto.LogicalPlan.UdfsEntry\x1a+\n\tUdfsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1d\n\x08\x46ileType\x12\x11\n\tdirectory\x18\x01 \x01(\tb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.metadata_pb2', globals()) @@ -54,11 +54,11 @@ _CHANGELOG._serialized_start=1574 _CHANGELOG._serialized_end=1675 _ROWBITMAP._serialized_start=1677 - _ROWBITMAP._serialized_end=1756 - _LOGICALPLAN._serialized_start=1759 - _LOGICALPLAN._serialized_end=1906 - _LOGICALPLAN_UDFSENTRY._serialized_start=1863 - _LOGICALPLAN_UDFSENTRY._serialized_end=1906 - _FILETYPE._serialized_start=1908 - _FILETYPE._serialized_end=1937 + _ROWBITMAP._serialized_end=1774 + _LOGICALPLAN._serialized_start=1777 + _LOGICALPLAN._serialized_end=1924 + _LOGICALPLAN_UDFSENTRY._serialized_start=1881 + _LOGICALPLAN_UDFSENTRY._serialized_end=1924 + _FILETYPE._serialized_start=1926 + _FILETYPE._serialized_end=1955 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index 37c9a07..0581e6c 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -14,6 +14,8 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +Proto messages used by Space metadata persistence. """ import builtins import collections.abc @@ -389,7 +391,7 @@ global___ChangeLog = ChangeLog @typing_extensions.final class RowBitmap(google.protobuf.message.Message): """Mark rows in a file by bitmap. - NEXT_ID: 4 + NEXT_ID: 5 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -397,21 +399,25 @@ class RowBitmap(google.protobuf.message.Message): FILE_FIELD_NUMBER: builtins.int ALL_ROWS_FIELD_NUMBER: builtins.int ROARING_BITMAP_FIELD_NUMBER: builtins.int + NUM_ROWS_FIELD_NUMBER: builtins.int file: builtins.str """File path that the bit map applies to.""" all_rows: builtins.bool """All rows are selected. Bitmap is empty in this case.""" roaring_bitmap: builtins.bytes """Roaring bitmap.""" + num_rows: builtins.int + """Total number of rows in the file.""" def __init__( self, *, file: builtins.str = ..., all_rows: builtins.bool = ..., roaring_bitmap: builtins.bytes = ..., + num_rows: builtins.int = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["bitmap", b"bitmap", "roaring_bitmap", b"roaring_bitmap"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["all_rows", b"all_rows", "bitmap", b"bitmap", "file", b"file", "roaring_bitmap", b"roaring_bitmap"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["all_rows", b"all_rows", "bitmap", b"bitmap", "file", b"file", "num_rows", b"num_rows", "roaring_bitmap", b"roaring_bitmap"]) -> None: ... def WhichOneof(self, oneof_group: typing_extensions.Literal["bitmap", b"bitmap"]) -> typing_extensions.Literal["roaring_bitmap"] | None: ... global___RowBitmap = RowBitmap diff --git a/python/src/space/core/proto/runtime.proto b/python/src/space/core/proto/runtime.proto index 6920371..543b5f1 100644 --- a/python/src/space/core/proto/runtime.proto +++ b/python/src/space/core/proto/runtime.proto @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +// // Proto messages used by Space runtime. // // Different from metadata.proto, protos here are not persisted in metadata @@ -29,7 +29,7 @@ import "space/core/proto/metadata.proto"; package space.proto; // Information of a data file. -// NEXT_ID: 5 +// NEXT_ID: 6 message DataFile { // Data file path. string path = 1; @@ -47,9 +47,13 @@ message DataFile { int64 end = 2; } - // A range of selected rows in the data file. + // Optional, a range of selected rows in the data file. // Used for partially reading an index file and its records. - Range selected_rows = 4; + Range row_slice = 4; + + // Optional, bitmap masking rows to read; can be used together with + // `row_slice`. `path` in RowBitmap is not used. + RowBitmap row_bitmap = 5; } // A set of associated data and manifest files. diff --git a/python/src/space/core/proto/runtime_pb2.py b/python/src/space/core/proto/runtime_pb2.py index f5a26fe..992676d 100644 --- a/python/src/space/core/proto/runtime_pb2.py +++ b/python/src/space/core/proto/runtime_pb2.py @@ -14,7 +14,7 @@ from space.core.proto import metadata_pb2 as space_dot_core_dot_proto_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"\xc7\x01\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\x12\x32\n\rselected_rows\x18\x04 \x01(\x0b\x32\x1b.space.proto.DataFile.Range\x1a#\n\x05Range\x12\r\n\x05start\x18\x01 \x01(\x03\x12\x0b\n\x03\x65nd\x18\x02 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd2\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12*\n\nchange_log\x18\x04 \x01(\x0b\x32\x16.space.proto.ChangeLogb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"\xef\x01\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\x12.\n\trow_slice\x18\x04 \x01(\x0b\x32\x1b.space.proto.DataFile.Range\x12*\n\nrow_bitmap\x18\x05 \x01(\x0b\x32\x16.space.proto.RowBitmap\x1a#\n\x05Range\x12\r\n\x05start\x18\x01 \x01(\x03\x12\x0b\n\x03\x65nd\x18\x02 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd2\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12*\n\nchange_log\x18\x04 \x01(\x0b\x32\x16.space.proto.ChangeLogb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.runtime_pb2', globals()) @@ -24,13 +24,13 @@ _FILESET_INDEXMANIFESTFILESENTRY._options = None _FILESET_INDEXMANIFESTFILESENTRY._serialized_options = b'8\001' _DATAFILE._serialized_start=81 - _DATAFILE._serialized_end=280 - _DATAFILE_RANGE._serialized_start=245 - _DATAFILE_RANGE._serialized_end=280 - _FILESET._serialized_start=283 - _FILESET._serialized_end=471 - _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=414 - _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=471 - _PATCH._serialized_start=474 - _PATCH._serialized_end=684 + _DATAFILE._serialized_end=320 + _DATAFILE_RANGE._serialized_start=285 + _DATAFILE_RANGE._serialized_end=320 + _FILESET._serialized_start=323 + _FILESET._serialized_end=511 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=454 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=511 + _PATCH._serialized_start=514 + _PATCH._serialized_end=724 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/runtime_pb2.pyi b/python/src/space/core/proto/runtime_pb2.pyi index c94f2a7..be87ffb 100644 --- a/python/src/space/core/proto/runtime_pb2.pyi +++ b/python/src/space/core/proto/runtime_pb2.pyi @@ -14,6 +14,16 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +Proto messages used by Space runtime. + +Different from metadata.proto, protos here are not persisted in metadata +files. We use proto instead of Python classes for the capabilities of +serialization to bytes for cross machines/languages messaging. For example, +`FileSet` is sent to worker machine for processing, and `Patch` is sent back +for the coordinator machine to commit to storage. Pickling Python classses +may work but it may have more restrictions, especially when crossing +languages. """ import builtins import collections.abc @@ -33,7 +43,7 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor @typing_extensions.final class DataFile(google.protobuf.message.Message): """Information of a data file. - NEXT_ID: 5 + NEXT_ID: 6 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -59,7 +69,8 @@ class DataFile(google.protobuf.message.Message): PATH_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_FIELD_NUMBER: builtins.int MANIFEST_FILE_ID_FIELD_NUMBER: builtins.int - SELECTED_ROWS_FIELD_NUMBER: builtins.int + ROW_SLICE_FIELD_NUMBER: builtins.int + ROW_BITMAP_FIELD_NUMBER: builtins.int path: builtins.str """Data file path.""" @property @@ -68,20 +79,26 @@ class DataFile(google.protobuf.message.Message): manifest_file_id: builtins.int """Locally assigned manifest file IDs.""" @property - def selected_rows(self) -> global___DataFile.Range: - """A range of selected rows in the data file. + def row_slice(self) -> global___DataFile.Range: + """Optional, a range of selected rows in the data file. Used for partially reading an index file and its records. """ + @property + def row_bitmap(self) -> space.core.proto.metadata_pb2.RowBitmap: + """Optional, bitmap masking rows to read; can be used together with + `row_slice`. `path` in RowBitmap is not used. + """ def __init__( self, *, path: builtins.str = ..., storage_statistics: space.core.proto.metadata_pb2.StorageStatistics | None = ..., manifest_file_id: builtins.int = ..., - selected_rows: global___DataFile.Range | None = ..., + row_slice: global___DataFile.Range | None = ..., + row_bitmap: space.core.proto.metadata_pb2.RowBitmap | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["row_bitmap", b"row_bitmap", "row_slice", b"row_slice", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "row_bitmap", b"row_bitmap", "row_slice", b"row_slice", "storage_statistics", b"storage_statistics"]) -> None: ... global___DataFile = DataFile diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 9444eeb..14e4470 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -367,7 +367,7 @@ def versions(self) -> pa.Table: join_type="left outer").sort_by(sort_args) def ray_dataset(self, ray_options: RayOptions, - read_options: ReadOptions) -> ray.Dataset: + read_options: ReadOptions) -> ray.data.Dataset: """Return a Ray dataset for a Space storage.""" ds = ray.data.read_datasource(ray_data_sources.SpaceDataSource(), storage=self, diff --git a/python/src/space/core/transform/join.py b/python/src/space/core/transform/join.py index 71f2753..e0a5b23 100644 --- a/python/src/space/core/transform/join.py +++ b/python/src/space/core/transform/join.py @@ -105,11 +105,11 @@ def record_fields(self) -> List[str]: set(self.right.fields or self.right.view.schema.names)) return list(left_record_fields) + list(right_record_fields) - def process_source(self, data: pa.Table) -> ray.Dataset: + def process_source(self, data: ray.data.Dataset) -> ray.data.Dataset: raise NotImplementedError("Processing change data in join is not supported") def ray_dataset(self, ray_options: RayOptions, read_options: ReadOptions, - join_options: JoinOptions) -> ray.Dataset: + join_options: JoinOptions) -> ray.data.Dataset: # TODO: to use paralelism specified by ray_options. Today parallelism is # controlled by join_options.partition_fn. if read_options.fields is not None: diff --git a/python/src/space/core/transform/udfs.py b/python/src/space/core/transform/udfs.py index ce5acbc..0325fe9 100644 --- a/python/src/space/core/transform/udfs.py +++ b/python/src/space/core/transform/udfs.py @@ -99,12 +99,12 @@ def _arguments(self) -> List[FunctionArgument]: field_id_dict = arrow.field_name_to_id_dict(self.input_.schema) return [_fn_arg(field_id_dict[name]) for name in self.input_fields] - def process_source(self, data: pa.Table) -> ray.Dataset: + def process_source(self, data: ray.data.Dataset) -> ray.data.Dataset: return self._transform( self.input_.process_source(data).select_columns(self.input_fields)) def ray_dataset(self, ray_options: RayOptions, read_options: ReadOptions, - join_options: JoinOptions) -> ray.Dataset: + join_options: JoinOptions) -> ray.data.Dataset: if read_options.fields is not None: raise errors.UserInputError( "`fields` is not supported for views, use `input_fields` of " @@ -115,7 +115,7 @@ def ray_dataset(self, ray_options: RayOptions, read_options: ReadOptions, transform_utils.ray_dataset(self.input_, ray_options, read_options)) @abstractmethod - def _transform(self, ds: ray.Dataset) -> ray.Dataset: + def _transform(self, ds: ray.data.Dataset) -> ray.data.Dataset: """Transform a Ray dataset using the UDF.""" @@ -148,10 +148,11 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata, return MapTransform(*_load_udf(location, metadata, rel.project. expressions[0], rel.project.input, plan)) - def _transform(self, ds: ray.Dataset) -> ray.Dataset: + def _transform(self, ds: ray.data.Dataset) -> ray.data.Dataset: batch_size = ("default" if self.udf.batch_size is None else self.udf.batch_size) - return ds.map_batches(self.udf.fn, batch_size=batch_size) + return ds.map_batches(self.udf.fn, + batch_size=batch_size) # type: ignore[arg-type] @dataclass @@ -183,7 +184,7 @@ def from_relation(cls, location: str, metadata: meta.StorageMetadata, return FilterTransform(*_load_udf(location, metadata, rel.filter.condition, rel.filter.input, plan)) - def _transform(self, ds: ray.Dataset) -> ray.Dataset: + def _transform(self, ds: ray.data.Dataset) -> ray.data.Dataset: return ds.filter(self.udf.fn) diff --git a/python/src/space/core/transform/utils.py b/python/src/space/core/transform/utils.py index 3533f7a..34726fc 100644 --- a/python/src/space/core/transform/utils.py +++ b/python/src/space/core/transform/utils.py @@ -27,7 +27,7 @@ def ray_dataset(view: View, ray_options: RayOptions, - read_options: ReadOptions) -> ray.Dataset: + read_options: ReadOptions) -> ray.data.Dataset: """A wrapper for creating Ray dataset for datasets and views.""" empty_join_options = JoinOptions() @@ -38,5 +38,6 @@ def ray_dataset(view: View, ray_options: RayOptions, # For non-dataset views, fields can't be pushed down to storage. fields = read_options.fields read_options.fields = None - return view.ray_dataset(ray_options, read_options, - empty_join_options).select_columns(fields) + ds = view.ray_dataset(ray_options, read_options, empty_join_options) + + return ds if fields is None else ds.select_columns(fields) diff --git a/python/src/space/core/views.py b/python/src/space/core/views.py index 6a7acf4..e17fbd6 100644 --- a/python/src/space/core/views.py +++ b/python/src/space/core/views.py @@ -88,12 +88,12 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel: """ @abstractmethod - def process_source(self, data: pa.Table) -> ray.Dataset: + def process_source(self, data: ray.data.Dataset) -> ray.data.Dataset: """Process input data using the transform defined by the view.""" @abstractmethod def ray_dataset(self, ray_options: RayOptions, read_options: ReadOptions, - join_options: JoinOptions) -> ray.Dataset: + join_options: JoinOptions) -> ray.data.Dataset: """Return a Ray dataset for a Space view.""" def ray( diff --git a/python/src/space/ray/data_sources.py b/python/src/space/ray/data_sources.py index 9c65588..b7f4d10 100644 --- a/python/src/space/ray/data_sources.py +++ b/python/src/space/ray/data_sources.py @@ -20,6 +20,7 @@ from typing import (Any, Callable, Dict, Iterator, List, Optional, TYPE_CHECKING) +from absl import logging # type: ignore[import-untyped] from ray.data.block import Block, BlockMetadata from ray.data.datasource.datasource import Datasource, Reader, ReadTask from ray.data.datasource.datasource import WriteResult @@ -39,9 +40,13 @@ class SpaceDataSource(Datasource): # pylint: disable=arguments-differ,too-many-arguments def create_reader( # type: ignore[override] - self, storage: Storage, ray_options: RayOptions, - read_options: ReadOptions) -> Reader: - return _SpaceDataSourceReader(storage, ray_options, read_options) + self, + storage: Storage, + ray_options: RayOptions, + read_options: ReadOptions, + file_set: Optional[rt.FileSet] = None, + ) -> Reader: + return _SpaceDataSourceReader(storage, ray_options, read_options, file_set) def do_write(self, blocks: List[ObjectRef[Block]], metadata: List[BlockMetadata], @@ -58,10 +63,11 @@ def on_write_complete( # type: ignore[override] class _SpaceDataSourceReader(Reader): def __init__(self, storage: Storage, ray_options: RayOptions, - read_options: ReadOptions): + read_options: ReadOptions, file_set: Optional[rt.FileSet]): self._storage = storage self._ray_options = ray_options self._read_options = read_options + self._file_set = file_set def estimate_inmemory_data_size(self) -> Optional[int]: # TODO: to implement this method. @@ -75,22 +81,38 @@ def estimate_inmemory_data_size(self) -> Optional[int]: # TODO: to use parallelism when generating blocks. def get_read_tasks(self, parallelism: int) -> List[ReadTask]: read_tasks: List[ReadTask] = [] - file_set = self._storage.data_files(self._read_options.filter_, - self._read_options.snapshot_id) - for index_file in file_set.index_files: + if self._file_set is None: + self._file_set = self._storage.data_files(self._read_options.filter_, + self._read_options.snapshot_id) + + for index_file in self._file_set.index_files: num_rows = index_file.storage_statistics.num_rows + if num_rows == 0: + # For old Space datasets that num_rows in change logs is not populated. + logging.warning( + f"Statistics of index file {index_file.path} is unavailable, " + "index file slicing in Ray datasource is disabled") + read_tasks.append( + ReadTask(self._read_fn(index_file), _block_metadata(None))) + continue + if (self._ray_options.enable_row_range_block and self._read_options.batch_size): batch_size = self._read_options.batch_size num_blocks = math.ceil(num_rows / batch_size) for i in range(num_blocks): + # Ignore row bitmaps in slicing. The benefit of considering bitmaps + # should be small because we use bitmap to mark deleted rows in + # change logs. In most compute intense cases (e.g., refreshing MVs), + # we only need the primary keys of deleted rows and don't need to + # read records (it is a TODO). index_file_slice = rt.DataFile() index_file_slice.CopyFrom(index_file) - rows = index_file_slice.selected_rows + rows = index_file_slice.row_slice rows.start = i * batch_size rows.end = min((i + 1) * batch_size, num_rows) @@ -104,13 +126,12 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: return read_tasks def _read_fn(self, index_file: rt.DataFile) -> Callable[..., Iterator[Block]]: - return partial(FileSetReadOp, - self._storage.location, self._storage.metadata, - rt.FileSet(index_files=[index_file]), None, + return partial(FileSetReadOp, self._storage.location, + self._storage.metadata, rt.FileSet(index_files=[index_file]), self._read_options) # type: ignore[return-value] -def _block_metadata(num_rows: int) -> BlockMetadata: +def _block_metadata(num_rows: Optional[int]) -> BlockMetadata: """The metadata about the block that we know prior to actually executing the read task. """ diff --git a/python/src/space/ray/ops/change_data.py b/python/src/space/ray/ops/change_data.py new file mode 100644 index 0000000..5e2a841 --- /dev/null +++ b/python/src/space/ray/ops/change_data.py @@ -0,0 +1,70 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Change data feed that computes delta between two snapshots by Ray.""" + +from typing import Iterable, Iterator + +import ray + +from space.core.ops.change_data import (ChangeData, ChangeType, + LocalChangeDataReadOp, + ordered_snapshot_ids) +from space.core.options import ReadOptions +import space.core.proto.metadata_pb2 as meta +from space.core.storage import Storage +from space.ray import data_sources as ray_data_sources +from space.ray.options import RayOptions + + +def read_change_data(storage: Storage, start_snapshot_id: int, + end_snapshot_id: int, ray_options: RayOptions, + read_options: ReadOptions) -> Iterator[ChangeData]: + """Read change data from a start to an end snapshot. + + start_snapshot_id is excluded; end_snapshot_id is included. + """ + for snapshot_id in ordered_snapshot_ids(storage, start_snapshot_id, + end_snapshot_id): + for change in _RayChangeDataReadOp(storage, snapshot_id, ray_options, + read_options): + yield change + + +class _RayChangeDataReadOp(LocalChangeDataReadOp): + """Read changes of data from a given snapshot of a dataset.""" + + def __init__(self, storage: Storage, snapshot_id: int, + ray_options: RayOptions, read_options: ReadOptions): + LocalChangeDataReadOp.__init__(self, storage, snapshot_id, read_options) + self._ray_options = ray_options + + def __iter__(self) -> Iterator[ChangeData]: + # Must return deletion first, otherwise when the upstream re-apply + # deletions and additions, it may delete newly added data. + # TODO: to enforce this check upstream, or merge deletion+addition as a + # update. + yield ChangeData(self._snapshot_id, ChangeType.DELETE, + self._ray_dataset(self._change_log.deleted_rows)) + yield ChangeData(self._snapshot_id, ChangeType.ADD, + self._ray_dataset(self._change_log.added_rows)) + + def _ray_dataset(self, bitmaps: Iterable[meta.RowBitmap]) -> ray.data.Dataset: + return ray.data.read_datasource( + ray_data_sources.SpaceDataSource(), + storage=self._storage, + ray_options=self._ray_options, + read_options=self._read_options, + file_set=self._bitmaps_to_file_set(bitmaps), + parallelism=self._ray_options.max_parallelism) diff --git a/python/src/space/ray/ops/join.py b/python/src/space/ray/ops/join.py index d54849e..afc248d 100644 --- a/python/src/space/ray/ops/join.py +++ b/python/src/space/ray/ops/join.py @@ -64,7 +64,7 @@ def __init__(self, left: JoinInput, right: JoinInput, join_keys: List[str], self._left_record_fields = _selected_record_fields(left) self._right_record_fields = _selected_record_fields(right) - def ray_dataset(self) -> ray.Dataset: + def ray_dataset(self) -> ray.data.Dataset: """Return join result as a Ray dataset.""" left_range = _join_key_range(self._left, self._join_key) right_range = _join_key_range(self._right, self._join_key) @@ -103,7 +103,7 @@ def ray_dataset(self) -> ray.Dataset: @dataclass class _JoinInputInternal: - ds: ray.Dataset + ds: ray.data.Dataset reference_read: bool record_fields: List[str] @@ -137,7 +137,7 @@ def _join(left: _JoinInputInternal, right: _JoinInputInternal, join_key: str, return result.select(output_fields) -def _read_all(ds: ray.Dataset) -> Optional[pa.Table]: +def _read_all(ds: ray.data.Dataset) -> Optional[pa.Table]: results = [] for ref in ds.to_arrow_refs(): data = ray.get(ref) diff --git a/python/src/space/ray/runners.py b/python/src/space/ray/runners.py index e2cb483..2650e20 100644 --- a/python/src/space/ray/runners.py +++ b/python/src/space/ray/runners.py @@ -31,13 +31,14 @@ from space.core.ops.utils import FileOptions from space.core.ops.append import LocalAppendOp from space.core.ops.base import InputData, InputIteratorFn -from space.core.ops.change_data import ChangeData, ChangeType, read_change_data +from space.core.ops.change_data import ChangeData, ChangeType from space.core.ops.delete import FileSetDeleteOp from space.core.ops.insert import InsertOptions from space.core.options import JoinOptions, ReadOptions import space.core.proto.runtime_pb2 as rt from space.core.utils import errors from space.ray.ops.append import RayAppendOp +from space.ray.ops.change_data import read_change_data from space.ray.ops.delete import RayDeleteOp from space.ray.ops.insert import RayInsertOp from space.ray.ops.utils import singleton_storage @@ -95,16 +96,17 @@ def diff(self, self._source_storage, self._source_storage.version_to_snapshot_id(start_version), self._source_storage.version_to_snapshot_id(end_version), - ReadOptions(batch_size=batch_size)) + self._ray_options, ReadOptions(batch_size=batch_size)) for change in source_changes: # TODO: skip processing the data for deletions; the caller is usually # only interested at deleted primary keys. # TODO: to split change data into chunks for parallel processing. processed_remote_data = self._view.process_source(change.data) - processed_data = ray.get(processed_remote_data.to_arrow_refs()) - yield ChangeData(change.snapshot_id, change.type_, - pa.concat_tables(processed_data)) + for ref in processed_remote_data.to_arrow_refs(): + data = ray.get(ref) + if data.num_rows > 0: + yield ChangeData(change.snapshot_id, change.type_, data) @property def _source_storage(self) -> Storage: @@ -191,6 +193,8 @@ def refresh(self, txn = self._start_txn() try: + # TODO: to avoid creating a new delete/append op for each batch. The + # output file size will be smaller than configured. if change.type_ == ChangeType.DELETE: patches.append(self._process_delete(change.data)) elif change.type_ == ChangeType.ADD: @@ -220,6 +224,7 @@ def _process_delete(self, data: pa.Table) -> Optional[rt.Patch]: return op.delete() def _process_append(self, data: pa.Table) -> Optional[rt.Patch]: + # TODO: to use RayAppendOp. op = LocalAppendOp(self._storage.location, self._storage.metadata, self._file_options) op.write(data) diff --git a/python/tests/catalogs/test_directory.py b/python/tests/catalogs/test_directory.py index 5da63ef..9b6fd2f 100644 --- a/python/tests/catalogs/test_directory.py +++ b/python/tests/catalogs/test_directory.py @@ -19,7 +19,7 @@ import pyarrow as pa import pytest -from space import DatasetInfo, DirCatalog +from space import DatasetInfo, DirCatalog, RayOptions from space.core.utils import errors @@ -88,7 +88,7 @@ def test_materialized_view_crud(self, tmp_path): mv1 = cat.materialize("mv1", view) ds.local().append({"f": [1, 2, 3], "float64": [0.1, 0.2, 0.3]}) - mv1.ray().refresh() + mv1.ray(RayOptions(max_parallelism=1)).refresh() expected_data = {"f": [1, 2, 3], "float64": [1.1, 1.2, 1.3]} assert mv1.local().read_all().to_pydict() == expected_data diff --git a/python/tests/ray/test_runners.py b/python/tests/ray/test_runners.py index 8e5cb12..0dd4dbc 100644 --- a/python/tests/ray/test_runners.py +++ b/python/tests/ray/test_runners.py @@ -178,6 +178,7 @@ def test_read_batch_size(self, tmp_path, sample_schema, def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): ds = sample_dataset + ray_options = RayOptions(max_parallelism=1) view_schema = pa.schema( [pa.field("int64", pa.int64()), pa.field("float64", pa.float64())]) @@ -188,7 +189,7 @@ def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): mv1 = view.materialize(str(tmp_path / "mv1")) ds_runner = ds.local() - view_runner = view.ray() + view_runner = view.ray(ray_options) # Test append. ds_runner.append({ @@ -225,7 +226,7 @@ def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): assert list(view_runner.diff(0, 2)) == [expected_change0, expected_change1] # Test materialized views. - ray_runner = mv.ray() + ray_runner = mv.ray(ray_options) local_runner = mv.local() assert len(ray_runner.refresh("tag1", batch_size=refresh_batch_size)) == 1 @@ -273,7 +274,7 @@ def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): ] # Test refresh multiple snapshots. - ray_runner = mv1.ray() + ray_runner = mv1.ray(ray_options) assert len(ray_runner.refresh(batch_size=refresh_batch_size)) == 3 assert ray_runner.read_all() == pa.Table.from_pydict({ "int64": [1, 3, 4], @@ -281,6 +282,7 @@ def test_diff_map_batches(self, tmp_path, sample_dataset, refresh_batch_size): }) def test_diff_batch_size(self, tmp_path, sample_dataset): + ray_options = RayOptions(max_parallelism=1) ds = sample_dataset view_schema = pa.schema( @@ -296,7 +298,7 @@ def test_diff_batch_size(self, tmp_path, sample_dataset): "binary": [b"b1", b"b2", b"b3"] }) - assert list(view.ray().diff(0, 1, batch_size=2)) == [ + assert list(view.ray(ray_options).diff(0, 1, batch_size=2)) == [ ChangeData( ds.storage.metadata.current_snapshot_id, ChangeType.ADD, pa.Table.from_pydict({ @@ -311,7 +313,7 @@ def test_diff_batch_size(self, tmp_path, sample_dataset): ] mv = view.materialize(str(tmp_path / "mv")) - ray_runner = mv.ray(RayOptions(max_parallelism=1)) + ray_runner = mv.ray(ray_options) ray_runner.refresh(batch_size=2) assert list(ray_runner.read()) == [ pa.Table.from_pydict({ @@ -333,7 +335,7 @@ def _sample_filter_udf(row: Dict[str, Any]) -> Dict[str, Any]: input_fields=["int64", "float64"]) ds_runner = sample_dataset.local() - view_runner = view.ray() + view_runner = view.ray(RayOptions(max_parallelism=1)) # Test append. ds_runner.append({