Job queue #83

Open · wants to merge 11 commits into develop
10 changes: 10 additions & 0 deletions .env.sample
@@ -11,6 +11,14 @@ DISABLE_AUTHENTICATION=false
IMPERSONATE_USER=""


#############
# Job Queue #
#############
BROKER_HOST="localhost"
BROKER_PORT="6379"
BROKER_USER="default"
BROKER_PASSWORD="default"

###########
## Gitea ##
###########
@@ -27,6 +35,8 @@ GITEA_ASSIST_AUTH_TOKEN="YOUR_BEARER_TOKEN"
CANVAS_API_KEY="YOUR_API_KEY"
CANVAS_API_URL="https://uncch.instructure.com/api/v1"
CANVAS_COURSE_ID="12345"
CANVAS_COURSE_START_DATE="1970-01-01T00:00:00Z"
CANVAS_COURSE_END_DATE="9999-01-01T00:00:00Z"

########################
## Authentication/JWT ##
6 changes: 6 additions & 0 deletions README.md
@@ -7,6 +7,7 @@ A microservice supporting student submissions to otter grader within EduHeLx

### Installation
You'll need to install libpq (the Postgres client library) before installing psycopg2 (the Postgres driver).
You'll also need to have a message broker (e.g., Redis) installed and running.
```bash
# Setup virtual environment
python3 -m venv venv
@@ -23,11 +24,16 @@ vim .env
### Running
```bash
set -a && source .env
python start.py -r
# or
python -m app.main
# or
uvicorn --reload app.main:app --log-level=info
```

These commands assume your .env is configured properly, and that a Postgres instance, Gitea Assist, a message broker, and Celery worker(s) are all configured and running.
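
For local development, a minimal way to bring up the broker and workers might look like the following. This is a sketch, not a documented project command set: the Docker invocation and credentials are assumptions matching the `.env.sample` defaults, and `celery_app` is the instance defined in `app/celery/worker.py`.
```bash
# Start Redis with a password matching the .env.sample defaults (assumption)
docker run -d -p 6379:6379 redis redis-server --requirepass default

# Start a Celery worker against the app's Celery instance
celery -A app.celery.worker:celery_app worker --loglevel=info

# Start Celery beat to drive the periodic-downsync schedule
celery -A app.celery.worker:celery_app beat --loglevel=info

# Optional: Flower dashboard (flower is pinned in requirements.txt)
celery -A app.celery.worker:celery_app flower
```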

### Documentation
The OpenAPI UI is accessible under /docs. To log in, first use the login endpoint to get access/refresh tokens.
Then, under Authorize (lock icon), set the value to `Bearer {access_token}`.
3 changes: 2 additions & 1 deletion app/api/api_v1/api.py
@@ -2,7 +2,7 @@
from .endpoints import (
    submission_router, assignment_router, user_router,
    student_router, instructor_router, course_router,
    settings_router, auth_router, lms_router
    settings_router, auth_router, lms_router, job_router
)

api_router = APIRouter()
@@ -15,3 +15,4 @@
api_router.include_router(settings_router.router, tags=["settings"])
api_router.include_router(auth_router.router, tags=["auth"])
api_router.include_router(lms_router.router, tags=["lms"])
api_router.include_router(job_router.router, tags=["jobs"])
23 changes: 23 additions & 0 deletions app/api/api_v1/endpoints/job_router.py
@@ -0,0 +1,23 @@
from typing import List
from pydantic import BaseModel
from uuid import UUID
from fastapi import APIRouter, Request, Depends, UploadFile, File
from sqlalchemy.orm import Session
from app.celery import get_task_status_by_id
from app.celery.tasks import downsync_task
from app.services import LmsSyncService, AssignmentService
from app.schemas import JobSchema
from app.core.dependencies import (
    get_db, PermissionDependency,
    UserIsInstructorPermission
)

router = APIRouter()

@router.get("/jobs/{id}", response_model=JobSchema)
async def get_job(
    *,
    id: UUID,
    perm: None = Depends(PermissionDependency(UserIsInstructorPermission))
):
    return JobSchema.from_async_result(get_task_status_by_id(str(id)))
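A minimal client-side sketch of polling this endpoint (the base URL, token, and `requests` dependency are illustrative assumptions, not part of the project):
```python
import time
import requests  # illustrative only; not a project dependency

BASE_URL = "http://localhost:8000"  # hypothetical host
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"  # obtained via the auth login endpoint

def wait_for_job(job_id: str, interval: float = 2.0) -> dict:
    # Poll GET /jobs/{id} until the JobSchema reports ready, then return it
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    while True:
        job = requests.get(f"{BASE_URL}/jobs/{job_id}", headers=headers).json()
        if job["ready"]:
            return job
        time.sleep(interval)
```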
8 changes: 6 additions & 2 deletions app/api/api_v1/endpoints/lms_router.py
@@ -1,8 +1,11 @@
from typing import List
from pydantic import BaseModel
from uuid import UUID
from fastapi import APIRouter, Request, Depends, UploadFile, File
from sqlalchemy.orm import Session
from app.celery.tasks import downsync_task
from app.services import LmsSyncService, AssignmentService
from app.schemas import JobSchema
from app.core.dependencies import (
    get_db, PermissionDependency,
    UserIsInstructorPermission
@@ -17,13 +20,14 @@ class GradeUpload(BaseModel):
class UploadGradesBody(BaseModel):
    grades: List[GradeUpload]

@router.post("/lms/downsync")
@router.post("/lms/downsync", response_model=UUID)
async def downsync(
    *,
    db: Session = Depends(get_db),
    perm: None = Depends(PermissionDependency(UserIsInstructorPermission))
):
    await LmsSyncService(db).downsync()
    task = downsync_task.delay()
    return task.id

@router.post("/lms/downsync/students")
async def downsync_students(
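Putting the two endpoints together, a client could kick off a downsync and block on the returned task id; a sketch continuing the assumptions from the job router example above:
```python
# Continuing the sketch above (same BASE_URL / ACCESS_TOKEN / wait_for_job)
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
resp = requests.post(f"{BASE_URL}/lms/downsync", headers=headers)
job_id = resp.json()        # the endpoint returns the Celery task UUID
job = wait_for_job(job_id)  # poll until the job is ready
print(job["status"], job["result"])
```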
1 change: 1 addition & 0 deletions app/celery/__init__.py
@@ -0,0 +1 @@
from .worker import *
2 changes: 2 additions & 0 deletions app/celery/tasks/__init__.py
@@ -0,0 +1,2 @@
from .lms_tasks import *
from .misc_tasks import *
14 changes: 14 additions & 0 deletions app/celery/tasks/lms_tasks.py
@@ -0,0 +1,14 @@
import asyncio
from datetime import timedelta
from celery_singleton import Singleton
from app.celery import celery_app
from app.database import SessionLocal
from app.services import LmsSyncService

# NOTE: Singleton defines a "singleton" task which can only enter the queue once at a time,
# so we can't end up with 2 downsync tasks simultaneously.
@celery_app.task(name="downsync", base=Singleton)
def downsync_task():
    with SessionLocal() as session:
        lms_sync_service = LmsSyncService(session)
        return asyncio.run(lms_sync_service.downsync())
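As a sketch of what the `Singleton` base provides (per celery-singleton's documented behavior, queuing a duplicate returns the existing task's `AsyncResult` instead of enqueuing a second run):
```python
first = downsync_task.delay()
second = downsync_task.delay()  # deduplicated while `first` is still queued/running

# Both handles refer to the same underlying task
assert first.id == second.id
```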
4 changes: 4 additions & 0 deletions app/celery/tasks/misc_tasks.py
@@ -0,0 +1,4 @@
import asyncio
from datetime import timedelta
from app.celery import celery_app
from app.database import SessionLocal
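
# No tasks are defined here yet; presumably a placeholder for future miscellaneous tasks.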
60 changes: 60 additions & 0 deletions app/celery/worker.py
@@ -0,0 +1,60 @@
import json
from redis import Redis
from celery import Celery
from celery.result import AsyncResult
from app.core.config import settings

redis_client = Redis.from_url(settings.CELERY_BROKER_URI)
celery_app = Celery(
    __name__,
    broker=settings.CELERY_BROKER_URI,
    backend=settings.CELERY_RESULT_BACKEND,
    result_extended=True
)
celery_app.conf.update(
    imports=["app.celery.tasks"],
    broker_connection_retry_on_startup=True,
    task_track_started=True,
    worker_enable_remote_control=True,
    beat_schedule={
        "periodic-downsync": {
            "task": "downsync",
            "schedule": 60
        }
    }
)

def get_tasks_by_name(
    task_name: str,
    # Currently running tasks
    include_active=True,
    # Reserved by a worker to run, but not running yet
    include_reserved=True,
    # Scheduled to run by a worker at some point in the future, but not running yet
    include_scheduled=True,
    # Entered into the broker queue but not yet picked up by a worker
    include_pending=True
) -> list[str]:
    inspect = celery_app.control.inspect()
    # Each inspect call returns {worker_name: [task_info, ...]}, or None if no workers reply
    tasks = []
    if include_active: tasks += [t for ts in (inspect.active() or {}).values() for t in ts]
    if include_reserved: tasks += [t for ts in (inspect.reserved() or {}).values() for t in ts]
    # Scheduled entries nest the task info under a "request" key
    if include_scheduled: tasks += [t["request"] for ts in (inspect.scheduled() or {}).values() for t in ts]

    task_ids = [task["id"] for task in tasks if task["name"] == task_name]

    # Load pending tasks (still in queue)
    if include_pending:
        for item in redis_client.lrange("celery", 0, -1):
            task_data = json.loads(item)
            if task_data.get("headers", {}).get("task") == task_name:
                task_id = task_data["headers"].get("id")
                if task_id is not None: task_ids.append(task_id)

    return task_ids

def get_task_status_by_id(task_id: str) -> AsyncResult:
    return celery_app.AsyncResult(task_id)

def cancel_task_by_id(task_id: str):
    celery_app.control.revoke(task_id, terminate=True, signal="SIGKILL")
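Together these helpers support simple operational tooling; for example, a maintenance script could revoke any in-flight downsync using only the functions defined above (a sketch, not part of the PR):
```python
from app.celery import cancel_task_by_id, get_task_status_by_id, get_tasks_by_name

# Revoke every queued or running downsync task
for task_id in get_tasks_by_name("downsync"):
    print("cancelling", task_id, get_task_status_by_id(task_id).status)
    cancel_task_by_id(task_id)
```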
27 changes: 27 additions & 0 deletions app/core/config.py
@@ -33,6 +33,15 @@ class Settings(BaseSettings):
    # Setup wizard (JSON-serialized string)
    SETUP_WIZARD_DATA: Optional[SetupWizardData] = None

    # Job queue
    BROKER_HOST: str
    BROKER_PORT: int
    BROKER_USER: str
    BROKER_PASSWORD: str

    CELERY_BROKER_URI: Optional[str] = None # computed
    CELERY_RESULT_BACKEND: Optional[str] = None # computed

    # Gitea
    GITEA_SSH_URL: str
    GITEA_ASSIST_API_URL: str
@@ -91,6 +100,24 @@ def compute_instructor_appstore_api_url(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
        if isinstance(v, str): return v
        return values.get("INSTRUCTOR_APPSTORE_HOST") + "/api/v1"

    @validator("CELERY_BROKER_URI", pre=True)
    def assemble_broker_uri(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
        if isinstance(v, str): return v
        user = values.get("BROKER_USER")
        pw = values.get("BROKER_PASSWORD")
        host = values.get("BROKER_HOST")
        port = values.get("BROKER_PORT")
        return f"redis://{user}:{pw}@{host}:{port}/0"

    @validator("CELERY_RESULT_BACKEND", pre=True)
    def assemble_result_backend(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
        if isinstance(v, str): return v
        user = values.get("BROKER_USER")
        pw = values.get("BROKER_PASSWORD")
        host = values.get("BROKER_HOST")
        port = values.get("BROKER_PORT")
        return f"redis://{user}:{pw}@{host}:{port}/1"

@validator("SQLALCHEMY_DATABASE_URI", pre=True)
def assemble_db_connection(cls, v: Optional[str], values: Dict[str, Any]) -> Any:
if isinstance(v, str): return v
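With the `.env.sample` defaults above, these validators compute `CELERY_BROKER_URI=redis://default:default@localhost:6379/0` and `CELERY_RESULT_BACKEND=redis://default:default@localhost:6379/1`, so the broker and result backend share one Redis instance but use separate logical databases.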
3 changes: 2 additions & 1 deletion app/schemas/__init__.py
@@ -6,4 +6,5 @@
from .jwt import *
from .commit import *
from .settings import *
from .grade_report import *
from .grade_report import *
from .job import *
45 changes: 45 additions & 0 deletions app/schemas/job.py
@@ -0,0 +1,45 @@
from __future__ import annotations
from typing import Any
from pydantic import BaseModel
from enum import Enum
from uuid import UUID
from datetime import datetime
from dateutil import parser as dateparser
from celery.result import AsyncResult

class JobStatus(str, Enum):
    PENDING = "pending"
    STARTED = "started"
    RETRY = "retry"
    FAILURE = "failure"
    SUCCESS = "success"

class JobSchema(BaseModel):
    id: UUID
    status: JobStatus
    result: Any
    ready: bool
    successful: bool
    failed: bool
    # Task metadata, not necessarily defined (e.g., when in reserved state)
    name: str | None
    queue: str | None
    retries: int | None
    traceback: str | None
    finished_date: datetime | None

    @classmethod
    def from_async_result(cls, result: AsyncResult) -> JobSchema:
        return JobSchema(
            id=result.task_id,
            name=result.name,
            status=JobStatus[result.status],
            result=result.result,
            ready=result.ready(),
            successful=result.successful(),
            failed=result.failed(),
            queue=result.queue,
            retries=result.retries,
            traceback=result.traceback,
            finished_date=result.date_done
        )
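One detail worth noting: Celery reports task states in uppercase (`"PENDING"`, `"SUCCESS"`, ...), while this schema serializes lowercase values, so `from_async_result` bridges the two by indexing the enum by member name:
```python
# Celery state name -> schema value
assert JobStatus["SUCCESS"] is JobStatus.SUCCESS
assert JobStatus["SUCCESS"].value == "success"
```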
3 changes: 3 additions & 0 deletions requirements.txt
@@ -26,4 +26,7 @@ fastapi-events==0.11.1
jupyter-server
ipykernel
nbconvert==7.8.0
celery[redis]==5.4.0
celery-singleton==0.3.1
flower==2.0.1
git+https://github.com/helxplatform/eduhelx-utils.git
9 changes: 0 additions & 9 deletions start.py
@@ -48,15 +48,6 @@ def main(host: str, port: int, reload: bool, workers: int | None=None):
    alembic_cfg = Config("alembic.ini")
    command.upgrade(alembic_cfg, "head")


    # Run setup wizard, if required.
    try:
        with SessionLocal() as session:
            lms_sync_service = LmsSyncService(session)
            asyncio.run(lms_sync_service.downsync())
    except ValueError as e:
        print(str(e))

    # Start the application
    uvicorn.run("app.main:app", host=host, port=port, reload=reload, workers=workers)
