Skip to content

Commit

Permalink
Use Ray datasource as input of diff and MV refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
coufon committed Jan 27, 2024
1 parent de825aa commit 1a8ca4b
Show file tree
Hide file tree
Showing 22 changed files with 291 additions and 148 deletions.
8 changes: 4 additions & 4 deletions python/src/space/core/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from space.core.runners import LocalRunner
from space.core.storage import Storage
from space.core.transform.plans import LogicalPlanBuilder
from space.core.utils.lazy_imports_utils import ray, ray_runners
from space.core.utils.lazy_imports_utils import ray, ray_runners # pylint: disable=unused-import
from space.core.views import View
from space.ray.options import RayOptions

Expand Down Expand Up @@ -107,16 +107,16 @@ def to_relation(self, builder: LogicalPlanBuilder) -> Rel:
return Rel(read=ReadRel(named_table=ReadRel.NamedTable(names=[location]),
base_schema=self._storage.metadata.schema.fields))

def process_source(self, data: pa.Table) -> ray.Dataset:
def process_source(self, data: ray.data.Dataset) -> ray.data.Dataset:
# Dataset is the source, there is no transform, so simply return the data.
return ray.data.from_arrow(data)
return data

def ray_dataset(
self,
ray_options: RayOptions,
read_options: ReadOptions = ReadOptions(),
join_options: JoinOptions = JoinOptions()
) -> ray.Dataset:
) -> ray.data.Dataset:
"""Return a Ray dataset for a Space dataset."""
return self._storage.ray_dataset(ray_options, read_options)

Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def _finish_index_writer(self) -> None:
update=stats)

self._patch.change_log.added_rows.append(
meta.RowBitmap(file=file_path, all_rows=True))
meta.RowBitmap(file=file_path, all_rows=True, num_rows=stats.num_rows))

self._index_writer_info = None
self._cached_index_file_bytes = 0
Expand Down
99 changes: 57 additions & 42 deletions python/src/space/core/ops/change_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

from dataclasses import dataclass
from enum import Enum
from typing import Iterator, List
from typing import Any, Iterable, Iterator, List

import pyarrow as pa

from space.core.fs.base import BaseFileSystem
from space.core.fs.factory import create_fs
from space.core.ops.read import FileSetReadOp
from space.core.options import ReadOptions
Expand Down Expand Up @@ -51,14 +50,14 @@ class ChangeData:
# The change type.
type_: ChangeType

# The change data.
data: pa.Table
# The change data (pa.Table or ray.data.Dataset).
# NOTE: type annotation not used, because of Ray lazy import.
data: Any


