diff --git a/services/api-server/src/simcore_service_api_server/core/application.py b/services/api-server/src/simcore_service_api_server/core/application.py index 8160e38d648c..939ccc8351c0 100644 --- a/services/api-server/src/simcore_service_api_server/core/application.py +++ b/services/api-server/src/simcore_service_api_server/core/application.py @@ -19,6 +19,7 @@ from ..api.root import create_router from ..api.routes.health import router as health_router from ..services import catalog, director_v2, remote_debug, storage, webserver +from ..services.rabbitmq import setup_rabbitmq from .events import create_start_app_handler, create_stop_app_handler from .openapi import override_openapi_method, use_route_names_as_operation_ids from .settings import ApplicationSettings @@ -76,6 +77,8 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI: if settings.SC_BOOT_MODE == BootModeEnum.DEBUG: remote_debug.setup(app) + setup_rabbitmq(app) + if settings.API_SERVER_WEBSERVER: webserver.setup(app, settings.API_SERVER_WEBSERVER) diff --git a/services/api-server/src/simcore_service_api_server/core/settings.py b/services/api-server/src/simcore_service_api_server/core/settings.py index 2bf1dc531f36..288c3df8dd93 100644 --- a/services/api-server/src/simcore_service_api_server/core/settings.py +++ b/services/api-server/src/simcore_service_api_server/core/settings.py @@ -8,6 +8,7 @@ from settings_library.basic_types import PortInt, VersionTag from settings_library.catalog import CatalogSettings from settings_library.postgres import PostgresSettings +from settings_library.rabbit import RabbitSettings from settings_library.storage import StorageSettings from settings_library.utils_logging import MixinLoggingSettings from settings_library.utils_service import ( @@ -119,9 +120,12 @@ class ApplicationSettings(BasicSettings): # DOCKER BOOT SC_BOOT_MODE: BootModeEnum | None - # POSTGRES API_SERVER_POSTGRES: PostgresSettings | None = Field(auto_default_from_env=True) + PAYMENTS_RABBITMQ: RabbitSettings = Field( + auto_default_from_env=True, description="settings for service/rabbitmq" + ) + # SERVICES with http API API_SERVER_WEBSERVER: WebServerSettings | None = Field(auto_default_from_env=True) API_SERVER_CATALOG: CatalogSettings | None = Field(auto_default_from_env=True) diff --git a/services/api-server/src/simcore_service_api_server/services/rabbitmq.py b/services/api-server/src/simcore_service_api_server/services/rabbitmq.py new file mode 100644 index 000000000000..91964c06eacc --- /dev/null +++ b/services/api-server/src/simcore_service_api_server/services/rabbitmq.py @@ -0,0 +1,38 @@ +import logging +from typing import cast + +from fastapi import FastAPI +from models_library.rabbitmq_messages import RabbitMessageBase +from servicelib.rabbitmq import RabbitMQClient, wait_till_rabbitmq_responsive +from settings_library.rabbit import RabbitSettings + +_logger = logging.getLogger(__name__) + + +def setup_rabbitmq(app: FastAPI) -> None: + settings: RabbitSettings = app.state.settings.PAYMENTS_RABBITMQ + app.state.rabbitmq_client = None + app.state.rabbitmq_rpc_server = None + + async def _on_startup() -> None: + await wait_till_rabbitmq_responsive(settings.dsn) + + app.state.rabbitmq_client = RabbitMQClient( + client_name="api_server", settings=settings + ) + + async def _on_shutdown() -> None: + if app.state.rabbitmq_client: + await app.state.rabbitmq_client.close() + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) + + +def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: + assert app.state.rabbitmq_client # nosec + return cast(RabbitMQClient, app.state.rabbitmq_client) + + +async def post_message(app: FastAPI, message: RabbitMessageBase) -> None: + await get_rabbitmq_client(app).publish(message.channel_name, message) diff --git a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py index 2b515679258b..80fa199d2c37 100644 --- a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py +++ b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py @@ -1,6 +1,14 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + import asyncio import datetime import json +from typing import AsyncIterable import httpx import pytest @@ -18,20 +26,20 @@ @pytest.fixture() def app() -> FastAPI: - _app = FastAPI() + app = FastAPI() - async def _text_generator(): + async def _text_generator() -> AsyncIterable[str]: for i in range(10): yield f"some log data {i}\n" await asyncio.sleep(1) - async def _json_generator(): + async def _json_generator() -> AsyncIterable[str]: i = 0 async for text in _text_generator(): yield json.dumps({"envent_id": i, "data": text}, indent=None) + _NEW_LINE i += 1 - @_app.get("/logs") + @app.get("/logs") async def stream_logs(as_json: bool = False): if as_json: return StreamingResponse( @@ -39,7 +47,7 @@ async def stream_logs(as_json: bool = False): ) return StreamingResponse(_text_generator()) - return _app + return app @pytest.mark.parametrize("as_json", [True, False]) diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py new file mode 100644 index 000000000000..c669cead05bf --- /dev/null +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -0,0 +1,24 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +from fastapi import FastAPI +from simcore_service_api_server.services.rabbitmq import ( + get_rabbitmq_client, + setup_rabbitmq, +) + +pytest_simcore_core_services_selection = [ + "rabbit", +] +pytest_simcore_ops_services_selection = [] + + +def test_it(app: FastAPI): + + setup_rabbitmq(app) + + rabbit_client = get_rabbitmq_client(app)