-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathinput_reader.py
109 lines (93 loc) · 3.19 KB
/
input_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""
Input reader.
"""
# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
from abc import abstractmethod
from functools import partial
from typing import Optional
from adapta.metrics import MetricsProvider
from adapta.process_communication import DataSocket
from adapta.storage.query_enabled_store import QueryEnabledStore
from adapta.utils.decorators._logging import run_time_metrics_async
from pandas import DataFrame as PandasDataFrame
from esd_services_api_client.nexus.abstractions.nexus_object import (
NexusObject,
TPayload,
)
from esd_services_api_client.nexus.abstractions.logger_factory import LoggerFactory
class InputReader(NexusObject[TPayload]):
"""
Base class for a raw data reader.
"""
def __init__(
self,
socket: DataSocket,
store: QueryEnabledStore,
metrics_provider: MetricsProvider,
logger_factory: LoggerFactory,
payload: TPayload,
*readers: "InputReader"
):
super().__init__(metrics_provider, logger_factory)
self.socket = socket
self._store = store
self._data: Optional[PandasDataFrame] = None
self._readers = readers
self._payload = payload
@property
def data(self) -> Optional[PandasDataFrame]:
"""
Data read by this reader.
"""
return self._data
@abstractmethod
async def _read_input(self) -> PandasDataFrame:
"""
Actual data reader logic. Implementing this method is mandatory for the reader to work
"""
@property
def _metric_name(self) -> str:
return re.sub(
r"(?<!^)(?=[A-Z])",
"_",
self.__class__.__name__.lower().replace("reader", ""),
)
@property
def _metric_tags(self) -> dict[str, str]:
return {"entity": self._metric_name}
async def read(self) -> PandasDataFrame:
"""
Coroutine that reads the data from external store and converts it to a dataframe.
"""
@run_time_metrics_async(
metric_name="read_input",
on_finish_message_template="Finished reading {entity} from path {data_path} in {elapsed:.2f}s seconds",
template_args={
"entity": self._metric_name.upper(),
"data_path": self.socket.data_path,
},
)
async def _read(**_) -> PandasDataFrame:
if not self._data:
self._data = await self._read_input()
return self._data
return await partial(
_read,
metric_tags=self._metric_tags,
metrics_provider=self._metrics_provider,
logger=self._logger,
)()