def read_change_data(storage: Storage, start_snapshot_id: int,
end_snapshot_id: int,
read_options: ReadOptions) -> Iterator[ChangeData]:
"""Read change data from a start to an end snapshot.
def ordered_snapshot_ids(storage: Storage, start_snapshot_id: int,
end_snapshot_id: int) -> List[int]:
"""Return a list of ordered snapshot IDs between two snapshots.
start_snapshot_id is excluded; end_snapshot_id is included.
"""
Expand All @@ -81,13 +80,23 @@ def read_change_data(storage: Storage, start_snapshot_id: int,
f"Start snapshot {start_snapshot_id} is not the ancestor of end "
f"snapshot {end_snapshot_id}")

for snapshot_id in all_snapshot_ids[1:]:
for result in iter(
_LocalChangeDataReadOp(storage, snapshot_id, read_options)):
yield result
return all_snapshot_ids[1:]


def read_change_data(storage: Storage, start_snapshot_id: int,
end_snapshot_id: int,
read_options: ReadOptions) -> Iterator[ChangeData]:
"""Read change data from a start to an end snapshot.
start_snapshot_id is excluded; end_snapshot_id is included.
"""
for snapshot_id in ordered_snapshot_ids(storage, start_snapshot_id,
end_snapshot_id):
for change in LocalChangeDataReadOp(storage, snapshot_id, read_options):
yield change


class _LocalChangeDataReadOp(StoragePathsMixin):
class LocalChangeDataReadOp(StoragePathsMixin):
"""Read changes of data from a given snapshot of a dataset."""

def __init__(self, storage: Storage, snapshot_id: int,
Expand All @@ -103,39 +112,45 @@ def __init__(self, storage: Storage, snapshot_id: int,
raise errors.VersionNotFoundError(
f"Change data read can't find snapshot ID {snapshot_id}")

snapshot = self._metadata.snapshots[snapshot_id]

fs = create_fs(self._location)
change_log_file = self._storage.full_path(snapshot.change_log_file)
self._change_log = _read_change_log_proto(fs, change_log_file)
self._snapshot = self._metadata.snapshots[snapshot_id]
self._change_log = _read_change_log_proto(
self._storage.full_path(self._snapshot.change_log_file))

def __iter__(self) -> Iterator[ChangeData]:
# Must return deletion first, otherwise when the upstream re-apply
# deletions and additions, it may delete newly added data.
# TODO: to enforce this check upstream, or merge deletion+addition as a
# update.
for bitmap in self._change_log.deleted_rows:
for change in self._read_bitmap_rows(ChangeType.DELETE, bitmap):
yield change

for bitmap in self._change_log.added_rows:
for change in self._read_bitmap_rows(ChangeType.ADD, bitmap):
yield change

def _read_bitmap_rows(self, change_type: ChangeType,
bitmap: meta.RowBitmap) -> Iterator[ChangeData]:
file_set = rt.FileSet(index_files=[rt.DataFile(path=bitmap.file)])
read_op = FileSetReadOp(
self._storage.location,
self._metadata,
file_set,
row_bitmap=(None if bitmap.all_rows else bitmap.roaring_bitmap),
options=self._read_options)

for data in read_op:
yield ChangeData(self._snapshot_id, change_type, data)


def _read_change_log_proto(fs: BaseFileSystem,
file_path: str) -> meta.ChangeLog:
for data in self._read_op(self._change_log.deleted_rows):
yield ChangeData(self._snapshot_id, ChangeType.DELETE, data)

for data in self._read_op(self._change_log.added_rows):
yield ChangeData(self._snapshot_id, ChangeType.ADD, data)

def _read_op(self, bitmaps: Iterable[meta.RowBitmap]) -> Iterator[pa.Table]:
return iter(
FileSetReadOp(self._storage.location,
self._metadata,
self._bitmaps_to_file_set(bitmaps),
options=self._read_options))

@classmethod
def _bitmaps_to_file_set(cls,
bitmaps: Iterable[meta.RowBitmap]) -> rt.FileSet:
return rt.FileSet(
index_files=[_bitmap_to_index_file(bitmap) for bitmap in bitmaps])


def _bitmap_to_index_file(bitmap: meta.RowBitmap) -> rt.DataFile:
index_file = rt.DataFile(
path=bitmap.file,
storage_statistics=meta.StorageStatistics(num_rows=bitmap.num_rows))
if not bitmap.all_rows:
index_file.row_bitmap.CopyFrom(bitmap)

return index_file


def _read_change_log_proto(file_path: str) -> meta.ChangeLog:
fs = create_fs(file_path)
return fs.read_proto(file_path, meta.ChangeLog())
13 changes: 7 additions & 6 deletions python/src/space/core/ops/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ def delete(self) -> Optional[rt.Patch]:

# Compute storage statistics update.
survivor_stats = _read_index_statistics(survivor_index_manifests)
reinsert_stats = (reinsert_patch.storage_statistics_update if
reinsert_patch is not None else meta.StorageStatistics())
reinsert_stats = (reinsert_patch.storage_statistics_update if reinsert_patch
is not None else meta.StorageStatistics())

deleted_compressed_bytes = (reinsert_stats.index_compressed_bytes +
survivor_stats.index_compressed_bytes
) - stats_before_delete.index_compressed_bytes
) - stats_before_delete.index_compressed_bytes
deleted_uncompressed_bytes = (
reinsert_stats.index_uncompressed_bytes +
survivor_stats.index_uncompressed_bytes
Expand Down Expand Up @@ -205,16 +205,17 @@ def _validate_files(file_set: rt.FileSet) -> bool:
deletion.
"""
for f in file_set.index_files:
if (not f.path or f.storage_statistics.num_rows == 0
or f.manifest_file_id == 0):
if (not f.path or f.storage_statistics.num_rows == 0 or
f.manifest_file_id == 0):
return False

return len(file_set.index_manifest_files) > 0


def _build_bitmap(file: rt.DataFile, index_data: pa.Table,
all_deleted: bool) -> meta.RowBitmap:
row_bitmap = meta.RowBitmap(file=file.path)
row_bitmap = meta.RowBitmap(file=file.path,
num_rows=file.storage_statistics.num_rows)
if all_deleted:
row_bitmap.all_rows = True
else:
Expand Down
33 changes: 13 additions & 20 deletions python/src/space/core/ops/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from space.core.schema import arrow
from space.core.schema.constants import FILE_PATH_FIELD, ROW_ID_FIELD
from space.core.schema import utils as schema_utils
from space.core.utils import errors
from space.core.utils.paths import StoragePathsMixin

