Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tasks and Jobs backend #15

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion invenio_jobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,49 @@

"""Configuration."""

JOBS_ADMINISTRATION_DISABLED = False
from invenio_i18n import lazy_gettext as _

from .services.permissions import (
JobPermissionPolicy,
RunPermissionPolicy,
TasksPermissionPolicy,
)

# The three policy settings below default to the classes imported from
# ``.services.permissions`` at the top of this module.
JOBS_TASKS_PERMISSION_POLICY = TasksPermissionPolicy
"""Permission policy for tasks."""

JOBS_PERMISSION_POLICY = JobPermissionPolicy
"""Permission policy for jobs."""

JOBS_RUNS_PERMISSION_POLICY = RunPermissionPolicy
"""Permission policy for job runs."""

# Administration views are enabled by default (flag is ``False``).
JOBS_ADMINISTRATION_DISABLED = False
"""Disable Jobs administration views if ``True``."""

# Empty by default: no facets are configured out of the box.
JOBS_FACETS = {}
"""Facets/aggregations for Jobs results."""

# Mapping keyed by queue name. Each entry repeats the name under ``"name"`` and
# carries a ``"title"`` and ``"description"`` wrapped in ``_`` (lazy gettext),
# so the labels are translated when rendered rather than at import time.
JOBS_QUEUES = {
"celery": {
"name": "celery",
"title": _("Default"),
"description": _("Default queue"),
},
"low": {
"name": "low",
"title": _("Low"),
"description": _("Low priority queue"),
},
}
"""Available Celery queues, keyed by queue name.

This doesn't create any of the queues; it only controls which Celery queue a
job is pushed to. You still need to configure Celery workers to listen to these
queues.
"""

# ``None`` (or any falsy value) means "no explicit default": the extension's
# ``default_queue`` property then falls back to Celery's ``task_default_queue``.
JOBS_DEFAULT_QUEUE = None
"""Default Celery queue."""

