Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix log streaming issues #5104

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
76e7596
add logdistributor and test
bisgaard-itis Nov 28, 2023
00aea43
finalize tests for logdistributor
bisgaard-itis Nov 28, 2023
fd60b45
move logdistributor into code
bisgaard-itis Nov 28, 2023
8c3eb0a
modify loglistener to work with logdistributor
bisgaard-itis Nov 28, 2023
7e103df
LogListener -> LogStreamer and fix tests
bisgaard-itis Nov 28, 2023
c82c72e
add more tests of logstreamer
bisgaard-itis Nov 28, 2023
2e4e614
make sure to setup and teardown logdistributor
bisgaard-itis Nov 28, 2023
c8a9412
cosmetic change
bisgaard-itis Nov 28, 2023
899eecc
connect streamer and distributor
bisgaard-itis Nov 28, 2023
a3cd82c
don't limit queue size as it removes logs
bisgaard-itis Nov 29, 2023
cd02823
add error handling
bisgaard-itis Nov 29, 2023
cbe1e6d
minor changes
bisgaard-itis Nov 29, 2023
3b74c94
merge master into 5081-improve-log-streaming
bisgaard-itis Nov 29, 2023
3d20451
fix test
bisgaard-itis Nov 29, 2023
683c75a
minor fix
bisgaard-itis Nov 29, 2023
721c5b0
fix yet another test
bisgaard-itis Nov 29, 2023
d475c31
remember to deregister
bisgaard-itis Nov 29, 2023
9bd279c
fix tests
bisgaard-itis Nov 29, 2023
ddee97e
change according to PR feedback
bisgaard-itis Nov 29, 2023
5e658a7
minor changes according to PR feedback
bisgaard-itis Nov 29, 2023
df95e35
minor changes according to PR feedback
bisgaard-itis Nov 29, 2023
3cb0d2f
refactor log streaming classes into separate class @pcrespov
bisgaard-itis Nov 29, 2023
776ddb1
clean up tests
bisgaard-itis Nov 29, 2023
8893fc9
add custom exception classes for the loghandling
bisgaard-itis Nov 29, 2023
031ad9b
add exception handler
bisgaard-itis Nov 29, 2023
c1b9593
@pcrespov add context managers
bisgaard-itis Nov 29, 2023
bd27896
minor improvements
bisgaard-itis Nov 29, 2023
798f467
fix final tests
bisgaard-itis Nov 29, 2023
b49df1d
merge master into 5081-improve-log-streaming
bisgaard-itis Nov 29, 2023
13f174e
raise exception if context manager is not activated
bisgaard-itis Nov 29, 2023
3180628
fix tests
bisgaard-itis Nov 30, 2023
f9aa20a
@pcrespov move sleep seconds to constant
bisgaard-itis Nov 30, 2023
7e0bbf2
minor change
bisgaard-itis Nov 30, 2023
a272e36
cosmetic
bisgaard-itis Nov 30, 2023
dba4b44
add user_id to api-server log
bisgaard-itis Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,86 +1,17 @@
import asyncio
from asyncio.queues import Queue
from typing import Annotated, AsyncIterable, Final, cast
from uuid import UUID
from typing import Annotated, cast

from fastapi import Depends, FastAPI
from models_library.projects import ProjectID
from models_library.rabbitmq_messages import LoggerRabbitMessage
from pydantic import PositiveInt
from servicelib.fastapi.dependencies import get_app
from servicelib.rabbitmq import RabbitMQClient

from ...models.schemas.jobs import JobLog
from ...services.director_v2 import DirectorV2Api
from ..dependencies.authentication import get_current_user_id
from ..dependencies.services import get_api_client

_NEW_LINE: Final[str] = "\n"
from ...services.log_streaming import LogDistributor


def get_rabbitmq_client(app: Annotated[FastAPI, Depends(get_app)]) -> RabbitMQClient:
    """FastAPI dependency returning the application-wide RabbitMQ client."""
    client = app.state.rabbitmq_client
    assert client  # nosec
    return cast(RabbitMQClient, client)


