Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge "hotfixes" to "unstable" with tsvector command incompatibility fix #3936

Merged
merged 22 commits into from
Jan 30, 2023
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7e3a110
Merge pull request #3864 from learningequality/hotfixes
bjester Dec 7, 2022
54f06ad
Add task signature and locking when processing fetch_or_enqueue
bjester Dec 9, 2022
00ca70f
Add status code to sentry error messages
bjester Dec 16, 2022
646bcb0
Report disallowed changes to sentry
bjester Dec 16, 2022
bfe9481
Chunk request params when loading clipboard nodes
bjester Dec 16, 2022
9e2a392
Merge pull request #3888 from bjester/sentry-insights-and-chunking
rtibbles Dec 16, 2022
f3c9a01
Merge branch 'master' into hotfixes
bjester Dec 16, 2022
e150a15
Merge pull request #3889 from learningequality/hotfixes
bjester Dec 16, 2022
0c20063
Optimized tsvectors insertion 🚀
vkWeb Dec 20, 2022
ec99d47
Don't create tsvectors for incomplete and unpublished nodes
vkWeb Dec 27, 2022
15971bb
Merge pull request #3892 from vkWeb/optimize-tsvectors
bjester Jan 5, 2023
154cb17
Correct integer bounds
bjester Jan 6, 2023
4f282bf
Reset elector on duplicate, and capture errors in Sentry
bjester Jan 13, 2023
a918204
Merge pull request #3907 from bjester/leader-election-fix
rtibbles Jan 13, 2023
dc85747
Use OrderedDict for task_kwargs and update docstrings
bjester Jan 17, 2023
b77bea1
Merge pull request #3875 from bjester/task-signature
bjester Jan 17, 2023
a1ba192
Merge branch 'master' into hotfixes
bjester Jan 17, 2023
90e41a9
Add line that prevents Django reapplying the migration
bjester Jan 18, 2023
395afee
Update translation in JSON file
bjester Jan 18, 2023
a32a357
Merge pull request #3911 from bjester/fix-task-signature-migration
bjester Jan 18, 2023
3d89859
Merge pull request #3913 from bjester/reflexionar-fix
bjester Jan 18, 2023
b8efa5e
Merge branch 'hotfixes' into hot-to-un
vkWeb Jan 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add task signature and locking when processing fetch_or_enqueue
bjester committed Dec 9, 2022
commit 54f06ad50fe2ac87241de22706591db229ae5cea
5 changes: 5 additions & 0 deletions contentcuration/contentcuration/constants/locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Constants for locking behaviors, like advisory locking in Postgres, and mutexes
"""
TREE_LOCK = 1001
TASK_LOCK = 1002
32 changes: 29 additions & 3 deletions contentcuration/contentcuration/db/advisory_lock.py
Original file line number Diff line number Diff line change
@@ -6,11 +6,36 @@

logging = logger.getLogger(__name__)

# signed limits are 2**32 or 2**64, so one less power of 2
# to become unsigned limits (half above 0, half below 0)
INT_32BIT = 2**31
INT_64BIT = 2**63


class AdvisoryLockBusy(RuntimeError):
pass


def _prepare_keys(keys):
"""
Ensures that integers do not exceed postgres constraints:
- signed 64bit allowed with single key
- signed 32bit allowed with two keys
:param keys: A list of unsigned integers
:return: A list of signed integers
"""
limit = INT_64BIT if len(keys) == 1 else INT_32BIT
new_keys = []
for key in keys:
# if key is over the limit, convert to negative int
if key >= limit:
key = limit - key
if abs(key) >= limit:
raise OverflowError(f"Advisory lock key '{key}' is too large")
new_keys.append(key)
return new_keys


@contextmanager
def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wait=True):
"""
@@ -32,6 +57,7 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
keys = [key1]
if key2 is not None:
keys.append(key2)
keys = _prepare_keys(keys)

query = "SELECT pg{_try}_advisory_{xact_}{lock}{_shared}({keys}) AS lock;".format(
_try="" if wait else "_try",
@@ -41,11 +67,11 @@ def execute_lock(key1, key2=None, unlock=False, session=False, shared=False, wai
keys=", ".join(["%s" for i in range(0, 2 if key2 is not None else 1)])
)

log_query = "'{}' with params {}".format(query, keys)
logging.debug("Acquiring advisory lock: {}".format(query, log_query))
log_query = f"'{query}' with params {keys}"
logging.debug(f"Acquiring advisory lock: {log_query}")
with connection.cursor() as c:
c.execute(query, keys)
logging.debug("Acquired advisory lock: {}".format(query, log_query))
logging.debug(f"Acquired advisory lock: {log_query}")
yield c


2 changes: 1 addition & 1 deletion contentcuration/contentcuration/db/models/manager.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
from mptt.managers import TreeManager
from mptt.signals import node_moved

from contentcuration.constants.locking import TREE_LOCK
from contentcuration.db.advisory_lock import advisory_lock
from contentcuration.db.models.query import CustomTreeQuerySet
from contentcuration.utils.cache import ResourceSizeCache
@@ -32,7 +33,6 @@
# The exact optimum batch size is probably highly dependent on tree
# topology also, so these rudimentary tests are likely insufficient
BATCH_SIZE = 100
TREE_LOCK = 1001


class CustomManager(Manager.from_queryset(CTEQuerySet)):
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 3.2.14 on 2022-12-09 16:09
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):

