Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler to be able to schedule worker jobs to certain times #3570

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,7 @@ class Settings(BaseSettings):

backend_node_url: str | None = None

is_worker: bool = False


settings = Settings()
16 changes: 16 additions & 0 deletions backend/app/cron.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from apscheduler.triggers.cron import CronTrigger

JOBS = []


def cron(crontab):
"""Wrap a Dramatiq actor in a cron schedule."""
trigger = CronTrigger.from_crontab(crontab)

def decorator(actor):
module_path = actor.fn.__module__
func_name = actor.fn.__name__
JOBS.append((trigger, module_path, func_name))
return actor

return decorator
1 change: 0 additions & 1 deletion backend/app/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def register_to_app(app: FastAPI):
async def update():
worker.update.send()
worker.update_quality_moderation.send()
worker.update_app_picks.send()


@router.post("/stats", tags=["update"])
Expand Down
15 changes: 15 additions & 0 deletions backend/app/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import requests
import sentry_dramatiq
import sentry_sdk
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

from . import (
apps,
config,
cron,
db,
exceptions,
logins,
Expand Down Expand Up @@ -325,6 +327,7 @@ def refresh_github_repo_list(gh_access_token: str, accountId: int):
sqldb.session.commit()


@cron.cron("0 3 * * *") # every day at 3am
@dramatiq.actor
def update_app_picks():
with WorkerDB() as sqldb:
Expand Down Expand Up @@ -387,3 +390,15 @@ def pick_app_of_the_day_automatically(sqldb, day):

if len(oldest_apps) > 0:
models.AppOfTheDay.set_app_of_the_day(sqldb, oldest_apps[0], day)


if settings.is_worker:
scheduler = BlockingScheduler()
for trigger, module_path, func_name in cron.JOBS:
job_path = f"{module_path}:{func_name}.send"
job_name = f"{module_path}.{func_name}"
scheduler.add_job(job_path, trigger=trigger, name=job_name)
try:
scheduler.start()
except KeyboardInterrupt:
scheduler.shutdown()
1 change: 1 addition & 0 deletions backend/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ services:
- SMTP_PASSWORD=$SMTP_PASSWORD
- EMAIL_FROM=$EMAIL_FROM
- BACKEND_NODE_URL=${BACKEND_NODE_URL-http://backend-node:8001}
- IS_WORKER=true
depends_on:
- redis
- db
Expand Down
323 changes: 195 additions & 128 deletions backend/poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ PyJWT = "^2.9.0"
meilisearch = "^0.31.5"
orjson = "^3.10.7"
python-multipart = "^0.0.9"
dramatiq = {extras = ["redis", "watch"], version = "^1.17.0"}
dramatiq = { extras = ["redis", "watch"], version = "^1.17.0" }
sqlalchemy = "2.0.33"
sentry-dramatiq = "^0.3.3"
pydantic-settings = "^2.4.0"
psycopg2-binary = "^2.9.9"
publicsuffixlist = "^1.0.2.20240903"
apscheduler = "^3.10.4"

[tool.poetry.group.dev.dependencies]
httpx = "^0.27.2"
Expand Down
Loading