Upgrade celery and use database backend for task results #3332

Merged · 6 commits · Aug 3, 2022
4 changes: 0 additions & 4 deletions Makefile
@@ -13,10 +13,6 @@ dummyusers:
 prodceleryworkers:
 	cd contentcuration/ && celery -A contentcuration worker -l info --concurrency=3 --task-events
 
-prodcelerydashboard:
-	# connect to the celery dashboard by visiting http://localhost:5555
-	kubectl port-forward deployment/master-studio-celery-dashboard 5555
-
 devserver:
 	yarn run devserver
 
2 changes: 1 addition & 1 deletion README.md
@@ -67,7 +67,7 @@ make dcservicesdown
 
 To develop on Kolibri Studio, you'll need:
 
-* Python 3.6
+* Python 3.6+
 
 Managing Python installations can be quite tricky. We *highly* recommend using package managers like `Homebrew <http://brew.sh/>`__ on Mac or ``apt`` on Debian for this. Never modify your system's built-in version of Python
 
3 changes: 0 additions & 3 deletions contentcuration/contentcuration/apps.py
@@ -8,9 +8,6 @@ class ContentConfig(AppConfig):
     name = 'contentcuration'
 
     def ready(self):
-        # see note in the celery_signals.py file for why we import here.
-        import contentcuration.utils.celery.signals  # noqa
-
         if settings.AWS_AUTO_CREATE_BUCKET and not is_gcs_backend():
             from contentcuration.utils.minio_utils import ensure_storage_bucket_public
             ensure_storage_bucket_public()
17 changes: 3 additions & 14 deletions contentcuration/contentcuration/celery.py
@@ -1,22 +1,11 @@
 from __future__ import absolute_import
 
 import os
 
-import django
+from django.conf import settings
 
 from contentcuration.utils.celery.app import CeleryApp
 
-# set the default Django settings module for the 'celery' program.
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "contentcuration.settings")
-
-app = CeleryApp('contentcuration')
-
-# Using a string here means the worker will not have to
-# pickle the object when using Windows.
-app.config_from_object('django.conf:settings', namespace='CELERY')
-django.setup()
-
-
-@app.task(bind=True)
-def debug_task(self):
-    print("Request: {0!r}".format(self.request))
+app = CeleryApp("contentcuration")
+app.config_from_object(settings.CELERY)
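For reference, the dict-based configuration used above can be exercised outside of Studio as well; a minimal sketch (the broker URL and option values below are placeholders, not from this PR) of passing a plain dict of lowercase Celery option names to `config_from_object`:

```python
from celery import Celery

# Placeholder dict mirroring the shape of settings.CELERY in this PR.
CELERY = {
    "broker_url": "redis://localhost:6379/0",
    "result_backend": "django-db",
    "task_serializer": "json",
}

app = Celery("example")
# config_from_object accepts a mapping, so lowercase option names work directly.
app.config_from_object(CELERY)

print(app.conf.result_backend)  # -> "django-db"
```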
@@ -122,7 +122,7 @@
     sizeCalculationTask(task) {
       if (task && task.status === 'SUCCESS') {
         this.loading = false;
-        this.size = task.metadata.result;
+        this.size = task.result;
       }
     },
   },
@@ -50,7 +50,7 @@
     ...mapGetters('task', ['getAsyncTask', 'getPublishTaskForChannel']),
     ...mapGetters('currentChannel', ['currentChannel', 'canManage']),
     isSyncing() {
-      return this.currentTask && this.currentTask.task_type === 'sync-channel';
+      return this.currentTask && this.currentTask.task_name === 'sync-channel';
     },
     isPublishing() {
       // add condition so that publishing modal is only visible for users
@@ -64,7 +64,7 @@
       return this.getAsyncTask(this.currentChannel[TASK_ID]) || null;
     },
     progressPercent() {
-      const progressPercent = get(this.currentTask, ['metadata', 'progress'], 0);
+      const progressPercent = get(this.currentTask, ['progress'], 0);
       return this.$formatNumber(Math.round(progressPercent || 0) / 100, {
         style: 'percent',
         minimumFractionDigits: 0,
@@ -74,14 +74,14 @@
     currentPublishTaskError() {
       const publishTask = this.getPublishTaskForChannel(this.currentChannel.id);
       return Boolean(
-        (publishTask && get(publishTask, ['metadata', 'error'], null)) ||
+        (publishTask && get(publishTask, ['traceback'], null)) ||
           get(publishTask, 'status') === 'FAILURE'
       );
     },
     syncError() {
       return (
         this.isSyncing &&
-        (get(this.currentTask, ['metadata', 'error'], null) ||
+        (get(this.currentTask, ['traceback'], null) ||
           get(this.currentTask, 'status') === 'FAILURE')
       );
     },
@@ -41,10 +41,10 @@
       return this.progress >= 100 && !this.currentTaskError;
     },
     currentTaskError() {
-      return this.task ? get(this.task, ['metadata', 'error']) : null;
+      return this.task ? get(this.task, ['traceback']) : null;
     },
     progress() {
-      return this.task ? get(this.task, ['metadata', 'progress']) : 0;
+      return this.task ? get(this.task, ['progress']) : 0;
     },
   },
 };
@@ -11,14 +11,14 @@ const CHANNEL_ID = 'test';
 
 const PUBLISH_TASK = {
   task_id: 'task',
-  task_type: 'export-channel',
-  metadata: { progress: 0 },
+  task_name: 'export-channel',
+  progress: 0,
   channel_id: CHANNEL_ID,
 };
 const SYNC_TASK = {
   task_id: 'task',
-  task_type: 'sync-channel',
-  metadata: { progress: 0 },
+  task_name: 'sync-channel',
+  progress: 0,
   channel_id: CHANNEL_ID,
 };
 
@@ -111,7 +111,7 @@ describe('ProgressModal', () => {
 
     beforeEach(() => {
       const publishTask = cloneDeep(PUBLISH_TASK);
-      publishTask.metadata.progress = 100;
+      publishTask.progress = 100;
       const store = storeFactory(storeConfig);
       store.commit('task/ADD_ASYNC_TASK', publishTask);
       wrapper = makeWrapper({ store });
@@ -198,7 +198,7 @@ describe('ProgressModal', () => {
 
     beforeEach(() => {
      const syncTask = cloneDeep(SYNC_TASK);
-      syncTask.metadata.progress = 100;
+      syncTask.progress = 100;
       const store = storeFactory(storeConfig);
      store.commit('task/ADD_ASYNC_TASK', syncTask);
       wrapper = makeWrapper({ propsData, store });
@@ -57,7 +57,7 @@ export function publishChannel(context, version_notes) {
 }
 
 export function stopTask(context, task) {
-  if (task && task.task_type === 'export-channel') {
+  if (task && task.task_name === 'export-channel') {
     return Channel.clearPublish(context.state.currentChannelId).then(() => {
       return context.dispatch('task/deleteTask', task, { root: true });
     });
@@ -27,7 +27,7 @@ export default {
   getPublishTaskForChannel(state) {
     return function(channelId) {
       return Object.values(state.asyncTasksMap).find(
-        t => t.channel_id.replace('-', '') === channelId && t.task_type === 'export-channel'
+        t => t.channel_id.replace('-', '') === channelId && t.task_name === 'export-channel'
       );
     };
   },
38 changes: 38 additions & 0 deletions contentcuration/contentcuration/migrations/0139_django_celery_results.py
@@ -0,0 +1,38 @@
+# Generated by Django 3.2.5 on 2021-12-03 20:48
+import django.core.validators
+import django.db.models.deletion
+from django.conf import settings
+from django.db import migrations
+from django.db import models
+
+
+class Migration(migrations.Migration):
+
+    replaces = [('django_celery_results', '0138_change'),]
+
+    def __init__(self, name, app_label):
+        super(Migration, self).__init__(name, 'django_celery_results')
+
+    dependencies = [
+        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+        ('contentcuration', '0138_change'),
+        ('django_celery_results', '0001_initial'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='taskresult',
+            name='channel_id',
+            field=models.UUIDField(blank=True, db_index=True, null=True),
+        ),
+        migrations.AddField(
+            model_name='taskresult',
+            name='progress',
+            field=models.IntegerField(blank=True, null=True, validators=[django.core.validators.MinValueValidator(0), django.core.validators.MaxValueValidator(100)]),
+        ),
+        migrations.AddField(
+            model_name='taskresult',
+            name='user',
+            field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='tasks', to=settings.AUTH_USER_MODEL),
+        ),
+    ]
15 changes: 15 additions & 0 deletions contentcuration/contentcuration/migrations/0140_delete_task.py
@@ -0,0 +1,15 @@
+# Generated by Django 3.2.13 on 2022-07-26 18:18
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('contentcuration', '0139_django_celery_results'),
+    ]
+
+    operations = [
+        migrations.DeleteModel(
+            name='Task',
+        ),
+    ]
62 changes: 44 additions & 18 deletions contentcuration/contentcuration/models.py
@@ -22,6 +22,8 @@
 from django.core.files.storage import default_storage
 from django.core.files.storage import FileSystemStorage
 from django.core.mail import send_mail
+from django.core.validators import MaxValueValidator
+from django.core.validators import MinValueValidator
 from django.db import IntegrityError
 from django.db import models
 from django.db.models import Count
@@ -45,6 +47,7 @@
 from django.dispatch import receiver
 from django.utils import timezone
 from django.utils.translation import gettext as _
+from django_celery_results.models import TaskResult
 from django_cte import With
 from le_utils import proquint
 from le_utils.constants import content_kinds
@@ -2236,7 +2239,9 @@ def clean(self, *args, **kwargs):
                                   % (self.target_node, self.prerequisite))
         # distant cyclic exception
         # elif <this is a nice to have exception, may implement in the future when the priority raises.>
-        # raise Exception('Note: Prerequisite relationship is acyclic! %s and %s forms a closed loop!' % (self.target_node, self.prerequisite))
+        # raise Exception('Note: Prerequisite relationship is acyclic! %s and %s forms a closed loop!' % (
+        #     self.target_node, self.prerequisite
+        # ))
         super(PrerequisiteContentRelationship, self).clean(*args, **kwargs)
 
     def save(self, *args, **kwargs):
@@ -2326,23 +2331,6 @@ def filter_view_queryset(cls, queryset, user):
         ).distinct()
 
 
-class Task(models.Model):
-    """Asynchronous tasks"""
-    task_id = UUIDField(db_index=True, default=uuid.uuid4)  # This ID is used as the Celery task ID
-    task_type = models.CharField(max_length=50)
-    created = models.DateTimeField(default=timezone.now)
-    status = models.CharField(max_length=10)
-    is_progress_tracking = models.BooleanField(default=False)
-    user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="task", on_delete=models.CASCADE)
-    metadata = JSONField()
-    channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
-
-    @classmethod
-    def find_incomplete(cls, task_type, **filters):
-        filters.update(task_type=task_type, status__in=["QUEUED", states.PENDING, states.RECEIVED, states.STARTED])
-        return cls.objects.filter(**filters)
-
-
 class Change(models.Model):
     server_rev = models.BigAutoField(primary_key=True)
     # We need to store the user who is applying this change
@@ -2423,3 +2411,41 @@ def serialize(cls, change):
 
     def serialize_to_change_dict(self):
         return self.serialize(self)
+
+
+class TaskResultCustom(object):
+    """
+    Custom fields to add to django_celery_results's TaskResult model
+    """
+    # user shouldn't be null, but in order to append the field, this needs to be allowed
+    user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
+    channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
+    progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
+
+    super_as_dict = TaskResult.as_dict
+
+    def as_dict(self):
+        """
+        :return: A dictionary representation
+        """
+        super_dict = self.super_as_dict()
+        super_dict.update(
+            user_id=self.user_id,
+            channel_id=self.channel_id,
+            progress=self.progress,
+        )
+        return super_dict
+
+    @classmethod
+    def contribute_to_class(cls, model_class=TaskResult):
+        """
+        Adds fields to model, by default TaskResult
+        :param model_class: TaskResult model
+        """
+        for field in dir(cls):
+            if not field.startswith("_"):
+                model_class.add_to_class(field, getattr(cls, field))
+
+
+# trigger class contributions immediately
+TaskResultCustom.contribute_to_class()
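As a hedged usage sketch (the helper function below is illustrative and not part of this diff), once `contribute_to_class()` has run, the contributed columns and the overridden `as_dict()` are available directly on `TaskResult`:

```python
from django_celery_results.models import TaskResult


def channel_task_summaries(channel_id):
    """Return serialized results for one channel's tasks.

    `channel_id`, `progress` and `user` exist on TaskResult only because
    TaskResultCustom.contribute_to_class() added them at import time, and
    as_dict() is the override defined above, so the extra keys are included.
    """
    return [task.as_dict() for task in TaskResult.objects.filter(channel_id=channel_id)]
```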
5 changes: 0 additions & 5 deletions contentcuration/contentcuration/not_production_settings.py
@@ -2,11 +2,6 @@
 
 from .settings import *  # noqa
 
-if RUNNING_TESTS:  # noqa
-    # if we're running tests, run Celery tests synchronously so tests won't complete before the process
-    # is finished.
-    CELERY_TASK_ALWAYS_EAGER = True
-
 ALLOWED_HOSTS = ["studio.local", "192.168.31.9", "127.0.0.1", "*"]
 
 ACCOUNT_ACTIVATION_DAYS = 7
36 changes: 17 additions & 19 deletions contentcuration/contentcuration/settings.py
@@ -86,6 +86,7 @@
     'webpack_loader',
     'django_filters',
     'mathfilters',
+    'django_celery_results',
 )
 
 SESSION_ENGINE = "django.contrib.sessions.backends.cached_db"
@@ -342,26 +343,23 @@ def gettext(s):
 ]
 
 # CELERY CONFIGURATIONS
-CELERY_BROKER_URL = REDIS_URL
-# with a redis broker, tasks will be re-sent if not completed within the duration of this timeout
-CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 4 * 3600}
-CELERY_RESULT_BACKEND = REDIS_URL
 CELERY_REDIS_DB = os.getenv("CELERY_REDIS_DB") or "0"
-CELERY_BROKER_URL = "{url}{db}".format(
-    url=REDIS_URL,
-    db=CELERY_REDIS_DB
-)
-CELERY_RESULT_BACKEND = CELERY_BROKER_URL
-CELERY_TIMEZONE = os.getenv("CELERY_TIMEZONE") or 'Africa/Nairobi'
-CELERY_ACCEPT_CONTENT = ['application/json']
-CELERY_TASK_SERIALIZER = 'json'
-CELERY_RESULT_SERIALIZER = 'json'
-# If this is True, Celery tasks are run synchronously. This is set to True in the unit tests,
-# as it is not possible to correctly test Celery tasks asynchronously currently.
-CELERY_TASK_ALWAYS_EAGER = False
-# We hook into task events to update the Task DB records with the updated state.
-# See celerysignals.py for more info.
-CELERY_WORKER_SEND_TASK_EVENTS = True
+CELERY = {
+    "broker_url": "{url}{db}".format(
+        url=REDIS_URL,
+        db=CELERY_REDIS_DB
+    ),
+    # with a redis broker, tasks will be re-sent if not completed within the duration of this timeout
+    "broker_transport_options": {"visibility_timeout": 4 * 3600},
+    "redis_db": CELERY_REDIS_DB,
+    "result_backend": "django-db",
+    "redis_backend_health_check_interval": 600,
+    "timezone": os.getenv("CELERY_TIMEZONE") or 'Africa/Nairobi',
+    "accept_content": ['application/json'],
+    "task_serializer": "json",
+    "result_serializer": "json",
+    "worker_send_task_events": True,
+}
 
 # When cleaning up orphan nodes, only clean up any that have been last modified
 # since this date
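A rough illustration of what switching `result_backend` to `"django-db"` provides (the task id below is a placeholder and nothing in this snippet is part of the diff): task state still resolves through the normal Celery result API, while the same record is an ordinary ORM row carrying the Studio-specific columns added in models.py.

```python
from celery.result import AsyncResult
from django_celery_results.models import TaskResult

from contentcuration.celery import app

task_id = "00000000000000000000000000000000"  # placeholder; normally apply_async().id

# The Celery API reads state through the django-db backend configured above...
print(AsyncResult(task_id, app=app).state)

# ...and the same record can be queried like any other model, which is how the
# custom status/progress/channel_id fields can be exposed to API consumers.
row = TaskResult.objects.filter(task_id=task_id).first()
if row is not None:
    print(row.status, row.progress, row.channel_id)
```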