Skip to content

Commit

Permalink
Fix migrations & initializations when using multiple workers (#336)
Browse files Browse the repository at this point in the history
### Description

This PR fixes migrations when working with multiple workers.

It allows migrations and initializations to run synchronously, before the
lifespan starts. Gunicorn can therefore preload the app and run them before
creating the workers.

### Checklist

- [ ] Created tests which fail without the change (if possible)
- [x] All tests passing
- [ ] Extended the documentation, if necessary

---------

Co-authored-by: Petitoto <[email protected]>
  • Loading branch information
Petitoto and Petitoto authored Feb 15, 2024
1 parent 5aed570 commit 4c926dd
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 75 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ curl --location --request POST 'http://127.0.0.1:8000/users/make-admin'

Install docker and the compose plugin (https://docs.docker.com/compose/install/)

> During dev docker is used to run the database, the redis server etc... If you really want to run the project without docker, you can do it but you will have to install the database, redis, etc ... yourself or disable some features in the .env file (which is not recommended).
`docker-compose.yaml` includes the minimal settings required to run Hyperion using docker compose. Docker images are based on [tiangolo/uvicorn-gunicorn-fastapi](https://github.com/tiangolo/uvicorn-gunicorn-fastapi-docker), and preloading the app is required to perform migrations and database initializations when running multiple workers.

> During development, `docker-compose-dev.yaml` can be used to run the database, the Redis server, etc. If you really want to run the project without Docker, you can, but you will have to install the database, Redis, etc. yourself or disable the corresponding features in the .env file (which is not recommended).
---

Expand Down
131 changes: 63 additions & 68 deletions app/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""File defining the Metadata. And the basic functions creating the database tables and calling the router"""

import logging
import os
import uuid
Expand All @@ -14,22 +15,17 @@
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sqlalchemy.engine import Connection
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import Session

from app import api
from app.core.config import Settings
from app.core.log import LogConfig
from app.cruds import cruds_core, cruds_groups
from app.database import Base
from app.dependencies import (
get_db_engine,
get_redis_client,
get_session_maker,
get_settings,
)
from app.dependencies import get_db_engine, get_redis_client, get_settings
from app.models import models_core
from app.utils import initialization
from app.utils.redis import limiter
from app.utils.types.groups_type import GroupType
from app.utils.types.module_list import ModuleList
Expand Down Expand Up @@ -102,7 +98,7 @@ def run_alembic_upgrade(connection: Connection) -> None:
alembic_command.upgrade(alembic_cfg, "head")


async def update_db_tables(engine: AsyncEngine, drop_db: bool = False):
def update_db_tables(engine: Engine, drop_db: bool = False) -> None:
"""
If the database is not initialized, create the tables and stamp the database with the latest revision.
Otherwise, run the alembic upgrade command to upgrade the database to the latest version (`head`).
Expand All @@ -113,18 +109,15 @@ async def update_db_tables(engine: AsyncEngine, drop_db: bool = False):
hyperion_error_logger = logging.getLogger("hyperion.error")

try:
async with engine.begin() as conn:
with engine.begin() as conn:
if drop_db:
# All tables should be dropped, including the alembic_version table
# or Hyperion will think that the database is up to date and will not initialize it
# when running tests a second time.
# To let SQLAlchemy drop the alembic_version table, we created a AlembicVersion model.
await conn.run_sync(Base.metadata.drop_all)
Base.metadata.drop_all(conn)

# run_sync is used to run a synchronous function in an asynchronous context
# the function `get_alembic_current_revision` will be called with "a synchronous-style Connection as the first argument"
# See https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.AsyncConnection.run_sync
alembic_current_revision = await conn.run_sync(get_alembic_current_revision)
alembic_current_revision = get_alembic_current_revision(conn)

if alembic_current_revision is None:
# We generate the database using SQLAlchemy
Expand All @@ -135,15 +128,15 @@ async def update_db_tables(engine: AsyncEngine, drop_db: bool = False):
)

# Create all tables
await conn.run_sync(Base.metadata.create_all)
Base.metadata.create_all(conn)
# We stamp the database with the latest revision so that
# alembic knows that the database is up to date
await conn.run_sync(stamp_alembic_head)
stamp_alembic_head(conn)
else:
hyperion_error_logger.info(
f"Startup: Database tables already created (current revision: {alembic_current_revision}), running migrations"
)
await conn.run_sync(run_alembic_upgrade)
run_alembic_upgrade(conn)

hyperion_error_logger.info("Startup: Database tables updated")
except Exception as error:
Expand All @@ -153,60 +146,65 @@ async def update_db_tables(engine: AsyncEngine, drop_db: bool = False):
raise


async def initialize_groups(SessionLocal, hyperion_error_logger):
def initialize_groups(engine: Engine) -> None:
"""Add the necessary groups for account types"""
async with SessionLocal() as db:

hyperion_error_logger = logging.getLogger("hyperion.error")

hyperion_error_logger.info("Startup: Adding new groups to the database")
with Session(engine) as db:
for id in GroupType:
exists = await cruds_groups.get_group_by_id(group_id=id, db=db)
exists = initialization.get_group_by_id_sync(group_id=id, db=db)
# We don't want to recreate the groups if they already exist
if not exists:
group = models_core.CoreGroup(
id=id, name=id.name, description="Group type"
)

try:
db.add(group)
await db.commit()
initialization.create_group_sync(group=group, db=db)
except IntegrityError as error:
hyperion_error_logger.fatal(
f"Startup: Could not add group {group.name}<{group.id}> in the database: {error}"
)
await db.rollback()


async def initialize_module_visibility(SessionLocal, hyperion_error_logger):
def initialize_module_visibility(engine: Engine) -> None:
"""Add the default module visibilities for Titan"""
async with SessionLocal() as db:

hyperion_error_logger = logging.getLogger("hyperion.error")

with Session(engine) as db:
# Is run to create default module visibilies or when the table is empty
haveBeenInitialized = (
len(await cruds_core.get_all_module_visibility_membership(db)) > 0
len(initialization.get_all_module_visibility_membership_sync(db)) > 0
)
if haveBeenInitialized:
hyperion_error_logger.info(
"Startup: Modules visibility settings have already been initialized"
)
return

hyperion_error_logger.info(
"Startup: Modules visibility settings are empty, initializing them"
)
for module in ModuleList:
for default_group_id in module.value.default_allowed_groups_ids:
module_visibility_exists = await cruds_core.get_module_visibility(
root=module.value.root, group_id=default_group_id, db=db
module_visibility = models_core.ModuleVisibility(
root=module.value.root, allowed_group_id=default_group_id.value
)

# We don't want to recreate the module visibility if they already exist
if not module_visibility_exists:
module_visibility = models_core.ModuleVisibility(
root=module.value.root, allowed_group_id=default_group_id.value
try:
initialization.create_module_visibility_sync(module_visibility, db)
except IntegrityError as error:
hyperion_error_logger.fatal(
f"Startup: Could not add module visibility {module.root}<{default_group_id}> in the database: {error}"
)
try:
db.add(module_visibility)
await db.commit()
except IntegrityError as error:
hyperion_error_logger.fatal(
f"Startup: Could not add module visibility {module.root}<{default_group_id}> in the database: {error}"
)
await db.rollback()


# We wrap the application in a function to be able to pass the settings and drop_db parameters
# The drop_db parameter is used to drop the database tables before creating them again
def get_application(settings: Settings, drop_db: bool = False) -> FastAPI:
# Initialize loggers
LogConfig().initialize_loggers(settings=settings)

hyperion_access_logger = logging.getLogger("hyperion.access")
Expand All @@ -223,34 +221,12 @@ def get_application(settings: Settings, drop_db: bool = False) -> FastAPI:
# Creating a lifespan which will be called when the application starts then shuts down
# https://fastapi.tiangolo.com/advanced/events/
@asynccontextmanager
async def startup(app: FastAPI):
# Initialize loggers

if (
app.dependency_overrides.get(get_redis_client, get_redis_client)(
settings=settings
)
is False
):
hyperion_error_logger.info("Redis client not configured")

# Update database tables
engine = app.dependency_overrides.get(get_db_engine, get_db_engine)(
settings=settings
)
await update_db_tables(engine, drop_db)

# Initialize database tables
SessionLocal = app.dependency_overrides.get(
get_session_maker, get_session_maker
)()
await initialize_groups(SessionLocal, hyperion_error_logger)
await initialize_module_visibility(SessionLocal, hyperion_error_logger)

async def lifespan(app: FastAPI):
yield
hyperion_error_logger.info("Shutting down")

app = FastAPI(lifespan=startup)
# Initialize app
app = FastAPI(lifespan=lifespan)
app.include_router(api.api_router)

app.add_middleware(
Expand All @@ -261,6 +237,25 @@ async def startup(app: FastAPI):
allow_headers=["*"],
)

# Initialize database connection
app.dependency_overrides.get(get_db_engine, get_db_engine)(
settings=settings
) # Initialize the async engine
sync_engine = initialization.get_sync_db_engine(settings=settings)

# Update database tables
update_db_tables(sync_engine, drop_db)

# Initialize database tables
initialize_groups(sync_engine)
initialize_module_visibility(sync_engine)

# Initialize Redis
if not app.dependency_overrides.get(get_redis_client, get_redis_client)(
settings=settings
):
hyperion_error_logger.info("Redis client not configured")

@app.middleware("http")
async def logging_middleware(
request: Request,
Expand Down
5 changes: 3 additions & 2 deletions app/cruds/cruds_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
async def get_all_module_visibility_membership(
    db: AsyncSession,
):
    """Return every ModuleVisibility entry found in the database."""
    query = select(models_core.ModuleVisibility)
    rows = await db.execute(query)
    return rows.unique().scalars().all()

Expand All @@ -18,7 +19,7 @@ async def get_modules_by_user(
user: models_core.CoreUser,
db: AsyncSession,
) -> Sequence[str]:
"""Return the every module with their visibility"""
"""Return the modules a user has access to"""

userGroupIds = list(map(lambda group: group.id, user.groups))

Expand All @@ -35,7 +36,7 @@ async def get_allowed_groups_by_root(
root: str,
db: AsyncSession,
) -> Sequence[str]:
"""Return the every module with their visibility"""
"""Return the groups allowed to access to a specific root"""

result = await db.execute(
select(
Expand Down
6 changes: 4 additions & 2 deletions app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ async def get_request_id(request: Request) -> str:


def get_db_engine(settings: Settings) -> AsyncEngine:
"""Return the database engine, if the engine doesn't exit yet it will create one based on the settings"""
"""
Return the (asynchronous) database engine; if the engine doesn't exist yet, it will be created based on the settings
"""
global engine
global SessionLocal
if settings.SQLITE_DB:
SQLALCHEMY_DATABASE_URL = f"sqlite+aiosqlite:///./{settings.SQLITE_DB}" # Connect to the test's database
SQLALCHEMY_DATABASE_URL = f"sqlite+aiosqlite:///./{settings.SQLITE_DB}"
else:
SQLALCHEMY_DATABASE_URL = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}/{settings.POSTGRES_DB}"

Expand Down
77 changes: 77 additions & 0 deletions app/utils/initialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from sqlalchemy import select
from sqlalchemy.engine import Engine, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, selectinload

from app.core.config import Settings
from app.models import models_core

# These utils are used at startup to run database initializations & migrations


def get_sync_db_engine(settings: Settings) -> Engine:
    """
    Build a synchronous SQLAlchemy engine from the application settings.

    A SQLite URL is used when `settings.SQLITE_DB` is set; otherwise a
    PostgreSQL URL is assembled from the `POSTGRES_*` settings.
    SQL echoing follows `settings.DATABASE_DEBUG`.
    """
    database_url = (
        f"sqlite:///./{settings.SQLITE_DB}"
        if settings.SQLITE_DB
        else f"postgresql://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}/{settings.POSTGRES_DB}"
    )
    return create_engine(database_url, echo=settings.DATABASE_DEBUG)


def get_all_module_visibility_membership_sync(
    db: Session,
):
    """
    Return every ModuleVisibility entry found in the database
    """
    query = select(models_core.ModuleVisibility)
    rows = db.execute(query)
    return rows.unique().scalars().all()


def create_module_visibility_sync(
    module_visibility: models_core.ModuleVisibility,
    db: Session,
) -> models_core.ModuleVisibility:
    """
    Create a new module visibility in database and return it

    If the commit violates a database constraint, the session is rolled back
    and the original IntegrityError is re-raised. This matches
    `create_group_sync` and lets startup code that wraps this call in
    `except IntegrityError` actually handle the failure.
    """
    db.add(module_visibility)
    try:
        db.commit()
    except IntegrityError:
        # Leave the session usable for the next insert, then let the caller
        # decide how to report the constraint violation.
        db.rollback()
        raise
    else:
        return module_visibility


def get_group_by_id_sync(group_id: str, db: Session) -> models_core.CoreGroup | None:
    """
    Fetch the group whose id equals `group_id`; the first match is returned
    """
    # selectinload is needed to load the members from the relationship
    load_members = selectinload(models_core.CoreGroup.members)
    query = (
        select(models_core.CoreGroup)
        .where(models_core.CoreGroup.id == group_id)
        .options(load_members)
    )
    return db.execute(query).scalars().first()


def create_group_sync(
    group: models_core.CoreGroup, db: Session
) -> models_core.CoreGroup:
    """
    Persist `group` in the database and hand it back

    On IntegrityError the session is rolled back and the error is re-raised.
    """
    db.add(group)
    try:
        db.commit()
    except IntegrityError:
        db.rollback()
        raise
    else:
        return group
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ services:
condition: service_healthy
hyperion-redis:
condition: service_started
environment:
- GUNICORN_CMD_ARGS="--preload"
ports:
- 8000:80
volumes:
- ./logs:/app/logs:Z
- ./data:/app/data:Z
- ./.env:/app/.env:Z
- ./firebase.json:/app/firebase.json:Z
- ./migrations/versions:/app/migrations/versions:Z
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ unidecode==1.3.7
uvicorn[standard]==0.23.2
redis==5.0.1
icalendar == 5.0.11
asyncpg==0.28.0
psycopg2==2.9.9 # PostgreSQL adapter for synchronous operations at startup (database initializations & migrations)
asyncpg==0.28.0 # PostgreSQL adapter for asynchronous operations
firebase-admin==6.2.0 # Firebase is used for push notification

0 comments on commit 4c926dd

Please sign in to comment.