JOBS_SORT_OPTIONS = {
"jobs": dict(
Expand Down
21 changes: 21 additions & 0 deletions invenio_jobs/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

"""Jobs extension."""

from celery import current_app as current_celery_app
from flask import current_app
from invenio_i18n import gettext as _

from . import config
from .models import Task
from .resources import (
JobsResource,
JobsResourceConfig,
Expand Down Expand Up @@ -66,6 +69,24 @@ def init_resource(self, app):
TasksResourceConfig.build(app), self.tasks_service
)

@property
def queues(self):
    """Expose the queue definitions from the application config.

    :returns: The value stored under the ``JOBS_QUEUES`` config key of the
        current Flask application.
    :raises KeyError: If ``JOBS_QUEUES`` is not present in the config.
    """
    app_config = current_app.config
    return app_config["JOBS_QUEUES"]

@property
def default_queue(self):
    """Resolve the queue used when none is specified.

    :returns: The ``JOBS_DEFAULT_QUEUE`` config value when it is set to a
        truthy value, otherwise Celery's ``task_default_queue`` setting.
    """
    configured_queue = current_app.config.get("JOBS_DEFAULT_QUEUE")
    if configured_queue:
        return configured_queue
    # Nothing configured on the Flask side: defer to the Celery configuration.
    return current_celery_app.conf.task_default_queue

@property
def tasks(self):
    """Expose the task registry built by ``Task.all()``.

    :returns: Whatever ``Task.all()`` returns.
    """
    registered_tasks = Task.all()
    return registered_tasks


def finalize_app(app):
"""Finalize app."""
Expand Down
53 changes: 49 additions & 4 deletions invenio_jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
"""Models."""

import enum
import uuid
from inspect import signature

from celery import current_app as current_celery_app
from invenio_accounts.models import User
from invenio_db import db
from sqlalchemy.dialects import postgresql
from sqlalchemy_utils import Timestamp
from sqlalchemy_utils.types import ChoiceType, JSONType, UUIDType
from werkzeug.utils import cached_property

JSON = (
db.JSON()
Expand All @@ -26,15 +30,15 @@
class Job(db.Model, Timestamp):
"""Job model."""

id = db.Column(UUIDType, primary_key=True)
id = db.Column(UUIDType, primary_key=True, default=uuid.uuid4)
active = db.Column(db.Boolean, default=True, nullable=False)
title = db.Column(db.String(255), nullable=False)
description = db.Column(db.Text)

celery_tasks = db.Column(db.String(255))
task = db.Column(db.String(255))
default_queue = db.Column(db.String(64))
default_args = db.Column(JSON, default=lambda: dict(), nullable=True)
schedule = db.Column(JSON, default=lambda: dict(), nullable=True)
schedule = db.Column(JSON, nullable=True)

# TODO: See if we move this to an API class
@property
Expand All @@ -57,7 +61,7 @@ class RunStatusEnum(enum.Enum):
class Run(db.Model, Timestamp):
"""Run model."""

id = db.Column(UUIDType, primary_key=True)
id = db.Column(UUIDType, primary_key=True, default=uuid.uuid4)

job_id = db.Column(UUIDType, db.ForeignKey(Job.id))
job = db.relationship(Job, backref=db.backref("runs", lazy="dynamic"))
Expand All @@ -79,3 +83,44 @@ class Run(db.Model, Timestamp):
task_id = db.Column(UUIDType, nullable=True)
args = db.Column(JSON, default=lambda: dict(), nullable=True)
queue = db.Column(db.String(64), nullable=True)


class Task:
    """Celery Task model.

    Thin wrapper around a task object. Attributes not defined on the wrapper
    itself are forwarded to the wrapped object.
    """

    # Class-level cache filled by the first call to ``all()``; nothing in this
    # class invalidates it afterwards.
    _all_tasks = None

    def __init__(self, obj):
        """Initialize model.

        :param obj: The task object to wrap.
        """
        self._obj = obj

    def __getattr__(self, name):
        """Proxy attribute access to the task object.

        Only invoked when normal attribute lookup on the wrapper fails.

        :param name: Name of the attribute being looked up.
        :returns: The attribute value taken from the wrapped object.
        :raises AttributeError: If the wrapped object lacks the attribute, or
            if the wrapper has no wrapped object yet.
        """
        # An instance created without ``__init__`` (e.g. by ``copy``/``pickle``
        # or ``__new__``) has no ``_obj``; forwarding would then re-enter this
        # method forever instead of reporting a missing attribute.
        if name == "_obj":
            raise AttributeError(name)
        # TODO: See if we want to limit what attributes are exposed
        return getattr(self._obj, name)

    @cached_property
    def description(self):
        """Return description.

        :returns: The first non-blank line of the wrapped object's docstring,
            or an empty string when there is no docstring.
        """
        doc = self._obj.__doc__
        if not doc:
            return ""
        # Strip first so docstrings that open with a newline still yield their
        # summary line instead of an empty string.
        return doc.strip().split("\n", 1)[0].rstrip()

    @cached_property
    def parameters(self):
        """Return the task's parameters.

        :returns: The ``parameters`` mapping of ``inspect.signature`` applied
            to the wrapped object.
        """
        # TODO: Make this result more user friendly or enhance with type information
        return signature(self._obj).parameters

    @classmethod
    def all(cls):
        """Return all tasks.

        :returns: Dict mapping task name to ``Task`` wrapper for every task in
            the current Celery app whose name does not start with ``celery.``.
            The dict is built once and the same cached object is returned on
            subsequent calls.
        """
        if cls._all_tasks is None:
            # Cache results
            cls._all_tasks = {
                k: cls(task)
                for k, task in current_celery_app.tasks.items()
                # Filter out Celery internal tasks
                if not k.startswith("celery.")
            }
        return cls._all_tasks
20 changes: 20 additions & 0 deletions invenio_jobs/proxies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# Invenio-Jobs is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Proxies."""

from flask import current_app
from werkzeug.local import LocalProxy

def _resolve_jobs_extension():
    """Look up the ``invenio-jobs`` extension on the current Flask app."""
    return current_app.extensions["invenio-jobs"]


def _resolve_jobs_service():
    """Fetch the ``service`` attribute of the jobs extension."""
    return current_jobs.service


def _resolve_runs_service():
    """Fetch the ``runs_service`` attribute of the jobs extension."""
    return current_jobs.runs_service


current_jobs = LocalProxy(_resolve_jobs_extension)
"""Jobs extension."""

current_jobs_service = LocalProxy(_resolve_jobs_service)
"""Jobs service."""

current_runs_service = LocalProxy(_resolve_runs_service)
"""Runs service."""
38 changes: 35 additions & 3 deletions invenio_jobs/resources/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,21 @@
from invenio_records_resources.resources.records.args import SearchRequestArgsSchema
from invenio_records_resources.services.base.config import ConfiguratorMixin

from ..services.errors import JobNotFoundError

# Copy the stock handlers and register the RDM vendor JSON media type with the
# same handler that is registered for plain ``application/json``.
response_handlers = dict(ResourceConfig.response_handlers)
response_handlers["application/vnd.inveniordm.v1+json"] = response_handlers[
    "application/json"
]

# Same for request bodies: the vendor media type reuses the plain JSON parser.
request_body_parsers = dict(ResourceConfig.request_body_parsers)
request_body_parsers["application/vnd.inveniordm.v1+json"] = request_body_parsers[
    "application/json"
]


class TasksResourceConfig(ResourceConfig, ConfiguratorMixin):
"""Celery tasks resource config."""
Expand All @@ -23,6 +38,13 @@ class TasksResourceConfig(ResourceConfig, ConfiguratorMixin):
url_prefix = "/tasks"
routes = {"list": ""}

# Request handling
request_search_args = SearchRequestArgsSchema
request_body_parsers = request_body_parsers

# Response handling
response_handlers = response_handlers


class JobsSearchRequestArgsSchema(SearchRequestArgsSchema):
"""Jobs search request parameters."""
Expand All @@ -41,14 +63,20 @@ class JobsResourceConfig(ResourceConfig, ConfiguratorMixin):
"item": "/<job_id>",
}

# Request parsing
# Request handling
request_read_args = {}
request_view_args = {"job_id": ma.fields.UUID()}
request_search_args = JobsSearchRequestArgsSchema
request_body_parsers = request_body_parsers

# Response handling
response_handlers = response_handlers

error_handlers = {
**ErrorHandlersMixin.error_handlers,
# TODO: Add custom error handlers here
JobNotFoundError: create_error_handler(
lambda e: HTTPJSONException(code=404, description=e.description)
),
}


Expand All @@ -66,11 +94,15 @@ class RunsResourceConfig(ResourceConfig, ConfiguratorMixin):
"actions_stop": "/jobs/<job_id>/runs/<run_id>/actions/stop",
}

# Request parsing
# Request handling
request_view_args = {
"job_id": ma.fields.UUID(),
"run_id": ma.fields.UUID(),
}
request_body_parsers = request_body_parsers

# Response handling
response_handlers = response_handlers

error_handlers = {
**ErrorHandlersMixin.error_handlers,
Expand Down
Loading