def __init__(self, name, app_label):
super(Migration, self).__init__(name, 'django_celery_results')

dependencies = [
('contentcuration', '0140_delete_task'),
('django_celery_results', '0011_taskresult_periodic_task_name'),
]

operations = [
migrations.AddField(
model_name='taskresult',
name='signature',
field=models.CharField(max_length=32, null=True),
),
migrations.AddIndex(
model_name='taskresult',
index=models.Index(condition=models.Q(('status__in', frozenset(['STARTED', 'REJECTED', 'RETRY', 'RECEIVED', 'PENDING']))), fields=['signature'], name='task_result_signature_idx'),
),
]
40 changes: 39 additions & 1 deletion contentcuration/contentcuration/models.py
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
from datetime import datetime

import pytz
from celery import states as celery_states
from django.conf import settings
from django.contrib.auth.base_user import AbstractBaseUser
from django.contrib.auth.base_user import BaseUserManager
@@ -74,6 +75,7 @@
from contentcuration.db.models.manager import CustomManager
from contentcuration.statistics import record_channel_stats
from contentcuration.utils.cache import delete_public_channel_cache_keys
from contentcuration.utils.celery.tasks import generate_task_signature
from contentcuration.utils.parser import load_json_string
from contentcuration.viewsets.sync.constants import ALL_CHANGES
from contentcuration.viewsets.sync.constants import ALL_TABLES
@@ -2436,13 +2438,20 @@ def serialize_to_change_dict(self):
class TaskResultCustom(object):
"""
Custom fields to add to django_celery_results's TaskResult model

If adding fields to this class, run `makemigrations` then move the generated migration from the
`django_celery_results` app to the `contentcuration` app and override the constructor to change
the app_label. See `0141_add_task_signature` for an example
"""
# user shouldn't be null, but in order to append the field, this needs to be allowed
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="tasks", on_delete=models.CASCADE, null=True)
channel_id = DjangoUUIDField(db_index=True, null=True, blank=True)
progress = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0), MaxValueValidator(100)])
# a hash of the task name and kwargs for identifying repeat tasks
signature = models.CharField(null=True, blank=False, max_length=32)

super_as_dict = TaskResult.as_dict
super_save = TaskResult.save

def as_dict(self):
"""
@@ -2456,16 +2465,45 @@ def as_dict(self):
)
return super_dict

def set_signature(self):
"""
Generates and sets the signature for the task if it isn't set
"""
if self.signature is not None:
# nothing to do
return
self.signature = generate_task_signature(self.task_name, task_kwargs=self.task_kwargs, channel_id=self.channel_id)

def save(self, *args, **kwargs):
"""
Override save to ensure signature is generated
"""
self.set_signature()
return self.super_save(*args, **kwargs)

@classmethod
def contribute_to_class(cls, model_class=TaskResult):
"""
Adds fields to model, by default TaskResult
:param model_class: TaskResult model
"""
for field in dir(cls):
if not field.startswith("_"):
if not field.startswith("_") and field not in ('contribute_to_class', 'Meta'):
model_class.add_to_class(field, getattr(cls, field))

# manually add Meta afterwards
setattr(model_class._meta, 'indexes', getattr(model_class._meta, 'indexes', []) + cls.Meta.indexes)

class Meta:
indexes = [
# add index that matches query usage for signature
models.Index(
fields=['signature'],
name='task_result_signature_idx',
condition=Q(status__in=celery_states.UNREADY_STATES),
),
]


# trigger class contributions immediately
TaskResultCustom.contribute_to_class()
3 changes: 2 additions & 1 deletion contentcuration/contentcuration/tests/test_asynctask.py
Original file line number Diff line number Diff line change
@@ -234,7 +234,8 @@ def test_fetch_or_enqueue_task__channel_id__uuid_then_hex(self):
self.assertEqual(expected_task.task_id, async_result.task_id)

def test_requeue_task(self):
existing_task_ids = requeue_test_task.find_ids()
signature = requeue_test_task._generate_signature({})
existing_task_ids = requeue_test_task.find_ids(signature)
self.assertEqual(len(existing_task_ids), 0)

first_async_result = requeue_test_task.enqueue(self.user, requeue=True)
Loading