class LogListener:
    """Streams a running project's logs as new-line delimited JSON.

    Subscribes to the RabbitMQ logging channel for one project, buffers
    incoming messages in an internal queue, and exposes them through an
    async generator that ends once the project's computation is done.
    """

    _queue: Queue[JobLog]
    _queue_name: str
    _rabbit_consumer: RabbitMQClient
    _project_id: ProjectID
    _user_id: PositiveInt
    _director2_api: DirectorV2Api

    def __init__(
        self,
        user_id: Annotated[PositiveInt, Depends(get_current_user_id)],
        rabbit_consumer: Annotated[RabbitMQClient, Depends(get_rabbitmq_client)],
        director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
    ):
        self._rabbit_consumer = rabbit_consumer
        self._user_id = user_id
        self._director2_api = director2_api
        # NOTE(review): a bounded queue makes `put` block (and can stall the
        # rabbit callback) once 50 messages pile up -- confirm this limit is
        # intentional
        self._queue: Queue[JobLog] = Queue(50)

    async def listen(
        self,
        project_id: UUID,
    ) -> None:
        """Start consuming log messages routed for *project_id*."""
        self._project_id = project_id

        self._queue_name = await self._rabbit_consumer.subscribe(
            LoggerRabbitMessage.get_channel_name(),
            self._add_logs_to_queue,
            exclusive_queue=True,
            topics=[f"{self._project_id}.*"],
        )

    async def stop_listening(self) -> None:
        """Unsubscribe from the queue created by :meth:`listen`."""
        await self._rabbit_consumer.unsubscribe(self._queue_name)

    async def _add_logs_to_queue(self, data: bytes) -> bool:
        """RabbitMQ callback: parse the raw message and enqueue a JobLog."""
        got = LoggerRabbitMessage.parse_raw(data)
        item = JobLog(
            job_id=got.project_id,
            node_id=got.node_id,
            log_level=got.log_level,
            messages=got.messages,
        )
        await self._queue.put(item)
        return True  # acknowledge the message

    async def _project_done(self) -> bool:
        """Return True once the computation has a stop timestamp."""
        task = await self._director2_api.get_computation(
            self._project_id, self._user_id
        )
        # fix: was the non-idiomatic `not task.stopped is None`
        return task.stopped is not None

    async def log_generator(self) -> AsyncIterable[str]:
        """Yield one JSON-encoded JobLog per line until the project is done."""
        while True:
            while self._queue.empty():
                if await self._project_done():
                    return
                await asyncio.sleep(5)
            log: JobLog = await self._queue.get()
            yield log.json() + _NEW_LINE
def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistributor:
    """FastAPI dependency returning the application-wide LogDistributor."""
    distributor = app.state.log_distributor
    assert distributor  # nosec
    return cast(LogDistributor, distributor)
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from fastapi import status
from starlette.requests import Request
from starlette.responses import JSONResponse

from ...services.log_streaming import (
LogDistributionBaseException,
LogStreamerNotRegistered,
LogStreamerRegistionConflict,
)
from .http_error import create_error_json_response


async def log_handling_error_handler(
    _: Request, exc: LogDistributionBaseException
) -> JSONResponse:
    """Translate log-distribution errors into JSON error responses.

    Registration conflicts map to 409; every other log-distribution
    error (including LogStreamerNotRegistered) maps to 500.
    """
    status_code = (
        status.HTTP_409_CONFLICT
        if isinstance(exc, LogStreamerRegistionConflict)
        else status.HTTP_500_INTERNAL_SERVER_ERROR
    )
    return create_error_json_response(f"{exc}", status_code=status_code)
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,33 @@

from fastapi import APIRouter, Depends, status
from fastapi.exceptions import HTTPException
from fastapi.responses import RedirectResponse, StreamingResponse
from fastapi.responses import RedirectResponse
from fastapi_pagination.api import create_page
from models_library.api_schemas_webserver.projects import ProjectGet
from models_library.api_schemas_webserver.resource_usage import PricingUnitGet
from models_library.api_schemas_webserver.wallets import WalletGetWithAvailableCredits
from models_library.projects_nodes_io import BaseFileLink
from models_library.users import UserID
from pydantic.types import PositiveInt
from servicelib.logging_utils import log_context
from starlette.background import BackgroundTask

