Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TFDS conversion class to loaders. #14

Merged
merged 1 commit into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""TFDS to Space dataset conversion."""
"""Loads ArrayRecord files into Space datasets."""

import os
from typing import Any, Callable, Dict, List, Optional, Tuple
Expand All @@ -27,20 +27,20 @@
from space.core.ops.append import LocalAppendOp
from space.core.schema import arrow
from space.core.serializers import DictSerializer
from space.core.utils.paths import StoragePaths
from space.core.utils.paths import StoragePathsMixin

TfdsIndexFn: TypeAlias = Callable[[Dict[str, Any]], Dict[str, Any]]
ArrayRecordIndexFn: TypeAlias = Callable[[Dict[str, Any]], Dict[str, Any]]


class LocalConvertTfdsOp(StoragePaths):
"""Convert a TFDS dataset to a Space dataset without copying data."""
class LocalLoadArrayRecordOp(StoragePathsMixin):
"""Load ArrayRecord files into Space without copying data."""

def __init__(self, location: str, metadata: meta.StorageMetadata,
tfds_path: str, index_fn: TfdsIndexFn):
StoragePaths.__init__(self, location)
array_record_dir: str, index_fn: ArrayRecordIndexFn):
StoragePathsMixin.__init__(self, location)

self._metadata = metadata
self._tfds_path = tfds_path
self._array_record_dir = array_record_dir
self._index_fn = index_fn

record_fields = set(self._metadata.schema.record_fields)
Expand All @@ -58,17 +58,17 @@ def __init__(self, location: str, metadata: meta.StorageMetadata,
self._record_field = self._record_fields[0]

self._serializer = DictSerializer(logical_schema)
self._tfds_files = _list_tfds_files(tfds_path)
self._array_record_files = _list_files(array_record_dir)

def write(self) -> Optional[runtime.Patch]:
"""Write files to append a TFDS dataset to Space."""
# TODO: to convert files in parallel.
"""Write index files to load ArrayRecord files to Space dataset."""
# TODO: to load files in parallel.
append_op = LocalAppendOp(self._location,
self._metadata,
record_address_input=True)

total_record_bytes = 0
for f in self._tfds_files:
for f in self._array_record_files:
index_data, record_bytes = self._build_index_for_array_record(f)
total_record_bytes += record_bytes
append_op.write(index_data)
Expand Down Expand Up @@ -101,10 +101,10 @@ def _build_index_for_array_record(self,
return index_data, record_uncompressed_bytes


def _list_tfds_files(tfds_path: str) -> List[str]:
def _list_files(array_record_dir: str) -> List[str]:
files: List[str] = []
for f in os.listdir(tfds_path):
full_path = os.path.join(tfds_path, f)
for f in os.listdir(array_record_dir):
full_path = os.path.join(array_record_dir, f)
if os.path.isfile(full_path) and '.array_record' in f:
files.append(full_path)

Expand Down
6 changes: 3 additions & 3 deletions python/src/space/core/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from space.core.schema import utils as schema_utils
from space.core.utils import paths
from space.core.utils.lazy_imports_utils import array_record_module as ar
from space.core.utils.paths import StoragePaths
from space.core.utils.paths import StoragePathsMixin

# TODO: to obtain the values from user provided options.
# Thresholds for writing Parquet files. The sizes are uncompressed bytes.
Expand Down Expand Up @@ -71,7 +71,7 @@ class _RecordWriterInfo:
default_factory=meta.StorageStatistics)


class LocalAppendOp(BaseAppendOp, StoragePaths):
class LocalAppendOp(BaseAppendOp, StoragePathsMixin):
"""Append operation running locally.

