Skip to content

Commit

Permalink
MLA Framework V0 (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
george-zubrienko authored Jan 11, 2024
1 parent f126131 commit 068e760
Show file tree
Hide file tree
Showing 23 changed files with 2,497 additions and 842 deletions.
14 changes: 7 additions & 7 deletions esd_services_api_client/crystal/_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,19 @@ def get_api_path() -> str:
"sasUri": result.sas_uri,
}

if debug and self._logger is not None:
if not debug:
run_response = self._http.post(url=get_api_path(), json=payload)
# raise if not successful
run_response.raise_for_status()
return

if self._logger is not None:
self._logger.debug(
"Submitting result to {submission_url}, payload {payload}",
submission_url=get_api_path(),
payload=json.dumps(payload),
)

else:
run_response = self._http.post(url=get_api_path(), json=payload)

# raise if not successful
run_response.raise_for_status()

@staticmethod
def read_input(
*,
Expand Down
124 changes: 124 additions & 0 deletions esd_services_api_client/nexus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
## Nexus
Set the following environment variables for Azure:
```
IS_LOCAL_RUN=1
NEXUS__ALGORITHM_OUTPUT_PATH=abfss://[email protected]/path/to/result
NEXUS__METRIC_PROVIDER_CONFIGURATION={"metric_namespace": "test"}
NEXUS__QES_CONNECTION_STRING=qes://engine\=DELTA\;plaintext_credentials\={"auth_client_class":"adapta.security.clients.AzureClient"}\;settings\={}
NEXUS__STORAGE_CLIENT_CLASS=adapta.storage.blob.azure_storage_client.AzureStorageClient
PROTEUS__USE_AZURE_CREDENTIAL=1
```

Example usage:

```python
import asyncio
from typing import Dict

import pandas
from adapta.metrics import MetricsProvider
from adapta.process_communication import DataSocket
from adapta.storage.query_enabled_store import QueryEnabledStore
from injector import inject

from esd_services_api_client.nexus.abstractions.logger_factory import LoggerFactory
from esd_services_api_client.nexus.core.app_core import Nexus
from esd_services_api_client.nexus.algorithms import MinimalisticAlgorithm
from esd_services_api_client.nexus.input import InputReader, InputProcessor
from pandas import DataFrame as PandasDataFrame


async def my_on_complete_func_1(**kwargs):
pass


async def my_on_complete_func_2(**kwargs):
pass


class XReader(InputReader):
async def _context_open(self):
pass

async def _context_close(self):
pass

@inject
def __init__(self, store: QueryEnabledStore, metrics_provider: MetricsProvider, logger_factory: LoggerFactory,
*readers: "InputReader"):
super().__init__(DataSocket(alias="x", data_path="testx", data_format='delta'), store, metrics_provider, logger_factory, *readers)

async def _read_input(self) -> PandasDataFrame:
return pandas.DataFrame([{'a': 1, 'b': 2}, {'a': 2, 'b': 3}])


class YReader(InputReader):
async def _context_open(self):
pass

async def _context_close(self):
pass

@inject
def __init__(self, store: QueryEnabledStore, metrics_provider: MetricsProvider, logger_factory: LoggerFactory,
*readers: "InputReader"):
super().__init__(DataSocket(alias="y", data_path="testy", data_format='delta'), store, metrics_provider, logger_factory, *readers)

async def _read_input(self) -> PandasDataFrame:
return pandas.DataFrame([{'a': 10, 'b': 12}, {'a': 11, 'b': 13}])


class MyInputProcessor(InputProcessor):
async def _context_open(self):
pass

async def _context_close(self):
pass

@inject
def __init__(self, x: XReader, y: YReader, metrics_provider: MetricsProvider, logger_factory: LoggerFactory,):
super().__init__(x, y, metrics_provider=metrics_provider, logger_factory=logger_factory)

async def process_input(self, **_) -> Dict[str, PandasDataFrame]:
inputs = await self._read_input()
return {
'x_ready': inputs["x"].assign(c=[-1, 1]),
'y_ready': inputs["y"].assign(c=[-1, 1])
}


class MyAlgorithm(MinimalisticAlgorithm):
async def _context_open(self):
pass

async def _context_close(self):
pass

@inject
def __init__(self, input_processor: MyInputProcessor, metrics_provider: MetricsProvider, logger_factory: LoggerFactory,):
super().__init__(input_processor, metrics_provider, logger_factory)

async def _run(self, x_ready: PandasDataFrame, y_ready: PandasDataFrame, **kwargs) -> PandasDataFrame:
return pandas.concat([x_ready, y_ready])


async def main():
nexus = Nexus.create() \
.add_reader(XReader) \
.add_reader(YReader) \
.use_processor(MyInputProcessor) \
.use_algorithm(MyAlgorithm)

await nexus.activate()


if __name__ == "__main__":
asyncio.run(main())

```

Run this code as `sample.py`:

```shell
python3 sample.py --sas-uri 'https://localhost' --request-id test
```
18 changes: 18 additions & 0 deletions esd_services_api_client/nexus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Import index.
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18 changes: 18 additions & 0 deletions esd_services_api_client/nexus/abstractions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Import index.
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
64 changes: 64 additions & 0 deletions esd_services_api_client/nexus/abstractions/logger_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Logger factory for async loggers.
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
from logging import StreamHandler
from typing import final, Type, TypeVar, Optional, Dict

from adapta.logs._async_logger import _AsyncLogger, create_async_logger
from adapta.logs.handlers.datadog_api_handler import DataDogApiHandler
from adapta.logs.models import LogLevel

TLogger = TypeVar("TLogger") # pylint: disable=C0103:


@final
class LoggerFactory:
"""
Async logger provisioner.
"""

def __init__(self):
self._log_handlers = [
StreamHandler(),
]
if "NEXUS__DATADOG_LOGGER_CONFIGURATION" in os.environ:
self._log_handlers.append(
DataDogApiHandler(
**json.loads(os.getenv("NEXUS__DATADOG_LOGGER_CONFIGURATION"))
)
)

def create_logger(
self,
logger_type: Type[TLogger],
fixed_template: Optional[Dict[str, Dict[str, str]]] = None,
fixed_template_delimiter=", ",
) -> _AsyncLogger[TLogger]:
"""
Creates an async-safe logger for the provided class name.
"""
return create_async_logger(
logger_type=logger_type,
log_handlers=self._log_handlers,
min_log_level=LogLevel(os.getenv("NEXUS__LOG_LEVEL", "INFO")),
fixed_template=fixed_template,
fixed_template_delimiter=fixed_template_delimiter,
)
60 changes: 60 additions & 0 deletions esd_services_api_client/nexus/abstractions/nexus_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
Base classes for all objects used by Nexus.
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from abc import ABC, abstractmethod

from adapta.metrics import MetricsProvider

from esd_services_api_client.nexus.abstractions.logger_factory import LoggerFactory


class NexusObject(ABC):
"""
Base class for all Nexus objects.
"""

def __init__(
self,
metrics_provider: MetricsProvider,
logger_factory: LoggerFactory,
):
self._metrics_provider = metrics_provider
self._logger = logger_factory.create_logger(logger_type=self.__class__)

async def __aenter__(self):
self._logger.start()
await self._context_open()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._logger.stop()
await self._context_close()

@abstractmethod
async def _context_open(self):
"""
Optional actions to perform on context activation.
"""

@abstractmethod
async def _context_close(self):
"""
Optional actions to perform on context closure.
"""
22 changes: 22 additions & 0 deletions esd_services_api_client/nexus/algorithms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""
Import index.
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from esd_services_api_client.nexus.algorithms.minimalistic import *
from esd_services_api_client.nexus.algorithms.recursive import *
from esd_services_api_client.nexus.algorithms.distributed import *
56 changes: 56 additions & 0 deletions esd_services_api_client/nexus/algorithms/_baseline_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Base algorithm
"""

# Copyright (c) 2023. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


from abc import abstractmethod

from adapta.metrics import MetricsProvider
from pandas import DataFrame as PandasDataFrame

from esd_services_api_client.nexus.abstractions.nexus_object import NexusObject
from esd_services_api_client.nexus.abstractions.logger_factory import LoggerFactory
from esd_services_api_client.nexus.input.input_processor import InputProcessor


class BaselineAlgorithm(NexusObject):
"""
Base class for all algorithm implementations.
"""

def __init__(
self,
input_processor: InputProcessor,
metrics_provider: MetricsProvider,
logger_factory: LoggerFactory,
):
super().__init__(metrics_provider, logger_factory)
self._input_processor = input_processor

@abstractmethod
async def _run(self, **kwargs) -> PandasDataFrame:
"""
Core logic for this algorithm. Implementing this method is mandatory.
"""

async def run(self, **kwargs) -> PandasDataFrame:
"""
Coroutine that executes the algorithm logic.
"""
async with self._input_processor as input_processor:
return await self._run(**(await input_processor.process_input(**kwargs)))
Loading

0 comments on commit 068e760

Please sign in to comment.