-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path_models.py
95 lines (77 loc) · 3.26 KB
/
_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""
Models used for inter-process communication in data processing applications.
"""
# Copyright (c) 2023-2024. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
from typing import Optional, List, Iterable, Type

from dataclasses_json import DataClassJsonMixin

from adapta.storage.models import parse_data_path
from adapta.storage.models.astra import AstraPath
from adapta.storage.models.base import DataPath
from adapta.storage.models.azure import AdlsGen2Path, WasbPath
from adapta.storage.models.local import LocalPath
@dataclass(frozen=True)
class DataSocket(DataClassJsonMixin):
    """
    Defines an input or an output of a data processing application.

    A socket pairs a logical name (``alias``) with the location (``data_path``)
    and the format (``data_format``) of the data an application reads or writes.
    Instances are immutable (``frozen=True``).
    """

    # name of a Socket
    alias: str
    # path to data (read-in, write-out)
    data_path: str
    # format of the data (read-in, write-out)
    data_format: str
    # optional partitions that exist in the data (read-in, write-out)
    data_partitions: Optional[List[str]] = None

    def __post_init__(self):
        """
        Validates that the required fields are non-empty.

        :raises AssertionError: If any of alias, data_path or data_format is empty/falsy.
        """
        # An explicit raise (instead of an `assert` statement) keeps this validation
        # active under `python -O`; AssertionError is retained so existing callers
        # that catch it keep working.
        if not (self.alias and self.data_path and self.data_format):
            raise AssertionError(
                "Fields alias, data_path and data_format must have a value provided to instantiate a DataSocket."
            )

    def parse_data_path(
        self, candidates: Iterable[Type[DataPath]] = (AdlsGen2Path, LocalPath, WasbPath, AstraPath)
    ) -> Optional[DataPath]:
        """
        Attempts to convert this socket's data path to one of the known DataPath types.

        :param candidates: Conversion candidate classes for `DataPath`. Default to all currently supported `DataPath` implementations.
          If a user has their own `DataPath` implementations, those can be supplied instead for convenience.
        :return: The result of the module-level ``parse_data_path`` for ``self.data_path``: a `DataPath`
          instance, or presumably None when no candidate matches (confirm against the helper's contract).
        """
        # Class attributes are not in scope inside a method body, so this resolves to the
        # module-level `parse_data_path` function imported at the top of the file, not to this method.
        return parse_data_path(self.data_path, candidates=candidates)

    def serialize(self) -> str:
        """
        Serializes to a |-delimited string of the form ``alias|data_path|data_format``.

        Note: ``data_partitions`` is not included, so it does not survive a
        serialize/deserialize round trip. Field values containing ``|`` do not round-trip either.
        """
        return f"{self.alias}|{self.data_path}|{self.data_format}"

    @classmethod
    def deserialize(cls, string_socket: str) -> "DataSocket":
        """
        Deserializes from a |-delimited string, as produced by ``serialize``.

        Only the first three fields (alias, data_path, data_format) are used; any further
        fields are ignored. A string with fewer than three fields raises IndexError.
        """
        vals = string_socket.split("|")
        return cls(alias=vals[0], data_path=vals[1], data_format=vals[2])

    @staticmethod
    def find(sockets: Iterable["DataSocket"], alias: str) -> "DataSocket":
        """Fetches a data socket from a collection of sockets.

        :param sockets: Sockets to search; any iterable is accepted and is consumed once.
        :param alias: Alias to look up
        :returns: The single socket whose ``alias`` equals ``alias``
        :raises ValueError: If no socket, or more than one socket, has that alias.
        """
        # Scan everything (no early exit) so duplicate aliases are detected.
        matches = [s for s in sockets if s.alias == alias]
        if len(matches) > 1:
            raise ValueError(f"Multiple data sockets exist with alias {alias}")
        if not matches:
            raise ValueError(f"No data sockets exist with alias {alias}")
        return matches[0]