diff --git a/README.md b/README.md index 7dfba88..c59178d 100644 --- a/README.md +++ b/README.md @@ -131,14 +131,31 @@ That way you have full control over which schemas the task should be scheduled i ### Custom scheduler -You are using the standard `Scheduler` or `PersistentScheduler` classes provided by `celery`, you can transition to using this package's `TenantAwareScheduler` or `TenantAwarePersistentScheduler` classes. You should then specify the scheduler you want to use in your invocation to `beat`. i.e: +If you are using the standard `Scheduler` or `PersistentScheduler` classes provided by `celery`, you can transition to using this package's `TenantAwareScheduler` or `TenantAwarePersistentScheduler` classes. You should then specify the scheduler you want to use in the celery beat config or your invocation to `beat`. i.e: ```bash celery -A proj beat --scheduler=tenant_schemas_celery.scheduler.TenantAwareScheduler ``` #### Caveats -`TenantAwareSchedulerMixin` uses a subclass of `SchedulerEntry` that allows the user to provide specific schemas to run a task on. This might prove useful if you have a task you only want to run in the `public` schema or to a subset of your tenants. In order to use set that, you must configure `tenant_schemas` in the tasks definition as such: +- There's a chance that celery beat will try to run a task for a newly created tenant before its migrations are ready, which could potentially lead to deadlocks. This is specially true with big projects with a lot of migrations and very frequent tasks (i.e: every minute). In order to mitigate it, one could do the following: + 1. Subclass any of `TenantAwareScheduler` or `TenantAwarePersistentScheduler` and override the `get_queryset` method to match your definition of "ready" tenants. For example, imagine that you had a `ready` flag in your tenant model. You could do the following: + + ```python + # tenants_app/scheduler.py + class MyTenantAwareScheduler(TenantAwareScheduler): + @classmethod + def get_queryset(cls): + return super().get_queryset().filter(ready=True) + ``` + + 2. Use the new scheduler in your celery beat config or invocation: + + ```bash + celery -A proj beat --scheduler=tenants_app.scheduler.MyTenantAwareScheduler + ``` + +- `TenantAwareSchedulerMixin` uses a subclass of `SchedulerEntry` that allows the user to provide specific schemas to run a task on. This might prove useful if you have a task you only want to run in the `public` schema or to a subset of your tenants. In order to use set that, you must configure `tenant_schemas` in the tasks definition as such: ```python app.conf.beat_schedule = { diff --git a/tenant_schemas_celery/scheduler.py b/tenant_schemas_celery/scheduler.py index 35bc734..f95db35 100644 --- a/tenant_schemas_celery/scheduler.py +++ b/tenant_schemas_celery/scheduler.py @@ -3,9 +3,12 @@ from celery.beat import PersistentScheduler, ScheduleEntry, Scheduler from django_tenants.utils import get_tenant_model, tenant_context, get_public_schema_name +from django.db import models logger = logging.getLogger(__name__) +Tenant = get_tenant_model() + class TenantAwareScheduleEntry(ScheduleEntry): tenant_schemas: Optional[List[str]] = None @@ -68,12 +71,16 @@ def editable_fields_equal(self, other): class TenantAwareSchedulerMixin: Entry = TenantAwareScheduleEntry + @classmethod + def get_queryset(cls) -> models.QuerySet: + return Tenant.objects.all() + def apply_entry(self, entry: TenantAwareScheduleEntry, producer=None): """ See https://github.com/celery/celery/blob/c571848023be732a1a11d46198cf831a522cfb54/celery/beat.py#L277 """ - tenants = get_tenant_model().objects.all() + tenants = self.get_queryset() if entry.tenant_schemas is None: tenants = tenants.exclude(schema_name=get_public_schema_name()) diff --git a/tenant_schemas_celery/test_scheduler.py b/tenant_schemas_celery/test_scheduler.py index a563f4a..cab435a 100644 --- a/tenant_schemas_celery/test_scheduler.py +++ b/tenant_schemas_celery/test_scheduler.py @@ -13,6 +13,8 @@ TenantAwareSchedulerMixin, ) +Tenant = get_tenant_model() + class FakeScheduler(TenantAwareSchedulerMixin, Scheduler): def __init__(self, *args, **kwargs): @@ -93,10 +95,10 @@ def test_schedule_setup_properly( @fixture def tenants(self) -> None: with schema_context(get_public_schema_name()): - get_tenant_model().objects.create( + Tenant.objects.create( name="Tenant1", schema_name="tenant1" ) - get_tenant_model().objects.create( + Tenant.objects.create( name="Tenant2", schema_name="tenant2" ) @@ -107,7 +109,7 @@ def test_apply_entry(self, scheduler: FakeScheduler, tenants: None): schemas = ( entry.tenant_schemas - or get_tenant_model().objects.values_list( + or Tenant.objects.values_list( "schema_name", flat=True ) ) @@ -117,6 +119,30 @@ def test_apply_entry(self, scheduler: FakeScheduler, tenants: None): scheduler._sent.clear() + @mark.django_db + class TestCustomQuerySet: + @fixture + def scheduler(self, app: CeleryApp) -> FakeScheduler: + class WithCustomQuerySet(FakeScheduler): + @classmethod + def get_queryset(cls): + return super().get_queryset().filter(ready=True) + + return WithCustomQuerySet(app) + + def test_unready_tenants_are_not_sent(self, scheduler: FakeScheduler): + with schema_context(get_public_schema_name()): + Tenant.objects.create( + name="Tenant1", + schema_name="tenant1", + ready=False + ) + + for task_name, entry in scheduler.schedule.items(): + scheduler.apply_entry(entry) + + assert scheduler._sent == [] + @COMMON_PARAMETERS class TestTenantAwarePersistentScheduler: diff --git a/test_app/test_app/shared/models.py b/test_app/test_app/shared/models.py index 18b2197..691e285 100644 --- a/test_app/test_app/shared/models.py +++ b/test_app/test_app/shared/models.py @@ -6,6 +6,7 @@ class Client(TenantMixin): name = models.CharField(max_length=16) + ready = models.BooleanField(default=False) class Domain(TenantMixin):