from ...models.basic_types import VersionStr
from ...models.basic_types import LogStreamingResponse, VersionStr
from ...models.pagination import Page, PaginationParams
from ...models.schemas.files import File
from ...models.schemas.jobs import ArgumentTypes, Job, JobID, JobMetadata, JobOutputs
from ...models.schemas.solvers import SolverKeyId
from ...services.catalog import CatalogApi
from ...services.director_v2 import DirectorV2Api, DownloadLink, NodeName
from ...services.log_streaming import LogDistributor, LogStreamer
from ...services.solver_job_models_converters import create_job_from_project
from ...services.solver_job_outputs import ResultsTypes, get_solver_output_results
from ...services.storage import StorageApi, to_file_api_model
from ...services.webserver import ProjectNotFoundError
from ..dependencies.application import get_reverse_url_mapper
from ..dependencies.authentication import get_current_user_id, get_product_name
from ..dependencies.database import Engine, get_db_engine
from ..dependencies.rabbitmq import LogListener
from ..dependencies.rabbitmq import get_log_distributor
from ..dependencies.services import get_api_client
from ..dependencies.webserver import AuthSession, get_webserver_session
from ..errors.http_error import create_error_json_response
Expand Down Expand Up @@ -360,24 +362,27 @@ async def get_job_pricing_unit(

@router.get(
    "/{solver_key:path}/releases/{version}/jobs/{job_id:uuid}/logstream",
    response_class=LogStreamingResponse,
    include_in_schema=API_SERVER_DEV_FEATURES_ENABLED,
)
async def get_log_stream(
    solver_key: SolverKeyId,
    version: VersionStr,
    job_id: JobID,
    webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
    director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
    log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)],
    user_id: Annotated[UserID, Depends(get_current_user_id)],
):
    """Stream the job's logs as new-line delimited JSON.

    Registers a LogStreamer with the distributor for the job; the
    background task deregisters it once the response is finished.
    """
    job_name = _compose_job_resource_name(solver_key, version, job_id)
    with log_context(
        _logger, logging.DEBUG, f"Streaming logs for {job_name=} and {user_id=}"
    ):
        # a job is backed by a webserver project; 404s if it does not exist
        project: ProjectGet = await webserver_api.get_project(project_id=job_id)
        _raise_if_job_not_associated_with_solver(solver_key, version, project)
        log_streamer = LogStreamer(user_id, director2_api, job_id, log_distributor)
        await log_streamer.setup()
        return LogStreamingResponse(
            log_streamer.log_generator(),
            # teardown (deregister) runs after the stream completes
            background=BackgroundTask(log_streamer.teardown),
        )
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from httpx import HTTPStatusError
from models_library.basic_types import BootModeEnum
from servicelib.logging_utils import config_all_loggers
from simcore_service_api_server.api.errors.log_handling_error import (
log_handling_error_handler,
)
from simcore_service_api_server.services.log_streaming import (
LogDistributionBaseException,
)
from starlette import status
from starlette.exceptions import HTTPException

Expand Down Expand Up @@ -98,6 +104,7 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI:
app.add_exception_handler(HTTPException, http_error_handler)
app.add_exception_handler(RequestValidationError, http422_error_handler)
app.add_exception_handler(HTTPStatusError, httpx_client_error_handler)
app.add_exception_handler(LogDistributionBaseException, log_handling_error_handler)

# SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy
app.add_exception_handler(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re

from fastapi.responses import StreamingResponse
from models_library.basic_regex import VERSION_RE
from pydantic import ConstrainedStr

Expand All @@ -11,3 +12,7 @@ class VersionStr(ConstrainedStr):

class FileNameStr(ConstrainedStr):
    # File names are normalized by trimming surrounding whitespace
    strip_whitespace = True


class LogStreamingResponse(StreamingResponse):
    # Streaming response emitting new-line delimited JSON (one JobLog per line)
    media_type = "application/x-ndjson"
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import hashlib
import logging
from typing import Any, ClassVar, TypeAlias
from uuid import UUID, uuid4

Expand Down Expand Up @@ -306,3 +307,13 @@ class JobLog(BaseModel):
node_id: NodeID | None
log_level: LogLevelInt
messages: list[LogMessageStr]

class Config(BaseConfig):
schema_extra: ClassVar[dict[str, Any]] = {
"example": {
"job_id": "145beae4-a3a8-4fde-adbb-4e8257c2c083",
"node_id": "3742215e-6756-48d2-8b73-4d043065309f",
"log_level": logging.DEBUG,
"messages": ["PROGRESS: 5/10"],
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import asyncio
from asyncio import Queue
from typing import AsyncIterable, Awaitable, Callable, Final

from models_library.rabbitmq_messages import LoggerRabbitMessage
from models_library.users import UserID
from pydantic import PositiveInt
from servicelib.rabbitmq import RabbitMQClient

from ..models.schemas.jobs import JobID, JobLog
from .director_v2 import DirectorV2Api

_NEW_LINE: Final[str] = "\n"
_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS: Final[PositiveInt] = 10
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved


class LogDistributionBaseException(Exception):
    """Root of all errors raised by the log-distribution machinery."""


class LogStreamerNotRegistered(LogDistributionBaseException):
    """No LogStreamer is registered for the requested job."""


# NOTE: "Registion" is a typo, kept because renaming would break callers
class LogStreamerRegistionConflict(LogDistributionBaseException):
    """A LogStreamer is already registered for the job."""


class LogDistributor:
    """Fans incoming RabbitMQ log messages out to per-job callbacks.

    A single exclusive queue is subscribed during :meth:`setup` with no
    topics; each registered job adds its own routing topic. Can also be
    used as an async context manager (setup/teardown).
    """

    def __init__(self, rabbitmq_client: RabbitMQClient):
        self._rabbit_client = rabbitmq_client
        # one callback per job; only a single streamer per job is allowed
        self._log_streamers: dict[JobID, Callable[[JobLog], Awaitable[None]]] = {}
        self._queue_name: str

    async def setup(self):
        """Subscribe to the log channel (topics are added per job later)."""
        self._queue_name = await self._rabbit_client.subscribe(
            LoggerRabbitMessage.get_channel_name(),
            self._distribute_logs,
            exclusive_queue=True,
            topics=[],
        )

    async def teardown(self):
        """Unsubscribe from the queue created in :meth:`setup`."""
        await self._rabbit_client.unsubscribe(self._queue_name)

    async def __aenter__(self):
        await self.setup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.teardown()

    async def _distribute_logs(self, data: bytes):
        """RabbitMQ callback: route a raw message to the registered streamer."""
        got = LoggerRabbitMessage.parse_raw(data)
        item = JobLog(
            job_id=got.project_id,
            node_id=got.node_id,
            log_level=got.log_level,
            messages=got.messages,
        )
        callback = self._log_streamers.get(item.job_id)
        if callback is None:
            # NOTE(review): this raises inside the rabbitmq consumer task, not
            # a request handler -- confirm how in-flight messages arriving just
            # after deregister() are handled (logged/nack'd) rather than lost
            raise LogStreamerNotRegistered(
                f"Could not forward log because a logstreamer associated with job_id={item.job_id} was not registered"
            )
        await callback(item)
        return True  # acknowledge the message

    async def register(
        self, job_id: JobID, callback: Callable[[JobLog], Awaitable[None]]
    ):
        """Register *callback* for *job_id* and start receiving its topic.

        Raises:
            LogStreamerRegistionConflict: if a streamer is already registered
                for this job.
        """
        if job_id in self._log_streamers:
            raise LogStreamerRegistionConflict(
                f"A stream was already connected to {job_id=}. Only a single stream can be connected at the time"
            )
        self._log_streamers[job_id] = callback
        await self._rabbit_client.add_topics(
            LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
        )

    async def deregister(self, job_id: JobID):
        """Stop receiving the job's topic and drop its callback.

        Raises:
            LogStreamerNotRegistered: if no streamer is registered for the job.
        """
        if job_id not in self._log_streamers:
            raise LogStreamerNotRegistered(f"No stream was connected to {job_id=}.")
        await self._rabbit_client.remove_topics(
            LoggerRabbitMessage.get_channel_name(), topics=[f"{job_id}.*"]
        )
        del self._log_streamers[job_id]


class LogStreamer:
    """Buffers one job's logs and exposes them as an ndjson line stream.

    Must be registered with the LogDistributor (via :meth:`setup` or the
    async context manager) before :meth:`log_generator` is consumed.
    """

    def __init__(
        self,
        user_id: UserID,
        director2_api: DirectorV2Api,
        job_id: JobID,
        log_distributor: LogDistributor,
    ):
        self._user_id = user_id
        self._director2_api = director2_api
        self._queue: Queue[JobLog] = Queue()  # unbounded: a cap would drop logs
        self._job_id: JobID = job_id
        self._log_distributor: LogDistributor = log_distributor
        self._is_registered: bool = False

    async def setup(self):
        """Register this streamer's queue with the distributor."""
        await self._log_distributor.register(self._job_id, self._queue.put)
        self._is_registered = True

    async def teardown(self):
        """Deregister from the distributor."""
        await self._log_distributor.deregister(self._job_id)
        self._is_registered = False

    async def __aenter__(self):
        await self.setup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.teardown()

    async def _project_done(self) -> bool:
        """Return True once the computation has a stop timestamp."""
        task = await self._director2_api.get_computation(self._job_id, self._user_id)
        # fix: was the non-idiomatic `not task.stopped is None`
        return task.stopped is not None

    async def log_generator(self) -> AsyncIterable[str]:
        """Yield one JSON-encoded JobLog per line until the job is done.

        Raises:
            LogStreamerNotRegistered: if :meth:`setup` was not called first.
        """
        if not self._is_registered:
            raise LogStreamerNotRegistered(
                f"LogStreamer for job_id={self._job_id} is not correctly registered"
            )
        while True:
            # poll: drain queued logs, otherwise check job status periodically
            while self._queue.empty():
                if await self._project_done():
                    return
                await asyncio.sleep(_SLEEP_SECONDS_BEFORE_CHECK_JOB_STATUS)
            log: JobLog = await self._queue.get()
            yield log.json() + _NEW_LINE
Loading