Skip to content

Commit

Permalink
WIP: fastapi/rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov committed Oct 26, 2023
1 parent 31eb229 commit 102fedf
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
9 changes: 9 additions & 0 deletions packages/service-library/src/servicelib/fastapi/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pydantic.errors import PydanticErrorMixin


class ApplicationRuntimeError(PydanticErrorMixin, RuntimeError):
pass


class ApplicationStateError(ApplicationRuntimeError):
msg_template: str = "Invalid app.state.{state}: {msg}"
81 changes: 81 additions & 0 deletions packages/service-library/src/servicelib/fastapi/rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging

from fastapi import FastAPI
from settings_library.rabbit import RabbitSettings

from ..rabbitmq import RabbitMQClient
from ..rabbitmq_utils import wait_till_rabbitmq_responsive
from .errors import ApplicationStateError

_logger = logging.getLogger(__name__)


def _create_client(app: FastAPI):
app.state.rabbitmq_client = RabbitMQClient(
client_name=app.state.rabbitmq_client_name,
settings=app.state.rabbitmq_settings,
)


async def _remove_client(app: FastAPI):
await app.state.rabbitmq_client.close()
app.state.rabbitmq_client = None


async def connect(app: FastAPI):
assert app.state.rabbitmq_settings # nosec
await wait_till_rabbitmq_responsive(app.state.rabbitmq_settings.dsn)
_create_client(app)


async def disconnect(app: FastAPI):
if app.state.rabbitmq_client:
await _remove_client(app)


async def reconnect(app: FastAPI):
await disconnect(app)
await connect(app)


def setup_rabbit(
app: FastAPI,
*,
settings: RabbitSettings,
name: str,
) -> None:
"""Sets up rabbit in a given app
- Inits app.states for rabbitmq
- Creates a client to communicate with rabbitmq
Arguments:
app -- fastapi app
settings -- Rabbit settings or if None, the connection to rabbit is not done upon startup
name -- name for the rmq client name
"""
app.state.rabbitmq_client = None # RabbitMQClient | None
app.state.rabbitmq_client_name = name
app.state.rabbitmq_settings = settings

if settings is None:
return

async def on_startup() -> None:
await connect(app)

app.add_event_handler("startup", on_startup)

async def on_shutdown() -> None:
await disconnect(app)

app.add_event_handler("shutdown", on_shutdown)


def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
if not app.state.rabbitmq_client:
raise ApplicationStateError(
state="rabbitmq_client",
msg="Rabbitmq service unavailable. Check app settings",
)
return app.state.rabbitmq_client
157 changes: 157 additions & 0 deletions packages/service-library/tests/fastapi/test_rabbitmq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# pylint:disable=unused-variable
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

import asyncio
from collections.abc import Callable, Mapping
from typing import Any

import aiodocker
import pytest
from faker import Faker
from fastapi import FastAPI
from models_library.rabbitmq_messages import LoggerRabbitMessage, RabbitMessageBase
from pytest_mock.plugin import MockerFixture
from servicelib.fastapi.rabbitmq import get_rabbitmq_client
from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, RabbitMQClient
from settings_library.rabbit import RabbitSettings
from tenacity import retry
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

_TENACITY_RETRY_PARAMS = {
"reraise": True,
"retry": retry_if_exception_type(AssertionError),
"stop": stop_after_delay(30),
"wait": wait_fixed(0.1),
}

# Selection of core and tool services started in this swarm fixture (integration)
pytest_simcore_core_services_selection = [
"rabbit",
]

pytest_simcore_ops_services_selection = []


@pytest.fixture
def rabbit_log_message(faker: Faker) -> LoggerRabbitMessage:
return LoggerRabbitMessage(
user_id=faker.pyint(min_value=1),
project_id=faker.uuid4(),
node_id=faker.uuid4(),
messages=faker.pylist(allowed_types=(str,)),
)


@pytest.fixture(params=["rabbit_log_message"])
def rabbit_message(
request: pytest.FixtureRequest,
rabbit_log_message: LoggerRabbitMessage,
) -> RabbitMessageBase:
return {
"rabbit_log_message": rabbit_log_message,
}[request.param]


def test_rabbitmq_does_not_initialize_if_deactivated(
disabled_rabbitmq: None,
initialized_app: FastAPI,
):
assert hasattr(initialized_app.state, "rabbitmq_client")
assert initialized_app.state.rabbitmq_client is None
with pytest.raises(ConfigurationError):
get_rabbitmq_client(initialized_app)


def test_rabbitmq_initializes(
enabled_rabbitmq: RabbitSettings,
initialized_app: FastAPI,
):
assert hasattr(initialized_app.state, "rabbitmq_client")
assert initialized_app.state.rabbitmq_client is not None
assert get_rabbitmq_client(initialized_app) == initialized_app.state.rabbitmq_client


async def test_post_message(
enabled_rabbitmq: RabbitSettings,
initialized_app: FastAPI,
rabbit_message: RabbitMessageBase,
create_rabbitmq_client: Callable[[str], RabbitMQClient],
mocker: MockerFixture,
):
mocked_message_handler = mocker.AsyncMock(return_value=True)
consumer_rmq = create_rabbitmq_client("pytest_consumer")
await consumer_rmq.subscribe(
rabbit_message.channel_name,
mocked_message_handler,
topics=[BIND_TO_ALL_TOPICS] if rabbit_message.routing_key() else None,
)

producer_rmq = get_rabbitmq_client(initialized_app)
assert producer_rmq is not None
await producer_rmq.publish(rabbit_message.channel_name, rabbit_message)

async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
print(
f"--> checking for message in rabbit exchange {rabbit_message.channel_name}, {attempt.retry_state.retry_object.statistics}"
)
mocked_message_handler.assert_called_once_with(
rabbit_message.json().encode()
)
print("... message received")


async def test_post_message_with_disabled_rabbit_does_not_raise(
disabled_rabbitmq: None,
disabled_ec2: None,
mocked_redis_server: None,
initialized_app: FastAPI,
rabbit_message: RabbitMessageBase,
):
await post_message(initialized_app, message=rabbit_message)


async def _switch_off_rabbit_mq_instance(async_docker_client: aiodocker.Docker) -> None:
# remove the rabbit MQ instance
rabbit_services = [
s
for s in await async_docker_client.services.list()
if "rabbit" in s["Spec"]["Name"]
]
await asyncio.gather(
*(async_docker_client.services.delete(s["ID"]) for s in rabbit_services)
)

@retry(**_TENACITY_RETRY_PARAMS)
async def _check_service_task_gone(service: Mapping[str, Any]) -> None:
print(
f"--> checking if service {service['ID']}:{service['Spec']['Name']} is really gone..."
)
list_of_tasks = await async_docker_client.containers.list(
all=True,
filters={
"label": [f"com.docker.swarm.service.id={service['ID']}"],
},
)
assert not list_of_tasks
print(f"<-- service {service['ID']}:{service['Spec']['Name']} is gone.")

await asyncio.gather(*(_check_service_task_gone(s) for s in rabbit_services))


async def test_post_message_when_rabbit_disconnected(
enabled_rabbitmq: RabbitSettings,
disabled_ec2: None,
mocked_redis_server: None,
initialized_app: FastAPI,
rabbit_log_message: LoggerRabbitMessage,
async_docker_client: aiodocker.Docker,
):
await _switch_off_rabbit_mq_instance(async_docker_client)

# now posting should not raise out
await post_message(initialized_app, message=rabbit_log_message)

0 comments on commit 102fedf

Please sign in to comment.