diff --git a/.gitignore b/.gitignore index 1ff9b989f..8e64e73ab 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ logs/ # Pytest-cov .coverage + +# Jinja templates test output +tests/jinja_test_outputs/ diff --git a/app/app.py b/app/app.py index dce3941b1..fdafb3461 100644 --- a/app/app.py +++ b/app/app.py @@ -30,14 +30,12 @@ from app.dependencies import ( get_db, get_redis_client, + get_scheduler, get_websocket_connection_manager, init_and_get_db_engine, ) from app.modules.module_list import module_list -from app.types.exceptions import ( - ContentHTTPException, - GoogleAPIInvalidCredentialsError, -) +from app.types.exceptions import ContentHTTPException, GoogleAPIInvalidCredentialsError from app.types.sqlalchemy import Base from app.utils import initialization from app.utils.redis import limiter @@ -45,6 +43,9 @@ if TYPE_CHECKING: import redis + from app.types.scheduler import Scheduler + from app.types.websocket import WebsocketConnectionManager + # NOTE: We can not get loggers at the top of this file like we do in other files # as the loggers are not yet initialized @@ -352,14 +353,27 @@ async def lifespan(app: FastAPI) -> AsyncGenerator: # We expect this error to be raised if the credentials were never set before pass - ws_manager = app.dependency_overrides.get( + ws_manager: WebsocketConnectionManager = app.dependency_overrides.get( get_websocket_connection_manager, get_websocket_connection_manager, )(settings=settings) + arq_scheduler: Scheduler = app.dependency_overrides.get( + get_scheduler, + get_scheduler, + )(settings=settings) + await ws_manager.connect_broadcaster() + await arq_scheduler.start( + redis_host=settings.REDIS_HOST, + redis_port=settings.REDIS_PORT, + redis_password=settings.REDIS_PASSWORD, + _dependency_overrides=app.dependency_overrides, + ) + yield hyperion_error_logger.info("Shutting down") + await arq_scheduler.close() await ws_manager.disconnect_broadcaster() # Initialize app diff --git a/app/core/endpoints_core.py b/app/core/endpoints_core.py index dab29ef5e..a23c7371b 100644 --- a/app/core/endpoints_core.py +++ b/app/core/endpoints_core.py @@ -28,7 +28,9 @@ response_model=schemas_core.CoreInformation, status_code=200, ) -async def read_information(settings: Settings = Depends(get_settings)): +async def read_information( + settings: Settings = Depends(get_settings), +): """ Return information about Hyperion. This endpoint can be used to check if the API is up. """ diff --git a/app/core/log.py b/app/core/log.py index 6756dad04..6e1b62d4b 100644 --- a/app/core/log.py +++ b/app/core/log.py @@ -234,6 +234,7 @@ def get_config_dict(self, settings: Settings): ], "level": MINIMUM_LOG_LEVEL, }, + "scheduler": {"handlers": ["console"], "level": MINIMUM_LOG_LEVEL}, # We disable "uvicorn.access" to replace it with our custom "hyperion.access" which add custom information like the request_id "uvicorn.access": {"handlers": []}, "uvicorn.error": { @@ -245,6 +246,15 @@ def get_config_dict(self, settings: Settings): "level": MINIMUM_LOG_LEVEL, "propagate": False, }, + "arq.worker": { + "handlers": [ + "console", + "file_errors", + "matrix_errors", + ], + "level": MINIMUM_LOG_LEVEL, + "propagate": False, + }, }, } diff --git a/app/core/notification/cruds_notification.py b/app/core/notification/cruds_notification.py index e59fe8f18..c0f39d120 100644 --- a/app/core/notification/cruds_notification.py +++ b/app/core/notification/cruds_notification.py @@ -9,96 +9,6 @@ from app.core.notification.notification_types import CustomTopic, Topic -async def create_message( - message: models_notification.Message, - db: AsyncSession, -) -> None: - db.add(message) - try: - await db.commit() - except IntegrityError: - await db.rollback() - raise - - -async def create_batch_messages( - messages: list[models_notification.Message], - db: AsyncSession, -) -> None: - db.add_all(messages) - try: - await db.commit() - except IntegrityError: - await db.rollback() - raise - - -async def get_messages_by_firebase_token( - firebase_token: str, - db: AsyncSession, -) -> Sequence[models_notification.Message]: - result = await db.execute( - select(models_notification.Message).where( - models_notification.Message.firebase_device_token == firebase_token, - ), - ) - return result.scalars().all() - - -async def get_messages_by_context_and_firebase_tokens( - context: str, - firebase_tokens: list[str], - db: AsyncSession, -) -> Sequence[models_notification.Message]: - result = await db.execute( - select(models_notification.Message).where( - models_notification.Message.context == context, - models_notification.Message.firebase_device_token.in_(firebase_tokens), - ), - ) - return result.scalars().all() - - -async def remove_message_by_context_and_firebase_device_token( - context: str, - firebase_device_token: str, - db: AsyncSession, -): - await db.execute( - delete(models_notification.Message).where( - models_notification.Message.context == context, - models_notification.Message.firebase_device_token == firebase_device_token, - ), - ) - await db.commit() - - -async def remove_message_by_firebase_device_token( - firebase_device_token: str, - db: AsyncSession, -): - await db.execute( - delete(models_notification.Message).where( - models_notification.Message.firebase_device_token == firebase_device_token, - ), - ) - await db.commit() - - -async def remove_messages_by_context_and_firebase_device_tokens_list( - context: str, - tokens: list[str], - db: AsyncSession, -): - await db.execute( - delete(models_notification.Message).where( - models_notification.Message.context == context, - models_notification.Message.firebase_device_token.in_(tokens), - ), - ) - await db.commit() - - async def get_firebase_devices_by_user_id( user_id: str, db: AsyncSession, diff --git a/app/core/notification/endpoints_notification.py b/app/core/notification/endpoints_notification.py index c59f8cb5c..3452a4a25 100644 --- a/app/core/notification/endpoints_notification.py +++ b/app/core/notification/endpoints_notification.py @@ -15,9 +15,11 @@ get_db, get_notification_manager, get_notification_tool, + get_scheduler, is_user, is_user_in, ) +from app.types.scheduler import Scheduler from app.utils.communication.notifications import NotificationManager, NotificationTool router = APIRouter(tags=["Notifications"]) @@ -61,6 +63,16 @@ async def register_firebase_device( db=db, ) + user_topics = await cruds_notification.get_topic_memberships_by_user_id( + user_id=user.id, + db=db, + ) + for topic in user_topics: + await notification_manager.subscribe_tokens_to_topic( + custom_topic=CustomTopic(topic.topic), + tokens=[firebase_token], + ) + firebase_device = models_notification.FirebaseDevice( user_id=user.id, firebase_device_token=firebase_token, @@ -84,7 +96,7 @@ async def unregister_firebase_device( notification_manager: NotificationManager = Depends(get_notification_manager), ): """ - Unregister a new firebase device for the user + Unregister a firebase device for the user **The user must be authenticated to use this endpoint** """ @@ -95,46 +107,15 @@ async def unregister_firebase_device( db=db, ) - -@router.get( - "/notification/messages/{firebase_token}", - response_model=list[schemas_notification.Message], - status_code=200, -) -async def get_messages( - firebase_token: str, - db: AsyncSession = Depends(get_db), - # If we want to enable authentification for /messages/{firebase_token} endpoint, we may to uncomment the following line - # user: models_core.CoreUser = Depends(is_user()), -): - """ - Get all messages for a specific device from the user - - **The user must be authenticated to use this endpoint** - """ - firebase_device = await cruds_notification.get_firebase_devices_by_user_id_and_firebase_token( - # If we want to enable authentification for /messages/{firebase_token} endpoint, we may to uncomment the following line - firebase_token=firebase_token, - db=db, # user_id=user.id, - ) - - if firebase_device is None: - raise HTTPException( - status_code=404, - detail="Device not found for user", # {user.id}" - ) - - messages = await cruds_notification.get_messages_by_firebase_token( - firebase_token=firebase_token, - db=db, - ) - - await cruds_notification.remove_message_by_firebase_device_token( - firebase_device_token=firebase_token, + user_topics = await cruds_notification.get_topic_memberships_by_user_id( + user_id=user.id, db=db, ) - - return messages + for topic in user_topics: + await notification_manager.unsubscribe_tokens_to_topic( + custom_topic=CustomTopic(topic.topic), + tokens=[firebase_token], + ) @router.post( @@ -268,12 +249,9 @@ async def send_notification( **Only admins can use this endpoint** """ message = schemas_notification.Message( - context="notification-test", - is_visible=True, title="Test notification", content="Ceci est un test de notification", - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module="test", ) await notification_tool.send_notification_to_user( user_id=user.id, @@ -288,6 +266,7 @@ async def send_notification( async def send_future_notification( user: models_core.CoreUser = Depends(is_user_in(GroupType.admin)), notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), ): """ Send ourself a test notification. @@ -295,17 +274,68 @@ async def send_future_notification( **Only admins can use this endpoint** """ message = schemas_notification.Message( - context="future-notification-test", - is_visible=True, title="Test notification future", content="Ceci est un test de notification future", - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), - delivery_datetime=datetime.now(UTC) + timedelta(minutes=3), + action_module="test", ) - await notification_tool.send_notification_to_user( - user_id=user.id, + await notification_tool.send_notification_to_users( + user_ids=[user.id], + message=message, + defer_date=datetime.now(UTC) + timedelta(seconds=10), + scheduler=scheduler, + job_id="testtt", + ) + + +@router.post( + "/notification/send/topic", + status_code=201, +) +async def send_notification_topic( + user: models_core.CoreUser = Depends(is_user_in(GroupType.admin)), + notification_tool: NotificationTool = Depends(get_notification_tool), +): + """ + Send ourself a test notification. + + **Only admins can use this endpoint** + """ + message = schemas_notification.Message( + title="Test notification topic", + content="Ceci est un test de notification topic", + action_module="test", + ) + await notification_tool.send_notification_to_topic( + custom_topic=CustomTopic.from_str("test"), + message=message, + ) + + +@router.post( + "/notification/send/topic/future", + status_code=201, +) +async def send_future_notification_topic( + user: models_core.CoreUser = Depends(is_user_in(GroupType.admin)), + notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), +): + """ + Send ourself a test notification. + + **Only admins can use this endpoint** + """ + message = schemas_notification.Message( + title="Test notification future topic", + content="Ceci est un test de notification future topic", + action_module="test", + ) + await notification_tool.send_notification_to_topic( + custom_topic=CustomTopic.from_str("test"), message=message, + defer_date=datetime.now(UTC) + timedelta(seconds=10), + job_id="test26", + scheduler=scheduler, ) diff --git a/app/core/notification/notification_types.py b/app/core/notification/notification_types.py index 745b005dc..ae081524b 100644 --- a/app/core/notification/notification_types.py +++ b/app/core/notification/notification_types.py @@ -15,6 +15,7 @@ class Topic(str, Enum): raffle = "raffle" vote = "vote" ph = "ph" + test = "test" class CustomTopic: diff --git a/app/core/notification/schemas_notification.py b/app/core/notification/schemas_notification.py index 633b3c164..871464355 100644 --- a/app/core/notification/schemas_notification.py +++ b/app/core/notification/schemas_notification.py @@ -1,38 +1,13 @@ -from datetime import datetime - -from pydantic import BaseModel, ConfigDict, Field +from pydantic import BaseModel, Field class Message(BaseModel): - # A context represents a topic (ex: a loan), - # there can only by one notification per context (ex: the loan should be returned, the loan is overdue or has been returned) - context: str = Field( - description="A context represents a topic. There can only by one notification per context.", - ) - # `firebase_device_token` is only contained in the database but is not returned by the API - - is_visible: bool = Field( - description="A message can be visible or not, if it is not visible, it should only trigger an action", - ) - title: str | None = None content: str | None = None - action_module: str | None = Field( - None, - description="An identifier for the module that should be triggered when the notification is clicked", - ) - action_table: str | None = None - - delivery_datetime: datetime | None = Field( - None, - description="The date the notification should be shown", - ) - expire_on: datetime - model_config = ConfigDict(from_attributes=True) + action_module: str class FirebaseDevice(BaseModel): user_id: str = Field(description="The Hyperion user id") firebase_device_token: str = Field("Firebase device token") - model_config = ConfigDict(from_attributes=True) diff --git a/app/dependencies.py b/app/dependencies.py index b7fb7d934..1531408f9 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -27,15 +27,13 @@ async def get_users(db: AsyncSession = Depends(get_db)): from app.core.groups.groups_type import AccountType, GroupType, get_ecl_account_types from app.core.payment.payment_tool import PaymentTool from app.modules.raid.utils.drive.drive_file_manager import DriveFileManager +from app.types.scheduler import OfflineScheduler, Scheduler from app.types.scopes_type import ScopeType from app.types.websocket import WebsocketConnectionManager from app.utils.auth import auth_utils from app.utils.communication.notifications import NotificationManager, NotificationTool from app.utils.redis import connect -from app.utils.tools import ( - is_user_external, - is_user_member_of_an_allowed_group, -) +from app.utils.tools import is_user_external, is_user_member_of_an_allowed_group # We could maybe use hyperion.security hyperion_access_logger = logging.getLogger("hyperion.access") @@ -46,6 +44,8 @@ async def get_users(db: AsyncSession = Depends(get_db)): ) # Is None if the redis client is not instantiated, is False if the redis client is instancied but not connected, is a redis.Redis object if the redis client is connected +scheduler: Scheduler | None = None + websocket_connection_manager: WebsocketConnectionManager | None = None engine: AsyncEngine | None = ( @@ -164,6 +164,13 @@ def get_redis_client( return redis_client +def get_scheduler(settings: Settings = Depends(get_settings)) -> Scheduler: + global scheduler + if scheduler is None: + scheduler = Scheduler() if settings.REDIS_HOST != "" else OfflineScheduler() + return scheduler + + def get_websocket_connection_manager( settings: Settings = Depends(get_settings), ): @@ -196,6 +203,7 @@ def get_notification_tool( background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db), notification_manager: NotificationManager = Depends(get_notification_manager), + scheduler: Scheduler = Depends(get_scheduler), ) -> NotificationTool: """ Dependency that returns a notification tool, allowing to send push notification as a background tasks. diff --git a/app/modules/advert/endpoints_advert.py b/app/modules/advert/endpoints_advert.py index 093859a93..a52958cc4 100644 --- a/app/modules/advert/endpoints_advert.py +++ b/app/modules/advert/endpoints_advert.py @@ -1,6 +1,6 @@ import logging import uuid -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime from fastapi import Depends, File, HTTPException, Query, UploadFile from fastapi.responses import FileResponse @@ -266,12 +266,9 @@ async def create_advert( except ValueError as error: raise HTTPException(status_code=400, detail=str(error)) message = Message( - context=f"advert-new-{id}", - is_visible=True, title=f"📣 Annonce - {result.title}", content=result.content, - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module=module.root, ) await notification_tool.send_notification_to_topic( diff --git a/app/modules/amap/endpoints_amap.py b/app/modules/amap/endpoints_amap.py index 24eef6478..c797eec55 100644 --- a/app/modules/amap/endpoints_amap.py +++ b/app/modules/amap/endpoints_amap.py @@ -1,6 +1,6 @@ import logging import uuid -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime from fastapi import Depends, HTTPException, Response from redis import Redis @@ -719,12 +719,9 @@ async def open_ordering_of_delivery( await cruds_amap.open_ordering_of_delivery(delivery_id=delivery_id, db=db) message = Message( - context=f"amap-open-ordering-{delivery_id}", - is_visible=True, title="🛒 AMAP - Nouvelle livraison disponible", content="Viens commander !", - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module="amap", ) await notification_tool.send_notification_to_topic( custom_topic=CustomTopic(Topic.amap), @@ -904,12 +901,9 @@ async def create_cash_of_user( ) message = Message( - context=f"amap-cash-{user_id}", - is_visible=True, title="AMAP - Solde mis à jour", content=f"Votre nouveau solde est de {cash} €.", - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module="amap", ) await notification_tool.send_notification_to_user( user_id=user_id, diff --git a/app/modules/booking/endpoints_booking.py b/app/modules/booking/endpoints_booking.py index 35eb5cc3c..f5d06bd19 100644 --- a/app/modules/booking/endpoints_booking.py +++ b/app/modules/booking/endpoints_booking.py @@ -1,6 +1,6 @@ import logging import uuid -from datetime import UTC, datetime, timedelta +from datetime import UTC, datetime from zoneinfo import ZoneInfo from fastapi import Depends, HTTPException @@ -277,12 +277,9 @@ async def create_booking( if manager_group: message = Message( - context=f"booking-new-{id}", - is_visible=True, title="📅 Réservations - Nouvelle réservation", content=content, - # The notification will expire in 3 days - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module="booking", ) await notification_tool.send_notification_to_users( diff --git a/app/modules/cinema/endpoints_cinema.py b/app/modules/cinema/endpoints_cinema.py index 8fa931e4d..99b4ff5ce 100644 --- a/app/modules/cinema/endpoints_cinema.py +++ b/app/modules/cinema/endpoints_cinema.py @@ -16,6 +16,7 @@ get_db, get_notification_tool, get_request_id, + get_scheduler, get_settings, is_user_a_member, is_user_in, @@ -23,6 +24,7 @@ from app.modules.cinema import cruds_cinema, schemas_cinema from app.types.content_type import ContentType from app.types.module import Module +from app.types.scheduler import Scheduler from app.utils.communication.date_manager import ( get_date_day, get_date_month, @@ -120,6 +122,7 @@ async def create_session( db: AsyncSession = Depends(get_db), user: models_core.CoreUser = Depends(is_user_in(GroupType.cinema)), notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), ): db_session = schemas_cinema.CineSessionComplete( id=str(uuid.uuid4()), @@ -141,19 +144,22 @@ async def create_session( for next_session in next_week_sessions: message_content += f"{get_date_day(next_session.start)} {next_session.start.day} {get_date_month(next_session.start)} - {next_session.name}\n" message = Message( - # We use sunday date as context to avoid sending the recap twice - context=f"cinema-recap-{sunday}", - is_visible=True, title="🎬 Cinéma - Programme de la semaine", content=message_content, - delivery_datetime=sunday, - # The notification will expire the next sunday - expire_on=sunday + timedelta(days=7), + action_module="cinema", + ) + + await notification_tool.cancel_notification( + scheduler=scheduler, + job_id=f"cinema_weekly_{sunday}", ) await notification_tool.send_notification_to_topic( custom_topic=CustomTopic(topic=Topic.cinema), message=message, + scheduler=scheduler, + defer_date=sunday, + job_id=f"cinema_weekly_{sunday}", ) return result diff --git a/app/modules/loan/endpoints_loan.py b/app/modules/loan/endpoints_loan.py index 995ed1056..9d327590e 100644 --- a/app/modules/loan/endpoints_loan.py +++ b/app/modules/loan/endpoints_loan.py @@ -12,11 +12,13 @@ from app.dependencies import ( get_db, get_notification_tool, + get_scheduler, is_user_a_member, is_user_in, ) from app.modules.loan import cruds_loan, models_loan, schemas_loan from app.types.module import Module +from app.types.scheduler import Scheduler from app.utils.communication.notifications import NotificationTool from app.utils.tools import ( is_group_id_valid, @@ -499,6 +501,7 @@ async def create_loan( db: AsyncSession = Depends(get_db), user: models_core.CoreUser = Depends(is_user_a_member), notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), ): """ Create a new loan in database and add the requested items @@ -613,11 +616,9 @@ async def create_loan( ] message = Message( - context=f"loan-new-{loan.id}-begin-notif", - is_visible=True, title="📦 Nouveau prêt", content=f"Un prêt a été enregistré pour l'association {loan.loaner.name}", - expire_on=datetime.now(UTC) + timedelta(days=3), + action_module="loan", ) await notification_tool.send_notification_to_user( user_id=loan.borrower_id, @@ -626,20 +627,19 @@ async def create_loan( delivery_time = time(11, 00, 00, tzinfo=UTC) delivery_datetime = datetime.combine(loan.end, delivery_time, tzinfo=UTC) - expire_on_date = loan.end + timedelta(days=30) - expire_on_datetime = datetime.combine(expire_on_date, delivery_time, tzinfo=UTC) + message = Message( - context=f"loan-new-{loan.id}-end-notif", - is_visible=True, title="📦 Prêt arrivé à échéance", content=f"N'oublie pas de rendre ton prêt à l'association {loan.loaner.name} !", - delivery_datetime=delivery_datetime, - expire_on=expire_on_datetime, + action_module="loan", ) - await notification_tool.send_notification_to_user( - user_id=loan.borrower_id, + await notification_tool.send_notification_to_users( + user_ids=[loan.borrower_id], message=message, + scheduler=scheduler, + defer_date=delivery_datetime, + job_id=f"loan_start_{loan.id}", ) return schemas_loan.Loan(items_qty=items_qty_ret, **loan.__dict__) @@ -827,6 +827,7 @@ async def return_loan( loan_id: str, db: AsyncSession = Depends(get_db), user: models_core.CoreUser = Depends(is_user_a_member), + scheduler: Scheduler = Depends(get_scheduler), notification_tool: NotificationTool = Depends(get_notification_tool), ): """ @@ -872,17 +873,9 @@ async def return_loan( returned=True, returned_date=datetime.now(UTC), ) - - message = Message( - context=f"loan-new-{loan.id}-end-notif", - is_visible=False, - title="", - content="", - expire_on=datetime.now(UTC) + timedelta(days=3), - ) - await notification_tool.send_notification_to_user( - user_id=loan.borrower_id, - message=message, + await notification_tool.cancel_notification( + scheduler=scheduler, + job_id=f"loan_end_{loan.id}", ) @@ -896,6 +889,7 @@ async def extend_loan( db: AsyncSession = Depends(get_db), user: models_core.CoreUser = Depends(is_user_a_member), notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), ): """ A new `end` date or an extended `duration` can be provided. If the two are provided, only `end` will be used. @@ -913,7 +907,7 @@ async def extend_loan( status_code=400, detail="Invalid loan_id", ) - + end = loan.end # The user should be a member of the loaner's manager group if not is_user_member_of_an_allowed_group(user, [loan.loaner.group_manager_id]): raise HTTPException( @@ -922,12 +916,14 @@ async def extend_loan( ) if loan_extend.end is not None: + end = loan_extend.end loan_update = schemas_loan.LoanUpdate( - end=loan_extend.end, + end=end, ) elif loan_extend.duration is not None: + end = loan.end + timedelta(seconds=loan_extend.duration) loan_update = schemas_loan.LoanUpdate( - end=loan.end + timedelta(seconds=loan_extend.duration), + end=end, ) await cruds_loan.update_loan( @@ -935,17 +931,34 @@ async def extend_loan( loan_update=loan_update, db=db, ) - # same context so the first notification will be removed - message = Message( - context=f"loan-new-{loan.id}-end-notif", - is_visible=True, - title="📦 Prêt arrivé à échéance", - content=f"N'oublie pas de rendre ton prêt à l'association {loan.loaner.name} ! ", - delivery_datetime=loan.end, - expire_on=loan.end + timedelta(days=30), + await notification_tool.cancel_notification( + scheduler=scheduler, + job_id=f"loan_end_{loan.id}", ) + message = Message( + title="📦 Prêt prolongé", + content=f"Ton prêt à l'association {loan.loaner.name} à bien été renouvellé !", + action_module="loan", + ) await notification_tool.send_notification_to_user( user_id=loan.borrower_id, message=message, ) + + delivery_time = time(11, 00, 00, tzinfo=UTC) + delivery_datetime = datetime.combine(end, delivery_time, tzinfo=UTC) + + message = Message( + title="📦 Prêt arrivé à échéance", + content=f"N'oublie pas de rendre ton prêt à l'association {loan.loaner.name} !", + action_module="loan", + ) + + await notification_tool.send_notification_to_users( + user_ids=[loan.borrower_id], + message=message, + scheduler=scheduler, + defer_date=delivery_datetime, + job_id=f"loan_end_{loan.id}", + ) diff --git a/app/modules/ph/endpoints_ph.py b/app/modules/ph/endpoints_ph.py index c5941fc58..d3f1da40a 100644 --- a/app/modules/ph/endpoints_ph.py +++ b/app/modules/ph/endpoints_ph.py @@ -13,12 +13,14 @@ get_db, get_notification_tool, get_request_id, + get_scheduler, is_user_a_member, is_user_in, ) from app.modules.ph import cruds_ph, models_ph, schemas_ph from app.types.content_type import ContentType from app.types.module import Module +from app.types.scheduler import Scheduler from app.utils.communication.notifications import NotificationTool from app.utils.tools import ( delete_file_from_data, @@ -105,6 +107,7 @@ async def create_paper( db: AsyncSession = Depends(get_db), user: models_core.CoreUser = Depends(is_user_in(GroupType.ph)), notification_tool: NotificationTool = Depends(get_notification_tool), + scheduler: Scheduler = Depends(get_scheduler), ): """Create a new paper.""" @@ -124,21 +127,25 @@ async def create_paper( # We only want to send a notification if the paper was released less than a month ago. if paper_db.release_date >= now.date() - timedelta(days=30): message = Message( - context=f"ph-{paper_db.id}", - is_visible=True, title=f"📗 PH - {paper_db.name}", content="Un nouveau journal est disponible! 🎉", - delivery_datetime=datetime.combine( - paper_db.release_date, - time(hour=8, tzinfo=UTC), - ), - # The notification will expire in 10 days - expire_on=now + timedelta(days=10), - ) - await notification_tool.send_notification_to_topic( - custom_topic=CustomTopic(topic=Topic.ph), - message=message, + action_module="ph", ) + if paper_db.release_date == now.date(): + await notification_tool.send_notification_to_topic( + custom_topic=CustomTopic(topic=Topic.ph), + message=message, + ) + else: + delivery_time = time(11, 00, 00, tzinfo=UTC) + release_date = datetime.combine(paper_db.release_date, delivery_time) + await notification_tool.send_notification_to_topic( + custom_topic=CustomTopic(topic=Topic.ph), + message=message, + scheduler=scheduler, + defer_date=release_date, + job_id=f"ph_{paper_db.id}", + ) return await cruds_ph.create_paper(paper=paper_db, db=db) except ValueError as error: @@ -202,7 +209,7 @@ async def get_cover( ) return get_file_from_data( - default_asset="assets/images/default_cover.jpg", + default_asset="assets/images/default_cover.jpeg", directory="ph/cover", filename=str(paper_id), ) diff --git a/app/types/exceptions.py b/app/types/exceptions.py index fa1619753..29687babd 100644 --- a/app/types/exceptions.py +++ b/app/types/exceptions.py @@ -126,3 +126,8 @@ def __init__(self, email: str): super().__init__( f"An account with the email {email} already exist", ) + + +class SchedulerNotStartedError(Exception): + def __init__(self): + super().__init__("Scheduler not started") diff --git a/app/types/scheduler.py b/app/types/scheduler.py new file mode 100644 index 000000000..4b8e7bcc8 --- /dev/null +++ b/app/types/scheduler.py @@ -0,0 +1,230 @@ +import asyncio +import logging +from collections.abc import AsyncGenerator, Callable, Coroutine +from datetime import datetime +from inspect import signature +from typing import TYPE_CHECKING, Any + +from arq.connections import RedisSettings +from arq.jobs import Job +from arq.typing import WorkerSettingsBase +from arq.worker import create_worker +from sqlalchemy.ext.asyncio import AsyncSession + +from app import dependencies +from app.types.exceptions import SchedulerNotStartedError +from app.utils.tools import execute_async_or_sync_method + +if TYPE_CHECKING: + from arq import Worker + +scheduler_logger = logging.getLogger("scheduler") + + +async def run_task( + ctx: dict[Any, Any] | None, + job_function: Callable[..., Any], + _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]], + **kwargs, +): + """ + Execute the job_function with the provided kwargs + + `job_function` may be a coroutine function or a regular function + + The method will inject an `AsyncSession` object, using `get_db`, in the kwargs if the job_function requires it + + NOTE: As a consequence, it is not possible to plan a job using a custom AsyncSession. + Passing a custom AsyncSession would not be advisable as it would require the + db connection to remain open for the duration of the job. + """ + scheduler_logger.debug(f"Job function consumed {job_function}") + + require_db_for_kwargs: list[str] = [] + sign = signature(job_function) + for param in sign.parameters.values(): + # See https://docs.python.org/3/library/inspect.html#inspect.Parameter.annotation + if param.annotation is AsyncSession: + # We iterate over the parameters of the job_function + # If we find a AsyncSession object, we want to inject the dependency + require_db_for_kwargs.append(param.name) + else: + # We could support other types of dependencies + pass + + # We distinguish between methods requiring a db and those that don't + # to only open the db connection when needed + if require_db_for_kwargs: + # `get_db` is the real dependency, defined in dependency.py + # `_get_db` may be the real dependency or an override + _get_db: Callable[[], AsyncGenerator[AsyncSession, None]] = ( + _dependency_overrides.get( + dependencies.get_db, + dependencies.get_db, + ) + ) + + async for db in _get_db(): + for name in require_db_for_kwargs: + kwargs[name] = db + await execute_async_or_sync_method(job_function, **kwargs) + else: + await execute_async_or_sync_method(job_function, **kwargs) + + +class Scheduler: + """ + An [arq](https://arq-docs.helpmanual.io/) scheduler. + The wrapper is intended to be used inside a FastAPI worker. + + The scheduler use a Redis database to store planned jobs. + """ + + # See https://github.com/fastapi/fastapi/discussions/9143#discussioncomment-5157572 + + # Pointer to the app dependency overrides dict + _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]] + + def __init__(self): + # ArqWorker, in charge of scheduling and executing tasks + self.worker: Worker | None = None + # Task will contain the asyncio task that runs the worker + self.task: asyncio.Task | None = None + + async def start( + self, + redis_host: str, + redis_port: int, + redis_password: str | None, + _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]], + **kwargs, + ): + """ + Parameters: + - redis_host: str + - redis_port: int + - redis_password: str + - _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]] a pointer to the app dependency overrides dict + """ + + class ArqWorkerSettings(WorkerSettingsBase): + functions = [run_task] + allow_abort_jobs = True + keep_result_forever = True + redis_settings = RedisSettings( + host=redis_host, + port=redis_port, + password=redis_password or "", + ) + + # We pass handle_signals=False to avoid arq from handling signals + # See https://github.com/python-arq/arq/issues/182 + self.worker = create_worker( + ArqWorkerSettings, + handle_signals=False, + **kwargs, + ) + # We run the worker in an asyncio task + self.task = asyncio.create_task(self.worker.async_run()) + + self._dependency_overrides = _dependency_overrides + + scheduler_logger.info("Scheduler started") + + async def close(self): + # If the worker was started, we close it + if self.worker is not None: + await self.worker.close() + + async def queue_job_defer_to( + self, + job_function: Callable[..., Coroutine[Any, Any, Any]], + job_id: str, + defer_date: datetime, + **kwargs, + ): + """ + Queue a job to execute job_function at defer_date + job_id will allow to abort if needed + """ + if self.worker is None: + raise SchedulerNotStartedError + + job = await self.worker.pool.enqueue_job( + "run_task", + job_function=job_function, + _job_id=job_id, + _defer_until=defer_date, + _dependency_overrides=self._dependency_overrides, + **kwargs, + ) + scheduler_logger.debug(f"Job {job_id} queued {job}") + + async def cancel_job(self, job_id: str): + """ + cancel a queued job based on its job_id + """ + if self.worker is None: + raise SchedulerNotStartedError + job = Job(job_id, redis=self.worker.pool) + scheduler_logger.debug(f"Job aborted {job}") + await job.abort() + + +class OfflineScheduler(Scheduler): + """ + A Dummy implementation of the Scheduler to allow to run the server without a REDIS config + """ + + # See https://github.com/fastapi/fastapi/discussions/9143#discussioncomment-5157572 + + def __init__(self): + # ArqWorker, in charge of scheduling and executing tasks + self.worker: Worker | None = None + # Task will contain the asyncio task that runs the worker + self.task: asyncio.Task | None = None + # Pointer to the get_db dependency + + async def start( + self, + redis_host: str, + redis_port: int, + redis_password: str | None, + _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]], + **kwargs, + ): + """ + Parameters: + - redis_host: str + - redis_port: int + - redis_password: str + - _dependency_overrides: dict[Callable[..., Any], Callable[..., Any]] a pointer to the app dependency overrides dict + """ + self._dependency_overrides = _dependency_overrides + + scheduler_logger.info("OfflineScheduler started") + + async def close(self): + # If the worker was started, we close it + pass + + async def queue_job_defer_to( + self, + job_function: Callable[..., Coroutine[Any, Any, Any]], + job_id: str, + defer_date: datetime, + **kwargs, + ): + """ + Queue a job to execute job_function at defer_date + job_id will allow to abort if needed + """ + scheduler_logger.debug( + f"Job {job_id} queued in OfflineScheduler with defer to {defer_date}", + ) + + async def cancel_job(self, job_id: str): + """ + cancel a queued job based on its job_id + """ + scheduler_logger.debug(f"Job {job_id} aborted in OfflineScheduler") diff --git a/app/utils/communication/notifications.py b/app/utils/communication/notifications.py index ad83015bb..07dfcf37e 100644 --- a/app/utils/communication/notifications.py +++ b/app/utils/communication/notifications.py @@ -1,4 +1,5 @@ import logging +from datetime import datetime import firebase_admin from fastapi import BackgroundTasks @@ -9,6 +10,7 @@ from app.core.notification import cruds_notification, models_notification from app.core.notification.notification_types import CustomTopic from app.core.notification.schemas_notification import Message +from app.types.scheduler import Scheduler hyperion_error_logger = logging.getLogger("hyperion.error") @@ -75,6 +77,7 @@ async def _send_firebase_push_notification_by_tokens( self, db: AsyncSession, tokens: list[str], + message_content: Message, ): """ Send a firebase push notification to a list of tokens. @@ -96,27 +99,20 @@ async def _send_firebase_push_notification_by_tokens( await self._send_firebase_push_notification_by_tokens( tokens=tokens[500:], db=db, + message_content=message_content, ) tokens = tokens[:500] - # We may pass a notification object along the data try: - # Set high priority for android, and background notification for iOS - # This allow to ensure that the notification will be processed in the background - # See https://firebase.google.com/docs/cloud-messaging/concept-options#setting-the-priority-of-a-message - # And https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server/pushing_background_updates_to_your_app - androidconfig = messaging.AndroidConfig(priority="high") - apnsconfig = messaging.APNSConfig( - headers={"apns-priority": "10", "apns-push-type": "background"}, - payload=messaging.APNSPayload( - aps=messaging.Aps(content_available=True), - ), - ) message = messaging.MulticastMessage( tokens=tokens, - android=androidconfig, - apns=apnsconfig, + data={"action_module": message_content.action_module}, + notification=messaging.Notification( + title=message_content.title, + body=message_content.content, + ), ) + result = messaging.send_each_for_multicast(message) except Exception: hyperion_error_logger.exception( @@ -129,45 +125,62 @@ async def _send_firebase_push_notification_by_tokens( db=db, ) - async def _send_firebase_trigger_notification_by_tokens( + def _send_firebase_push_notification_by_topic( self, - db: AsyncSession, - tokens: list[str], + custom_topic: CustomTopic, + message_content: Message, ): """ - Send a firebase trigger notification to a list of tokens. - This approach let the application know that a new notification is available, - without sending the content of the notification. - This is better for privacy and RGPD compliance. + Send a firebase push notification for a given topic. + Prefer using `self._send_firebase_trigger_notification_by_topic` to send a trigger notification. """ - # Push without any data or notification may not be processed by the app in the background. - # We thus need to send a data object with a dummy key to make sure the notification is processed. - # See https://stackoverflow.com/questions/59298850/firebase-messaging-background-message-handler-method-not-called-when-the-app - await self._send_firebase_push_notification_by_tokens(tokens=tokens, db=db) - async def _add_message_for_user_in_database( + if not self.use_firebase: + return + message = messaging.Message( + topic=custom_topic.to_str(), + notification=messaging.Notification( + title=message_content.title, + body=message_content.content, + ), + ) + try: + messaging.send(message) + except messaging.FirebaseError: + hyperion_error_logger.exception( + f"Notification: Unable to send firebase notification for topic {custom_topic}", + ) + raise + + async def subscribe_tokens_to_topic( self, - message: Message, + custom_topic: CustomTopic, tokens: list[str], - db: AsyncSession, - ) -> None: - message_models = [ - models_notification.Message( - firebase_device_token=token, - **message.model_dump(), + ): + """ + Subscribe a list of tokens to a given topic. + """ + if not self.use_firebase: + return + + response = messaging.subscribe_to_topic(tokens, custom_topic.to_str()) + if response.failure_count > 0: + hyperion_error_logger.info( + f"Notification: Failed to subscribe to topic {custom_topic} due to {[error.reason for error in response.errors]}", ) - for token in tokens - ] - # We need to remove old messages with the same context and token - # as there can only be one message per context and token - await cruds_notification.remove_messages_by_context_and_firebase_device_tokens_list( - context=message.context, - tokens=tokens, - db=db, - ) + async def unsubscribe_tokens_to_topic( + self, + custom_topic: CustomTopic, + tokens: list[str], + ): + """ + Unsubscribe a list of tokens to a given topic. + """ + if not self.use_firebase: + return - await cruds_notification.create_batch_messages(messages=message_models, db=db) + messaging.unsubscribe_from_topic(tokens, custom_topic.to_str()) async def send_notification_to_users( self, @@ -196,21 +209,11 @@ async def send_notification_to_users( ) ) - if len(firebase_device_tokens) == 0: - hyperion_error_logger.warning( - f"Notification: No firebase device token found for the message {message.title} {message.content}", - ) - - await self._add_message_for_user_in_database( - message=message, - tokens=firebase_device_tokens, - db=db, - ) - try: - await self._send_firebase_trigger_notification_by_tokens( + await self._send_firebase_push_notification_by_tokens( tokens=firebase_device_tokens, db=db, + message_content=message, ) except Exception as error: hyperion_error_logger.warning( @@ -236,15 +239,15 @@ async def send_notification_to_topic( ) return - user_ids = await cruds_notification.get_user_ids_by_topic( - custom_topic=custom_topic, - db=db, - ) - await self.send_notification_to_users( - user_ids=user_ids, - message=message, - db=db, - ) + try: + self._send_firebase_push_notification_by_topic( + custom_topic=custom_topic, + message_content=message, + ) + except Exception as error: + hyperion_error_logger.warning( + f"Notification: Unable to send firebase notification for topic {custom_topic}: {error}", + ) async def subscribe_user_to_topic( self, @@ -274,6 +277,14 @@ async def subscribe_user_to_topic( topic_membership=topic_membership, db=db, ) + tokens = await cruds_notification.get_firebase_tokens_by_user_ids( + user_ids=[user_id], + db=db, + ) + await self.subscribe_tokens_to_topic( + custom_topic=custom_topic, + tokens=tokens, + ) async def unsubscribe_user_to_topic( self, @@ -289,6 +300,11 @@ async def unsubscribe_user_to_topic( user_id=user_id, db=db, ) + tokens = await cruds_notification.get_firebase_tokens_by_user_ids( + user_ids=[user_id], + db=db, + ) + await self.unsubscribe_tokens_to_topic(custom_topic=custom_topic, tokens=tokens) class NotificationTool: @@ -305,17 +321,51 @@ def __init__( background_tasks: BackgroundTasks, notification_manager: NotificationManager, db: AsyncSession, + # scheduler: Scheduler, ): self.background_tasks = background_tasks self.notification_manager = notification_manager self.db = db + # self.scheduler = scheduler - async def send_notification_to_users(self, user_ids: list[str], message: Message): - self.background_tasks.add_task( + async def send_notification_to_users( + self, + user_ids: list[str], + message: Message, + scheduler: Scheduler | None = None, + defer_date: datetime | None = None, + job_id: str | None = None, + ): + if defer_date is not None and scheduler is not None and job_id is not None: + await self.send_future_notification_to_users_defer_to( + user_ids=user_ids, + message=message, + scheduler=scheduler, + defer_date=defer_date, + job_id=job_id, + ) + else: + self.background_tasks.add_task( + self.notification_manager.send_notification_to_users, + user_ids=user_ids, + message=message, + db=self.db, + ) + + async def send_future_notification_to_users_defer_to( + self, + user_ids: list[str], + message: Message, + scheduler: Scheduler, + defer_date: datetime, + job_id: str, + ): + await scheduler.queue_job_defer_to( self.notification_manager.send_notification_to_users, user_ids=user_ids, message=message, - db=self.db, + job_id=job_id, + defer_date=defer_date, ) async def send_notification_to_user( @@ -332,10 +382,45 @@ async def send_notification_to_topic( self, custom_topic: CustomTopic, message: Message, + scheduler: Scheduler | None = None, + defer_date: datetime | None = None, + job_id: str | None = None, ): - self.background_tasks.add_task( + if defer_date is not None and scheduler is not None and job_id is not None: + await self.send_future_notification_to_topic_defer_to( + custom_topic=custom_topic, + message=message, + scheduler=scheduler, + defer_date=defer_date, + job_id=job_id, + ) + else: + self.background_tasks.add_task( + self.notification_manager.send_notification_to_topic, + custom_topic=custom_topic, + message=message, + db=self.db, + ) + + async def send_future_notification_to_topic_defer_to( + self, + custom_topic: CustomTopic, + message: Message, + scheduler: Scheduler, + defer_date: datetime, + job_id: str, + ): + await scheduler.queue_job_defer_to( self.notification_manager.send_notification_to_topic, custom_topic=custom_topic, message=message, - db=self.db, + job_id=job_id, + defer_date=defer_date, ) + + async def cancel_notification( + self, + scheduler: Scheduler, + job_id: str, + ): + await scheduler.cancel_job(job_id=job_id) diff --git a/app/utils/redis.py b/app/utils/redis.py index 44d78aa5d..ad6d98440 100644 --- a/app/utils/redis.py +++ b/app/utils/redis.py @@ -8,6 +8,7 @@ def connect(settings: Settings) -> redis.Redis | bool: host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, + socket_keepalive=True, ) redis_client.ping() # Test the connection diff --git a/app/utils/tools.py b/app/utils/tools.py index ec5ab5844..7b0077ad4 100644 --- a/app/utils/tools.py +++ b/app/utils/tools.py @@ -4,9 +4,10 @@ import re import secrets import unicodedata -from collections.abc import Sequence +from collections.abc import Callable, Sequence +from inspect import iscoroutinefunction from pathlib import Path -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar import aiofiles import fitz @@ -493,3 +494,16 @@ async def create_and_send_email_migration( hyperion_security_logger.info( f"You can confirm your new email address by clicking the following link: {settings.CLIENT_URL}users/migrate-mail-confirm?token={confirmation_token}", ) + + +async def execute_async_or_sync_method( + job_function: Callable[..., Any], + **kwargs, +): + """ + Execute the job_function with the provided kwargs, either as a coroutine or a regular function. + """ + if iscoroutinefunction(job_function): + return await job_function(**kwargs) + else: + return job_function(**kwargs) diff --git a/assets/templates/README.md b/assets/templates/README.md new file mode 100644 index 000000000..b0deebc00 --- /dev/null +++ b/assets/templates/README.md @@ -0,0 +1,12 @@ +# Email Jinja Templates + +The directory contains the templates used in the automatic mails sent by Hyperion. +Each mail is based on the `base_mail.html` template. + +## Adding a new mail template + +To add a new mail template, simply copy the `custom_mail_template.html` and modify it. + +## Testing your template + +To test how the templates render, you can run the Python file `tests/template_tester.py`. diff --git a/assets/templates/account_exists_mail.html b/assets/templates/account_exists_mail.html index fc23099a2..712e41cdb 100644 --- a/assets/templates/account_exists_mail.html +++ b/assets/templates/account_exists_mail.html @@ -1,33 +1,23 @@ - +{% extends "base_mail.html" %} -
- - -- Tu as demandé à créer un compte, mais ton adresse mail est déjà associée à un compte existant. Si tu as oublié ton mot de passe, tu peux utiliser la fonctionnalité de - mot de passe oublié. +{% block french_message %} +
+ Tu as demandé à créer un compte, mais ton adresse mail est déjà associée à un compte existant. + Si tu as oublié ton mot de passe, tu peux utiliser le réinitialiser en cliquant sur le lien suivant : + https://myecl.fr/forgot_password
-Si tu n'es pas à l'origine de cette demande, c'est qu'un bon fyot a tenté de s'inscrire avec ton adresse mail... Bref tu peux ignorer ce message.
+Si tu n'es pas à l'origine de cette demande, c'est qu'un bon fyot a tenté de s'inscrire avec ton adresse mail... Bref tu peux ignorer ce message.
+{% endblock %} -- You've asked to create an account, but your e-mail address is already linked to an existing account. If you've forgotten your password, you can use the - forgotten password feature. +{% block english_message %} +
+ You've asked to create an account, but your e-mail address is already linked to an existing account. + If you have forgotten your password, you can reset it by clicking on the following link: + https://myecl.fr/forgot_password
-If you are not behind this request, it likely means that a fool has tried to sign up using your e-mail address... In short, you can ignore this message.
- -Éclairement,
-