Fix potential deadlock during migrations caused by Celery Beat (#75)
* Prepare `test_app` and tests for new implementation

* Implement scheduler

* Document deadlock caveat
edg956 authored Apr 28, 2023
1 parent 2a448da commit 40712c6
Showing 4 changed files with 57 additions and 6 deletions.
21 changes: 19 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -131,14 +131,31 @@ That way you have full control over which schemas the task should be scheduled i


### Custom scheduler
-You are using the standard `Scheduler` or `PersistentScheduler` classes provided by `celery`, you can transition to using this package's `TenantAwareScheduler` or `TenantAwarePersistentScheduler` classes. You should then specify the scheduler you want to use in your invocation to `beat`. i.e:
+If you are using the standard `Scheduler` or `PersistentScheduler` classes provided by `celery`, you can switch to this package's `TenantAwareScheduler` or `TenantAwarePersistentScheduler` classes. Then specify the scheduler you want to use in the celery beat config or in your invocation of `beat`, e.g.:

```bash
celery -A proj beat --scheduler=tenant_schemas_celery.scheduler.TenantAwareScheduler
```
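
For the config route rather than the command-line flag, a minimal sketch might look like the following. It assumes the Celery app loads its configuration from Django settings with `namespace="CELERY"`, in which case Celery's `beat_scheduler` setting is spelled `CELERY_BEAT_SCHEDULER`. The file location is an assumption; adjust it to however your project loads Celery config.

```python
# settings.py (assumed location). This only takes effect if the Celery app calls
# app.config_from_object("django.conf:settings", namespace="CELERY").
CELERY_BEAT_SCHEDULER = "tenant_schemas_celery.scheduler.TenantAwareScheduler"
```

With this in place, `celery -A proj beat` should pick up the scheduler without the `--scheduler` flag.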

#### Caveats
-`TenantAwareSchedulerMixin` uses a subclass of `SchedulerEntry` that allows the user to provide specific schemas to run a task on. This might prove useful if you have a task you only want to run in the `public` schema or to a subset of your tenants. In order to use set that, you must configure `tenant_schemas` in the tasks definition as such:
- There's a chance that celery beat will try to run a task for a newly created tenant before its migrations have finished, which could lead to deadlocks. This is especially likely in big projects with many migrations and very frequent tasks (e.g. every minute). To mitigate it, you can do the following:
1. Subclass either `TenantAwareScheduler` or `TenantAwarePersistentScheduler` and override the `get_queryset` method to match your definition of "ready" tenants. For example, if your tenant model had a `ready` flag, you could do the following:

```python
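# tenants_app/scheduler.py
from tenant_schemas_celery.scheduler import TenantAwareScheduler


class MyTenantAwareScheduler(TenantAwareScheduler):
    @classmethod
    def get_queryset(cls):
        # Only schedule tasks for tenants whose migrations have completed.
        return super().get_queryset().filter(ready=True)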
```

2. Use the new scheduler in your celery beat config or invocation:

```bash
celery -A proj beat --scheduler=tenants_app.scheduler.MyTenantAwareScheduler
```
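
To make the effect of the `get_queryset` override concrete, here is a rough, Django-free sketch of how tenant selection might behave once a readiness filter is in place. `FakeTenant`, `select_schemas`, and the `"public"` schema name are hypothetical stand-ins, not part of this package. The `tenant_schemas` branch (filtering by the listed schema names) is an assumption about behavior and is not copied from the implementation.

```python
from dataclasses import dataclass
from typing import List, Optional

PUBLIC_SCHEMA = "public"  # assumption: the project's public schema name


@dataclass
class FakeTenant:
    """Hypothetical stand-in for a row of the tenant model."""
    schema_name: str
    ready: bool = False


def get_queryset(tenants: List[FakeTenant]) -> List[FakeTenant]:
    """Mimics the overridden hook: keep only tenants flagged as ready."""
    return [t for t in tenants if t.ready]


def select_schemas(
    tenants: List[FakeTenant], tenant_schemas: Optional[List[str]] = None
) -> List[str]:
    """Return the schema names a schedule entry would be applied to."""
    candidates = get_queryset(tenants)
    if tenant_schemas is None:
        # No explicit schemas: every ready tenant except the public schema.
        candidates = [t for t in candidates if t.schema_name != PUBLIC_SCHEMA]
    else:
        # Assumed behavior: restrict to the schemas listed on the entry.
        candidates = [t for t in candidates if t.schema_name in tenant_schemas]
    return [t.schema_name for t in candidates]


tenants = [
    FakeTenant("public", ready=True),
    FakeTenant("tenant1", ready=True),
    FakeTenant("tenant2", ready=False),  # migrations still running
]
print(select_schemas(tenants))
print(select_schemas(tenants, ["public", "tenant2"]))
```

In this sketch, `tenant2` is never selected while its `ready` flag is false, even when an entry names it explicitly, because the readiness filter runs first.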

- `TenantAwareSchedulerMixin` uses a subclass of `SchedulerEntry` that allows the user to provide specific schemas to run a task on. This might prove useful if you have a task you only want to run in the `public` schema or on a subset of your tenants. To do so, set `tenant_schemas` in the task's definition, like so:

```python
app.conf.beat_schedule = {
9 changes: 8 additions & 1 deletion tenant_schemas_celery/scheduler.py
@@ -3,9 +3,12 @@

from celery.beat import PersistentScheduler, ScheduleEntry, Scheduler
from django_tenants.utils import get_tenant_model, tenant_context, get_public_schema_name
from django.db import models

logger = logging.getLogger(__name__)

Tenant = get_tenant_model()


class TenantAwareScheduleEntry(ScheduleEntry):
    tenant_schemas: Optional[List[str]] = None
@@ -68,12 +71,16 @@ def editable_fields_equal(self, other):
class TenantAwareSchedulerMixin:
    Entry = TenantAwareScheduleEntry

    @classmethod
    def get_queryset(cls) -> models.QuerySet:
        return Tenant.objects.all()

    def apply_entry(self, entry: TenantAwareScheduleEntry, producer=None):
        """
        See https://github.com/celery/celery/blob/c571848023be732a1a11d46198cf831a522cfb54/celery/beat.py#L277
        """

-        tenants = get_tenant_model().objects.all()
+        tenants = self.get_queryset()

        if entry.tenant_schemas is None:
            tenants = tenants.exclude(schema_name=get_public_schema_name())
32 changes: 29 additions & 3 deletions tenant_schemas_celery/test_scheduler.py
@@ -13,6 +13,8 @@
    TenantAwareSchedulerMixin,
)

Tenant = get_tenant_model()


class FakeScheduler(TenantAwareSchedulerMixin, Scheduler):
    def __init__(self, *args, **kwargs):
@@ -93,10 +95,10 @@ def test_schedule_setup_properly(
    @fixture
    def tenants(self) -> None:
        with schema_context(get_public_schema_name()):
-            get_tenant_model().objects.create(
+            Tenant.objects.create(
                name="Tenant1", schema_name="tenant1"
            )
-            get_tenant_model().objects.create(
+            Tenant.objects.create(
                name="Tenant2", schema_name="tenant2"
            )

@@ -107,7 +109,7 @@ def test_apply_entry(self, scheduler: FakeScheduler, tenants: None):

schemas = (
entry.tenant_schemas
-or get_tenant_model().objects.values_list(
+or Tenant.objects.values_list(
"schema_name", flat=True
)
)
@@ -117,6 +119,30 @@ def test_apply_entry(self, scheduler: FakeScheduler, tenants: None):

scheduler._sent.clear()

    @mark.django_db
    class TestCustomQuerySet:
        @fixture
        def scheduler(self, app: CeleryApp) -> FakeScheduler:
            class WithCustomQuerySet(FakeScheduler):
                @classmethod
                def get_queryset(cls):
                    return super().get_queryset().filter(ready=True)

            return WithCustomQuerySet(app)

        def test_unready_tenants_are_not_sent(self, scheduler: FakeScheduler):
            with schema_context(get_public_schema_name()):
                Tenant.objects.create(
                    name="Tenant1",
                    schema_name="tenant1",
                    ready=False
                )

            for task_name, entry in scheduler.schedule.items():
                scheduler.apply_entry(entry)

            assert scheduler._sent == []


@COMMON_PARAMETERS
class TestTenantAwarePersistentScheduler:
1 change: 1 addition & 0 deletions test_app/test_app/shared/models.py
@@ -6,6 +6,7 @@

class Client(TenantMixin):
    name = models.CharField(max_length=16)
    ready = models.BooleanField(default=False)


class Domain(TenantMixin):
