Skip to content

Commit

Permalink
🎨 Job log streaming: connect endpoint with rabbitMQ (ITISFoundation#5045
Browse files Browse the repository at this point in the history
)

Co-authored-by: Pedro Crespo <[email protected]>
  • Loading branch information
bisgaard-itis and pcrespov authored Nov 21, 2023
1 parent 3f42bed commit 56fe30c
Show file tree
Hide file tree
Showing 39 changed files with 938 additions and 180 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import TypeAlias

EnvVarsDict: TypeAlias = dict[str, str]
EnvVarsList: TypeAlias = list[str]


# SEE packages/pytest-simcore/tests/test_helpers_utils_envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
import dotenv
import pytest

from .typing_env import EnvVarsDict
from .typing_env import EnvVarsDict, EnvVarsList

#
# monkeypatch using dict
#


def delenvs_from_dict(monkeypatch: pytest.MonkeyPatch, envs: EnvVarsList):
for var in envs:
assert isinstance(var, str)
assert var is not None # None keys cannot be is defined w/o value
monkeypatch.delenv(var)


def setenvs_from_dict(
monkeypatch: pytest.MonkeyPatch, envs: EnvVarsDict
) -> EnvVarsDict:
Expand Down
11 changes: 10 additions & 1 deletion packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def rabbit_service(


@pytest.fixture
async def rabbitmq_client(
async def create_rabbitmq_client(
rabbit_service: RabbitSettings,
) -> AsyncIterator[Callable[[str], RabbitMQClient]]:
created_clients = []
Expand All @@ -101,6 +101,7 @@ def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQClient:
return client

yield _creator

# cleanup, properly close the clients
await asyncio.gather(*(client.close() for client in created_clients))

Expand All @@ -126,3 +127,11 @@ async def _creator(client_name: str, *, heartbeat: int = 60) -> RabbitMQRPCClien
yield _creator
# cleanup, properly close the clients
await asyncio.gather(*(client.close() for client in created_clients))



async def rabbitmq_client(create_rabbitmq_client):
# NOTE: Legacy fixture
# Use create_rabbitmq_client instead of rabbitmq_client
# SEE docs/coding-conventions.md::CC4
return create_rabbitmq_client
9 changes: 9 additions & 0 deletions packages/service-library/src/servicelib/fastapi/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pydantic.errors import PydanticErrorMixin


class ApplicationRuntimeError(PydanticErrorMixin, RuntimeError):
pass


class ApplicationStateError(ApplicationRuntimeError):
msg_template: str = "Invalid app.state.{state}: {msg}"
87 changes: 87 additions & 0 deletions packages/service-library/src/servicelib/fastapi/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import logging

from fastapi import FastAPI
from models_library.rabbitmq_messages import RabbitMessageBase
from settings_library.rabbit import RabbitSettings

from ..rabbitmq import RabbitMQClient
from ..rabbitmq._utils import wait_till_rabbitmq_responsive
from .errors import ApplicationStateError

_logger = logging.getLogger(__name__)


def _create_client(app: FastAPI):
app.state.rabbitmq_client = RabbitMQClient(
client_name=app.state.rabbitmq_client_name,
settings=app.state.rabbitmq_settings,
)


async def _remove_client(app: FastAPI):
await app.state.rabbitmq_client.close()
app.state.rabbitmq_client = None


async def connect(app: FastAPI):
assert app.state.rabbitmq_settings # nosec
await wait_till_rabbitmq_responsive(app.state.rabbitmq_settings.dsn)
_create_client(app)


async def disconnect(app: FastAPI):
if app.state.rabbitmq_client:
await _remove_client(app)


async def reconnect(app: FastAPI):
await disconnect(app)
await connect(app)


def setup_rabbit(
app: FastAPI,
*,
settings: RabbitSettings,
name: str,
) -> None:
"""Sets up rabbit in a given app
- Inits app.states for rabbitmq
- Creates a client to communicate with rabbitmq
Arguments:
app -- fastapi app
settings -- Rabbit settings or if None, the connection to rabbit is not done upon startup
name -- name for the rmq client name
"""
app.state.rabbitmq_client = None # RabbitMQClient | None
app.state.rabbitmq_client_name = name
app.state.rabbitmq_settings = settings

if settings is None:
return

async def on_startup() -> None:
await connect(app)

app.add_event_handler("startup", on_startup)

async def on_shutdown() -> None:
await disconnect(app)

app.add_event_handler("shutdown", on_shutdown)


def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
if not app.state.rabbitmq_client:
raise ApplicationStateError(
state="rabbitmq_client",
msg="Rabbitmq service unavailable. Check app settings",
)
assert isinstance(rabbitmq_client := app.state.rabbitmq_client, RabbitMQClient)
return rabbitmq_client


async def post_message(app: FastAPI, message: RabbitMessageBase) -> None:
await get_rabbitmq_client(app).publish(message.channel_name, message)
12 changes: 9 additions & 3 deletions packages/service-library/tests/fastapi/test_openapi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import pytest
import starlette.routing
from fastapi.applications import FastAPI
from fastapi.routing import APIRouter
from openapi_spec_validator import openapi_v31_spec_validator, validate_spec
from openapi_spec_validator import validate_spec
from openapi_spec_validator.exceptions import OpenAPISpecValidatorError
from servicelib.fastapi.openapi import (
override_fastapi_openapi_method,
Expand Down Expand Up @@ -31,7 +37,7 @@ def test_exclusive_min_openapi_issue(app: FastAPI):
# NOTE: With the latest update of openapi_spec_validator, now passes validation 3.1 but
# does not seem resolved. It was moved to https://github.com/tiangolo/fastapi/discussions/9140
with pytest.raises(OpenAPISpecValidatorError):
validate_spec(app.openapi(), validator=openapi_v31_spec_validator)
validate_spec(app.openapi())


def test_overriding_openapi_method(app: FastAPI):
Expand All @@ -48,7 +54,7 @@ def test_overriding_openapi_method(app: FastAPI):
assert openapi
assert isinstance(openapi, dict)

validate_spec(openapi, validator=openapi_v31_spec_validator)
validate_spec(openapi)

# NOTE: https://github.com/tiangolo/fastapi/issues/240 now passes validation 3.1 but
# does not seem resolved. It was moved to https://github.com/tiangolo/fastapi/discussions/9140
Expand Down
147 changes: 147 additions & 0 deletions packages/service-library/tests/fastapi/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name


import pytest
from faker import Faker
from models_library.rabbitmq_messages import LoggerRabbitMessage, RabbitMessageBase
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

_TENACITY_RETRY_PARAMS = {
"reraise": True,
"retry": retry_if_exception_type(AssertionError),
"stop": stop_after_delay(30),
"wait": wait_fixed(0.1),
}

# Selection of core and tool services started in this swarm fixture (integration)
pytest_simcore_core_services_selection = [
"rabbit",
]

pytest_simcore_ops_services_selection = []


@pytest.fixture
def rabbit_log_message(faker: Faker) -> LoggerRabbitMessage:
return LoggerRabbitMessage(
user_id=faker.pyint(min_value=1),
project_id=faker.uuid4(),
node_id=faker.uuid4(),
messages=faker.pylist(allowed_types=(str,)),
)


@pytest.fixture(params=["rabbit_log_message"])
def rabbit_message(
request: pytest.FixtureRequest,
rabbit_log_message: LoggerRabbitMessage,
) -> RabbitMessageBase:
return {
"rabbit_log_message": rabbit_log_message,
}[request.param]


# # https://github.com/ITISFoundation/osparc-simcore/issues/5059
# def test_rabbitmq_does_not_initialize_if_deactivated(
# disabled_rabbitmq: None,
# initialized_app: FastAPI,
# ):
# assert hasattr(initialized_app.state, "rabbitmq_client")
# assert initialized_app.state.rabbitmq_client is None
# with pytest.raises(InvalidConfig):
# get_rabbitmq_client(initialized_app)

# def test_rabbitmq_initializes(
# enabled_rabbitmq: RabbitSettings,
# initialized_app: FastAPI,
# ):
# assert hasattr(initialized_app.state, "rabbitmq_client")
# assert initialized_app.state.rabbitmq_client is not None
# assert (
# get_rabbitmq_client(initialized_app)
# == initialized_app.state.rabbitmq_client
# )

# async def test_post_message(
# enabled_rabbitmq: RabbitSettings,
# initialized_app: FastAPI,
# rabbit_message: RabbitMessageBase,
# create_rabbitmq_client: Callable[[str], RabbitMQClient],
# mocker: MockerFixture,
# ):
# mocked_message_handler = mocker.AsyncMock(return_value=True)
# consumer_rmq = create_rabbitmq_client("pytest_consumer")
# await consumer_rmq.subscribe(
# rabbit_message.channel_name,
# mocked_message_handler,
# topics=[BIND_TO_ALL_TOPICS] if rabbit_message.routing_key() else None,
# )

# producer_rmq = get_rabbitmq_client(initialized_app)
# assert producer_rmq is not None
# await producer_rmq.publish(rabbit_message.channel_name, rabbit_message)

# async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
# with attempt:
# print(
# f"--> checking for message in rabbit exchange {rabbit_message.channel_name}, {attempt.retry_state.retry_object.statistics}"
# )
# mocked_message_handler.assert_called_once_with(
# rabbit_message.json().encode()
# )
# print("... message received")

# async def test_post_message_with_disabled_rabbit_does_not_raise(
# disabled_rabbitmq: None,
# disabled_ec2: None,
# mocked_redis_server: None,
# initialized_app: FastAPI,
# rabbit_message: RabbitMessageBase,
# ):
# await post_message(initialized_app, message=rabbit_message)

# async def _switch_off_rabbit_mq_instance(
# async_docker_client: aiodocker.Docker,
# ) -> None:
# # remove the rabbit MQ instance
# rabbit_services = [
# s
# for s in await async_docker_client.services.list()
# if "rabbit" in s["Spec"]["Name"]
# ]
# await asyncio.gather(
# *(async_docker_client.services.delete(s["ID"]) for s in rabbit_services)
# )

# @retry(**_TENACITY_RETRY_PARAMS)
# async def _check_service_task_gone(service: Mapping[str, Any]) -> None:
# print(
# f"--> checking if service {service['ID']}:{service['Spec']['Name']} is really gone..."
# )
# list_of_tasks = await async_docker_client.containers.list(
# all=True,
# filters={
# "label": [f"com.docker.swarm.service.id={service['ID']}"],
# },
# )
# assert not list_of_tasks
# print(f"<-- service {service['ID']}:{service['Spec']['Name']} is gone.")

# await asyncio.gather(*(_check_service_task_gone(s) for s in rabbit_services))

# async def test_post_message_when_rabbit_disconnected(
# enabled_rabbitmq: RabbitSettings,
# disabled_ec2: None,
# mocked_redis_server: None,
# initialized_app: FastAPI,
# rabbit_log_message: LoggerRabbitMessage,
# async_docker_client: aiodocker.Docker,
# ):
# await _switch_off_rabbit_mq_instance(async_docker_client)

# # now posting should not raise out
# await post_message(initialized_app, message=rabbit_log_message)
Loading

0 comments on commit 56fe30c

Please sign in to comment.