It can be used as components of more complex operations and distributed
Expand All @@ -88,7 +88,7 @@ def __init__(self,
Args:
record_address_input: if true, input record fields are addresses.
"""
StoragePaths.__init__(self, location)
StoragePathsMixin.__init__(self, location)

self._metadata = metadata
record_fields = set(self._metadata.schema.record_fields)
Expand Down
6 changes: 3 additions & 3 deletions python/src/space/core/ops/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from space.core.ops.base import BaseOp
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.utils.paths import StoragePaths
from space.core.utils.paths import StoragePathsMixin
from space.core.schema import constants


Expand All @@ -46,7 +46,7 @@ def delete(self) -> Optional[runtime.Patch]:
"""


class FileSetDeleteOp(BaseDeleteOp, StoragePaths):
class FileSetDeleteOp(BaseDeleteOp, StoragePathsMixin):
"""Delete operation of a given file set running locally.

It can be used as components of more complex operations and distributed
Expand All @@ -57,7 +57,7 @@ class FileSetDeleteOp(BaseDeleteOp, StoragePaths):

def __init__(self, location: str, metadata: meta.StorageMetadata,
file_set: runtime.FileSet, filter_: pc.Expression):
StoragePaths.__init__(self, location)
StoragePathsMixin.__init__(self, location)

if not _validate_files(file_set):
raise RuntimeError(f"Invalid input file set for delete op:\n{file_set}")
Expand Down
6 changes: 3 additions & 3 deletions python/src/space/core/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from space.core.ops.base import BaseOp, InputData
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.core.utils.paths import StoragePaths
from space.core.utils.paths import StoragePathsMixin


@dataclass
Expand All @@ -53,11 +53,11 @@ def write(self, data: InputData) -> Optional[runtime.Patch]:
"""Insert data into storage."""


class LocalInsertOp(BaseInsertOp, StoragePaths):
class LocalInsertOp(BaseInsertOp, StoragePathsMixin):
'''Append data to a dataset.'''

def __init__(self, location: str, storage: Storage, options: InsertOptions):
StoragePaths.__init__(self, location)
StoragePathsMixin.__init__(self, location)

self._storage = storage
self._metadata = self._storage.metadata
Expand Down
6 changes: 3 additions & 3 deletions python/src/space/core/ops/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from space.core.schema import arrow
from space.core.schema.constants import FILE_PATH_FIELD, ROW_ID_FIELD
from space.core.schema import utils as schema_utils
from space.core.utils.paths import StoragePaths
from space.core.utils.paths import StoragePathsMixin

_RECORD_KEY_FIELD = "__RECORD_KEY"

Expand All @@ -56,7 +56,7 @@ def __iter__(self) -> Iterator[pa.Table]:
"""Iterator of read data."""


class FileSetReadOp(BaseReadOp, StoragePaths):
class FileSetReadOp(BaseReadOp, StoragePathsMixin):
"""Read operation of a given file set running locally.

It can be used as components of more complex operations and distributed
Expand All @@ -70,7 +70,7 @@ def __init__(self,
metadata: meta.StorageMetadata,
file_set: runtime.FileSet,
options: Optional[ReadOptions] = None):
StoragePaths.__init__(self, location)
StoragePathsMixin.__init__(self, location)

# TODO: to validate that filter_ does not contain record files.

Expand Down
22 changes: 12 additions & 10 deletions python/src/space/core/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from space.core.ops.base import InputData
import space.core.proto.runtime_pb2 as runtime
from space.core.storage import Storage
from space.tf.conversion import LocalConvertTfdsOp, TfdsIndexFn
from space.core.loaders.array_record import ArrayRecordIndexFn
from space.core.loaders.array_record import LocalLoadArrayRecordOp


class BaseRunner(ABC):
Expand Down Expand Up @@ -64,13 +65,14 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:
"""Append data into the dataset from an iterator source."""

@abstractmethod
def append_tfds(self, tfds_path: str,
index_fn: TfdsIndexFn) -> runtime.JobResult:
"""Append data from a Tensorflow Dataset without copying data.
def append_array_record(self, array_record_dir: str,
index_fn: ArrayRecordIndexFn) -> runtime.JobResult:
"""Append data from ArrayRecord files without copying data.

TODO: to support a pattern of files to expand.

Args:
tfds_path: the folder of TFDS dataset files, should contain ArrowRecord
files.
array_record_dir: the folder of ArrayRecord files.
    index_fn: a function that builds index fields from each ArrayRecord record.
"""

Expand Down Expand Up @@ -134,10 +136,10 @@ def append_from(self, source: Iterator[InputData]) -> runtime.JobResult:

return self._try_commit(op.finish())

def append_tfds(self, tfds_path: str,
index_fn: TfdsIndexFn) -> runtime.JobResult:
op = LocalConvertTfdsOp(self._storage.location, self._storage.metadata,
tfds_path, index_fn)
def append_array_record(self, array_record_dir: str,
index_fn: ArrayRecordIndexFn) -> runtime.JobResult:
op = LocalLoadArrayRecordOp(self._storage.location, self._storage.metadata,
array_record_dir, index_fn)
return self._try_commit(op.write())

def _insert(self, data: InputData,
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
_INIT_SNAPSHOT_ID = 0


class Storage(paths.StoragePaths):
class Storage(paths.StoragePathsMixin):
"""Storage manages data files by metadata using the Space format."""

def __init__(self, location: str, metadata: meta.StorageMetadata):
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/utils/paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def new_metadata_path(metadata_dir_: str) -> str:
return path.join(metadata_dir_, f"metadata_{uuid_()}.txtpb")


class StoragePaths:
class StoragePathsMixin:
"""Provides util methods for file and directory paths."""

def __init__(self, location: str):
Expand Down
4 changes: 2 additions & 2 deletions python/tests/core/utils/test_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ def test_new_metadata_path(mock_uuid): # pylint: disable=unused-argument
"metadata") == "metadata/metadata_<uuid>.txtpb"


class TestStoragePaths:
class TestStoragePathsMixin:

_LOCATION = "location"

@pytest.fixture
def storage_paths(self):
return paths.StoragePaths(self._LOCATION)
return paths.StoragePathsMixin(self._LOCATION)

def test_data_dir(self, storage_paths):
assert storage_paths.data_dir == f"{self._LOCATION}/data"
Expand Down
4 changes: 2 additions & 2 deletions python/tests/tf/test_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from space.core.utils.uuids import uuid_


class TestLocalConvertTfdsOp:
class TestLocalLoadArrayRecordOp:

@pytest.fixture
def tf_features(self):
Expand Down Expand Up @@ -74,7 +74,7 @@ def index_fn(record):
}

runner = ds.local()
response = runner.append_tfds(tfds_path, index_fn)
response = runner.append_array_record(tfds_path, index_fn)
assert response.storage_statistics_update == meta.StorageStatistics(
num_rows=2,
index_compressed_bytes=104,
Expand Down