_RECORD_KEY_FIELD = "__RECORD_KEY"
Expand Down Expand Up @@ -59,15 +58,13 @@ def __init__(self,
location: str,
metadata: meta.StorageMetadata,
file_set: rt.FileSet,
row_bitmap: Optional[bytes] = None,
options: Optional[ReadOptions] = None):
StoragePathsMixin.__init__(self, location)

# TODO: to validate that filter_ does not contain record fields.

self._metadata = metadata
self._file_set = file_set
self._row_bitmap = row_bitmap

# TODO: to validate options, e.g., fields are valid.
self._options = options or ReadOptions()
Expand Down Expand Up @@ -95,13 +92,7 @@ def __init__(self,

def __iter__(self) -> Iterator[pa.Table]:
for file in self._file_set.index_files:
row_range_read = file.selected_rows.end > 0

# row_range_read is used by Ray SpaceDataSource. row_bitmap is used by Ray
# diff/refresh, which does not use Ray SpaceDataSource.
if row_range_read and self._row_bitmap is not None:
raise errors.SpaceRuntimeError(
"Row mask is not supported when row range read is enabled")
row_range_read = file.HasField("row_slice")

# TODO: always loading the whole table is inefficient, to only load the
# required row groups.
Expand All @@ -110,13 +101,13 @@ def __iter__(self) -> Iterator[pa.Table]:
columns=self._selected_fields,
filters=self._options.filter_) # type: ignore[arg-type]

if self._row_bitmap is not None:
index_data = index_data.filter(
mask=_bitmap_mask(self._row_bitmap, index_data.num_rows))

if row_range_read:
length = file.selected_rows.end - file.selected_rows.start
index_data = index_data.slice(file.selected_rows.start, length)
if file.HasField("row_bitmap") and not file.row_bitmap.all_rows:
index_data = index_data.filter(mask=_bitmap_mask(
file.row_bitmap.roaring_bitmap, index_data.num_rows,
file.row_slice if row_range_read else None))
elif row_range_read:
length = file.row_slice.end - file.row_slice.start
index_data = index_data.slice(file.row_slice.start, length)

if self._options.reference_read:
yield index_data
Expand All @@ -133,8 +124,8 @@ def __iter__(self) -> Iterator[pa.Table]:
(column_id,
arrow.binary_field(self._record_fields_dict[field_id])))

# The batch size enforcement is applied as row range.
if row_range_read:
# The batch size is already applied via row slice range.
yield self._read_index_and_record(index_data, index_column_ids,
record_columns)
else:
Expand Down Expand Up @@ -214,11 +205,13 @@ def read_record_column(paths: StoragePathsMixin,
return pa.array(sorted_values, pa.binary()) # type: ignore[return-value]


def _bitmap_mask(serialized_bitmap: bytes, num_rows: int) -> List[bool]:
def _bitmap_mask(serialized_bitmap: bytes, num_rows: int,
row_slice: Optional[rt.DataFile.Range]) -> List[bool]:
bitmap = BitMap.deserialize(serialized_bitmap)

mask = [False] * num_rows
for row_id in bitmap.to_array():
mask[row_id] = True
if row_slice is None or row_slice.start <= row_id < row_slice.end:
mask[row_id] = True

return mask
11 changes: 9 additions & 2 deletions python/src/space/core/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//
// Proto messages used by Space metadata persistence.

syntax = "proto3";
Expand Down Expand Up @@ -160,6 +160,9 @@ message StorageStatistics {
// Change log stores changes made by a snapshot.
// NEXT_ID: 3
message ChangeLog {
// TODO: to replace RowBitmap list by runtime.FileSet (not backward
// compatible).

// Rows deleted in this snapshot.
repeated RowBitmap deleted_rows = 1;

Expand All @@ -169,7 +172,8 @@ message ChangeLog {


// Mark rows in a file by bitmap.
// NEXT_ID: 4
// TODO: to replace it by runtime.DataFile (not backward compatible).
// NEXT_ID: 5
message RowBitmap {
// File path that the bit map applies to.
string file = 1;
Expand All @@ -181,6 +185,9 @@ message RowBitmap {
// Roaring bitmap.
bytes roaring_bitmap = 3;
}

// Total number of rows in the file.
int64 num_rows = 4;
}

// Store the logical plan of a transform.
Expand Down
Loading

0 comments on commit 1a8ca4b

Please sign in to comment.