Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul session transactions, cleanup event dispatches, replace fastapi-events #50

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/api/api_v1/endpoints/auth_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def appstore_login(
appstore_service = AppstoreService(db, appstore_access_token, login_body.user_type)
user = await appstore_service.get_associated_eduhelx_user()
# If the user is authenticated in appstore with a corresponding onyen, we can create a token for them.
token = await UserService(db)._create_user_token(user)
token = await UserService(db).create_user_token(user)
return token

@router.put("/login/gitea/ssh", description="Set an SSH key for your Gitea user")
Expand Down
1 change: 1 addition & 0 deletions app/core/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
from .ldap import *
from .appstore import *
from .lms import *
from .database import *
5 changes: 5 additions & 0 deletions app/core/exceptions/assignment.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from .base import CustomException

class AssignmentNameTakenException(CustomException):
code = 400
error_code = "ASSIGNMENT__NAME_TAKEN"
message = "assignment cannot use name that is already in use"

class AssignmentDueBeforeOpenException(CustomException):
code = 400
error_code = "ASSIGNMENT__DUE_BEFORE_OPEN"
Expand Down
7 changes: 6 additions & 1 deletion app/core/exceptions/course.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@ class NoCourseExistsException(CustomException):
class CourseAlreadyExistsException(CustomException):
code = 409
error_code = "COURSE__ALREADY_EXISTS"
message = "course already exists, try modifying the existing course instead"
message = "course already exists, try modifying the existing course instead"

class CourseEndsBeforeStartDateException(CustomException):
code = 400
error_code = "COURSE__ENDS_BEFORE_START"
message = "course cannot end before it has started"
50 changes: 50 additions & 0 deletions app/core/exceptions/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from sqlalchemy.exc import (
SQLAlchemyError, IntegrityError, OperationalError, DataError
)
from .base import CustomException

# General exception for transactional errors
class DatabaseTransactionException(CustomException):
code = 500
error_code = "DATABASE__TRANSACTION_EXCEPTION"
message = "failed to flush database transaction to server"

def __init__(self, sqlalchemy_exc: SQLAlchemyError, message: str | None = None):
super().__init__(message)
self.sqlalchemy_exc = sqlalchemy_exc

@classmethod
def from_exception(cls, exc: SQLAlchemyError):
orig = getattr(exc, "orig", None)
message = str(orig) if orig is not None else None

if isinstance(exc, IntegrityError):
raise DatabaseIntegrityException(exc, message) from exc
elif isinstance(exc, OperationalError):
raise DatabaseOperationalException(exc, message) from exc
elif isinstance(exc, DataError):
raise DatabaseDataException(exc, message) from exc

raise cls(exc, message) from exc

@classmethod
def raise_exception(cls, exc: SQLAlchemyError):
raise cls.from_exception(exc)

# Relational integrity errors, e.g., unique violations, null violations, foreign key violations, etc.
class DatabaseIntegrityException(DatabaseTransactionException):
code = 400
error_code = "DATABASE__INTEGRITY_EXCEPTION"
message = "database integrity violation encountered while flushing transaction"

# E.g. internal database errors, connection issues, timeouts, etc.
class DatabaseOperationalException(DatabaseTransactionException):
code = 500
error_code = "DATABASE__OPERATIONAL_EXCEPTION"
message = "database operational error encountered while flushing transaction"

# E.g. string violates length restraint, type violations, division by 0, etc.
class DatabaseDataException(DatabaseTransactionException):
code = 400
error_code = "DATABASE__DATA_EXCEPTION"
message = "database data violation encountered while flushing transaction"
7 changes: 6 additions & 1 deletion app/core/exceptions/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@
class SubmissionNotFoundException(CustomException):
code = 404
error_code = "SUBMISSION__NOT_FOUND"
message = "submission not found"
message = "submission not found"

class SubmissionCommitNotFoundException(CustomException):
code = 404
error_code = "SUBMISSION__COMMIT_HASH_NOT_FOUND"
message = "commit sha could not found inside repostiory"
4 changes: 2 additions & 2 deletions app/events/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .emitter import *
from .schemas import *
from .handlers import *
from .decorators import *
from .dispatcher import *
from .decorators import *
6 changes: 3 additions & 3 deletions app/events/decorators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .dispatcher import dispatch
from .emitter import event_emitter

# For use with static events.
def dispatches(*args, **kwargs):
def emits(*args, **kwargs):
def inner(func):
result = func()
dispatch(*args, **kwargs)
event_emitter.emit_future(*args, **kwargs)
return result
return inner
10 changes: 0 additions & 10 deletions app/events/dispatcher.py

This file was deleted.

52 changes: 52 additions & 0 deletions app/events/emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio
from typing import Callable
from pydantic import BaseModel
from pymitter import EventEmitter

"""
NOTE: Any exceptions encountered within handlers will bubble up to the emitter.
In general, emits need to be caught by the caller for cleanup/etc. Any handlers
who are ok with failing should not raise exceptions to the emitter.
"""

class PydanticEvent(BaseModel):
__event_name__: str

class Config:
arbitrary_types_allowed = True

class PydanticEventEmitter(EventEmitter):
def _validate_pydantic_event(self, event: PydanticEvent) -> tuple[str, dict]:
if not isinstance(event, PydanticEvent):
raise TypeError(f"unrecognized event { event }, only Pydantic events are supported")

# By converting the event to a dict, Pydantic performs validation on it.
event.dict()

name, payload = event.__event_name__, event
return name, payload

def on(self, event: str | BaseModel, *args, **kwargs) -> Callable:
# Support passing in an event model as an event name.
if isinstance(event, BaseModel):
event = event.__event_name__

return super().on(event, *args, **kwargs)

def emit(self, *args, **kwargs):
raise NotImplementedError("use `emit_future` instead if you need to dispatch from a sync block")

""" This method is safe to use within sync and async blocks to trigger async handlers. However,
you must keep in mind that since it's a future, the event is not necessary processed. """
def emit_future(self, event: BaseModel) -> asyncio.Task:
name, payload = self._validate_pydantic_event(event)
awaitables = self._emit(name, payload)

if awaitables:
return asyncio.ensure_future(asyncio.gather(*awaitables))

async def emit_async(self, event: BaseModel) -> None:
name, payload = self._validate_pydantic_event(event)
await super().emit_async(name, payload)

event_emitter = PydanticEventEmitter(wildcard=True, delimiter=":")
19 changes: 7 additions & 12 deletions app/events/handlers.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
from fastapi import Depends
from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event
from sqlalchemy.orm import Session

from .schemas import SyncEvents
from app.database import SessionLocal
from app.models import AssignmentModel
from app.events import ModifyAssignmentCrudEvent
from app.events.schemas import AssignmentCrudEvent
from app.events.emitter import event_emitter
from app.core.dependencies import get_db_persistent

"""
NOTE: Use `get_db_persistent` instead of `get_db`. FastAPI-Events does not support generator-based DI.
You MUST call Session.close() once you are done with the database session.
NOTE: Keep in mind that exceptions raised in event handlers bubble up to the original emitter.
If it's okay for the handler to fail, then the handler should catch the error instead of raising it.
"""


@local_handler.register(event_name=ModifyAssignmentCrudEvent.__event_name__)
async def handle_sync_create_assignment(event: ModifyAssignmentCrudEvent):
@event_emitter.on("crud:assignment:*")
async def handle_master_repo_hook_update(event: AssignmentCrudEvent):
from app.services import GiteaService, StudentService, CourseService

event_name, payload = event
assignment = payload["assignment"]
assignment = event.assignment

with SessionLocal() as session:
course_service = CourseService(session)
Expand Down
3 changes: 1 addition & 2 deletions app/events/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from .crud_events import *
from .sync_events import *
from .crud_events import *
50 changes: 16 additions & 34 deletions app/events/schemas/crud_events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from enum import Enum
from pydantic import BaseModel
from fastapi_events.registry.payload_schema import registry
from app.events.emitter import PydanticEvent
from app.models import CourseModel, AssignmentModel, SubmissionModel, UserModel, StudentModel, InstructorModel
from app.models.user import UserType

Expand All @@ -26,33 +25,26 @@ class CrudEvents(Enum):
MODIFY_SUBMISSION = "crud:submission:modify"
DELETE_SUBMISSION = "crud:submission:delete"

class CrudEvent(BaseModel):
__event_name__: str
class CrudEvent(PydanticEvent):
modified_fields: list[str] | None = None

@property
def crud_type(self):
return self.__event_name__.split("_")[0]
return self.__event_name__.split(":")[2]

@property
def resource_type(self):
return self.__event_name__.split("_")[1]

class Config:
arbitrary_types_allowed = True
return self.__event_name__.split(":")[1]

class CourseCrudEvent(CrudEvent):
course: CourseModel

@registry.register
class CreateCourseCrudEvent(CourseCrudEvent):
__event_name__ = CrudEvents.CREATE_COURSE
@registry.register
__event_name__ = CrudEvents.CREATE_COURSE.value
class ModifyCourseCrudEvent(CourseCrudEvent):
__event_name__ = CrudEvents.MODIFY_COURSE
@registry.register
__event_name__ = CrudEvents.MODIFY_COURSE.value
class DeleteCourseCrudEvent(CourseCrudEvent):
__event_name__ = CrudEvents.DELETE_COURSE
__event_name__ = CrudEvents.DELETE_COURSE.value


class UserCrudEvent(CrudEvent):
Expand All @@ -68,41 +60,31 @@ def user_type(self):

raise NotImplementedError

@registry.register
class CreateUserCrudEvent(UserCrudEvent):
__event_name__ = CrudEvents.CREATE_USER
@registry.register
__event_name__ = CrudEvents.CREATE_USER.value
class ModifyUserCrudEvent(UserCrudEvent):
__event_name__ = CrudEvents.MODIFY_USER
@registry.register
__event_name__ = CrudEvents.MODIFY_USER.value
class DeleteUserCrudEvent(UserCrudEvent):
__event_name__ = CrudEvents.DELETE_USER
__event_name__ = CrudEvents.DELETE_USER.value


class AssignmentCrudEvent(CrudEvent):
assignment: AssignmentModel

@registry.register
class CreateAssignmentCrudEvent(AssignmentCrudEvent):
__event_name__ = CrudEvents.CREATE_ASSIGNMENT
@registry.register
__event_name__ = CrudEvents.CREATE_ASSIGNMENT.value
class ModifyAssignmentCrudEvent(AssignmentCrudEvent):
__event_name__ = CrudEvents.MODIFY_ASSIGNMENT
@registry.register
__event_name__ = CrudEvents.MODIFY_ASSIGNMENT.value
class DeleteAssignmentCrudEvent(AssignmentCrudEvent):
__event_name__ = CrudEvents.DELETE_ASSIGNMENT
__event_name__ = CrudEvents.DELETE_ASSIGNMENT.value


class SubmissionCrudEvent(CrudEvent):
submission: SubmissionModel

@registry.register
class CreateSubmissionCrudEvent(SubmissionCrudEvent):
__event_name__ = CrudEvents.CREATE_SUBMISSION
@registry.register
__event_name__ = CrudEvents.CREATE_SUBMISSION.value
class ModifySubmissionCrudEvent(SubmissionCrudEvent):
__event_name__ = CrudEvents.MODIFY_SUBMISSION
@registry.register
__event_name__ = CrudEvents.MODIFY_SUBMISSION.value
class DeleteSubmissionCrudEvent(SubmissionCrudEvent):
__event_name__ = CrudEvents.DELETE_SUBMISSION

__event_name__ = CrudEvents.DELETE_SUBMISSION.value
12 changes: 0 additions & 12 deletions app/events/schemas/schemas.py

This file was deleted.

12 changes: 0 additions & 12 deletions app/events/schemas/sync_events.py

This file was deleted.

6 changes: 0 additions & 6 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from fastapi.responses import JSONResponse
from fastapi.middleware import Middleware
from fastapi_pagination import add_pagination
from fastapi_events.middleware import EventHandlerASGIMiddleware
from fastapi_events.handlers.local import local_handler
from starlette.middleware.cors import CORSMiddleware

from app.api.api_v1 import api_router
Expand Down Expand Up @@ -60,10 +58,6 @@ def make_middleware() -> List[Middleware]:
AuthenticationMiddleware,
backend=AuthBackend(),
on_error=on_auth_error
),
Middleware(
EventHandlerASGIMiddleware,
handlers=[local_handler]
)
]

Expand Down
3 changes: 2 additions & 1 deletion app/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
from .extra_time import *
from .course import *
from .jwt import *
from .settings import *
from .settings import *
from .gitea import *
Loading