Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
pcrespov committed Oct 26, 2023
1 parent d837b67 commit 20b82b1
Showing 1 changed file with 60 additions and 15 deletions.
75 changes: 60 additions & 15 deletions services/api-server/tests/unit/test_services_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import logging
from collections.abc import AsyncIterable, Callable
from contextlib import asynccontextmanager, suppress
from typing import Annotated
from unittest.mock import AsyncMock

import httpx
import pytest
from faker import Faker
from fastapi import FastAPI
from fastapi import Depends, FastAPI, Request
from fastapi.responses import StreamingResponse
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
Expand All @@ -26,6 +27,7 @@
from servicelib.logging_utils import LogLevelInt, LogMessageStr
from servicelib.rabbitmq import RabbitMQClient
from simcore_service_api_server.services.rabbitmq import get_rabbitmq_client
from starlette.background import BackgroundTask

pytest_simcore_core_services_selection = [
"rabbit",
Expand Down Expand Up @@ -204,6 +206,40 @@ def new_routes_injected(app: FastAPI):

_NEW_LINE = "\n"

async def get_buffer(request: Request):
return asyncio.Queue()

async def listen_to_logs(
buffer: Annotated[asyncio.Queue, Depends(get_buffer)],
project_id: ProjectID,
follow: bool = False,
):
async def _consume_log(data: bytes):
got = LoggerRabbitMessage.parse_raw(data)
assert got.project_id == project_id

item = JobLog(
job_id=got.project_id,
node_id=got.node_id,
log_level=got.log_level,
messages=got.messages,
)

if follow:
await buffer.put(item)
else:
with suppress(asyncio.QueueFull):
buffer.put_nowait(item)

return True

# listen to project_id
async with rabbit_consuming_context(app, project_id, _consume_log):

yield

print("stopping listening to logs")

async def _json_logs_generator(
queue: asyncio.Queue, follow: bool
) -> AsyncIterable[str]:
Expand All @@ -215,12 +251,13 @@ async def _json_logs_generator(
yield job_log.json() + _NEW_LINE
else:
try:
await asyncio.sleep(0)
# Return an item if one is immediately available
job_log = queue.get_nowait()
yield job_log.json() + _NEW_LINE

except asyncio.QueueEmpty as err:
raise StopAsyncIteration from err
except asyncio.QueueEmpty:
pass

@app.get("/projects/{project_id}/logs")
async def _stream_logs_handler(project_id: ProjectID, *, follow: bool = False):
Expand All @@ -246,13 +283,19 @@ async def _consume_log(data: bytes):
return True

# listen to project_id
async with rabbit_consuming_context(app, project_id, _consume_log):
# FXIME: when return, it will disconnect and we need to keep it connected
# and shutdown when client disconnects
return StreamingResponse(
_json_logs_generator(buffer, follow=follow),
media_type="application/x-ndjson",
)
rabbit_consumer: RabbitMQClient = get_rabbitmq_client(app)
queue_name = await rabbit_consumer.subscribe(
LoggerRabbitMessage.get_channel_name(),
_consume_log,
exclusive_queue=True,
topics=[f"{project_id}.*"],
)

return StreamingResponse(
_json_logs_generator(buffer, follow=follow),
media_type="application/x-ndjson",
background=BackgroundTask(rabbit_consumer.unsubscribe, queue_name),
)


@pytest.mark.testit
Expand All @@ -261,17 +304,19 @@ async def test_it(
app: FastAPI,
client: httpx.AsyncClient,
user_id: UserID,
node_id: NodeID,
project_id: ProjectID,
produce_logs: Callable,
):

async with client.stream("GET", f"/projects/{project_id}/logs") as r:
coro = produce_logs(
"expected", project_id, node_id, ["expected message"] * 3, logging.DEBUG
)

# sends log
await produce_logs(
"expected", project_id, node_id, ["expected message"] * 3, logging.DEBUG
)
tasks = [asyncio.create_task(coro, name="log-producer") for _ in range(3)]

async with client.stream("GET", f"/projects/{project_id}/logs") as r:
# streams open
async for line in r.aiter_lines():
data = json.loads(line)
log = JobLog.parse_obj(data)
Expand Down

0 comments on commit 20b82b1

Please sign in to comment.