⚠️ BETA Software: Working on being production-ready soon.
django-qstash is a drop-in replacement for Celery's `shared_task`.

In `tasks.py` in your apps:

```python
from django_qstash import shared_task


@shared_task
def my_task():
    pass
```

To keep using Celery too, use `@stashed_task` instead of `@shared_task` (more below).
To do this we need:

- Upstash QStash
- A single public webhook to call `@stashed_task` functions automatically
This allows us to:

- Nearly identical usage to Celery's `@shared_task` with far less configuration and overhead
- Focus just on Django
- Drop Celery completely, scale it down, or use it as normal; django-qstash can work hand-in-hand with Celery
- Unlock true serverless and scale-to-zero for Django
- Run background tasks through webhooks
- Cut costs
- Trigger GitHub Actions workflows or GitLab CI/CD pipelines for handling other kinds of background tasks based on your project's code
Install django-qstash:

```bash
pip install django-qstash
```
```python
INSTALLED_APPS = [
    # ...
    "django_qstash",
    "django_qstash.results",
    "django_qstash.schedules",
    # ...
]
```
- `django_qstash`: includes the `@shared_task` and `@stashed_task` decorators and the webhook view
- `django_qstash.results` (optional): store task results in the Django DB
- `django_qstash.schedules` (optional): use QStash Schedules to run your `django_qstash` tasks, with out-of-the-box support for `@stashed_task`. Schedule tasks using cron format (e.g. `0 0 * * *`), which QStash Schedules requires. Use crontab.guru for help writing cron expressions.
In your `ROOT_URLCONF` (e.g. `urls.py`), add the following:

```python
from django.urls import path

from django_qstash.views import qstash_webhook_view

urlpatterns = [
    # ...
    path("qstash/webhook/", qstash_webhook_view),
    # ...
]
```

Be sure to use this path in your `DJANGO_QSTASH_WEBHOOK_PATH` environment variable.
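In other words, the callback URL QStash posts to is just the domain joined with this path. A quick sanity check (the values below are examples, and the exact URL-building helper is internal to django-qstash):

```python
# Example values: DJANGO_QSTASH_WEBHOOK_PATH must match the path in urls.py
DJANGO_QSTASH_DOMAIN = "https://example.com"
DJANGO_QSTASH_WEBHOOK_PATH = "/qstash/webhook/"

# The webhook callback URL is effectively domain + path
callback_url = DJANGO_QSTASH_DOMAIN.rstrip("/") + DJANGO_QSTASH_WEBHOOK_PATH
print(callback_url)  # → https://example.com/qstash/webhook/
```

If the two settings disagree with your actual URLconf, QStash deliveries will 404.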
Get your QStash token and signing keys from Upstash.

```python
QSTASH_TOKEN = "your_token"
QSTASH_CURRENT_SIGNING_KEY = "your_current_signing_key"
QSTASH_NEXT_SIGNING_KEY = "your_next_signing_key"

# required for django-qstash
DJANGO_QSTASH_DOMAIN = "https://example.com"
DJANGO_QSTASH_WEBHOOK_PATH = "/qstash/webhook/"
```
Review `.env.sample` to see all the environment variables you need to set.
There is a sample project in `sample_project/` that shows how all this is implemented.
- Python 3.10+
- Django 5+
- qstash-py
- Upstash account
Django-QStash revolves around the `stashed_task` decorator. The goal is to be a drop-in replacement for Celery's `shared_task` decorator.

Here's how it works:

- Define a task
- Call the task with `.delay()` or `.apply_async()`
```python
# from celery import shared_task
from django_qstash import shared_task
from django_qstash import stashed_task


@stashed_task
def hello_world(name: str, age: int = None, activity: str = None):
    if age is None:
        print(f"Hello {name}! I see you're {activity}.")
        return
    print(f"Hello {name}! I see you're {activity} at {age} years old.")


@shared_task
def hello_world_redux(name: str, age: int = None, activity: str = None):
    if age is None:
        print(f"Hello {name}! I see you're {activity}.")
        return
    print(f"Hello {name}! I see you're {activity} at {age} years old.")
```
- `hello_world` and `hello_world_redux` work the same with django-qstash.
- If you used Celery's `@shared_task` instead, Celery would handle only `hello_world_redux` and django-qstash would handle only `hello_world`.
Nothing special here. Just call the function like any other to verify it works.

```python
# normal function call
hello_world("Tony Stark", age=40, activity="building in a cave with a box of scraps.")
```
Using `.delay()` or `.apply_async()` is how you trigger a background task. These calls create a QStash message that is delivered via webhook to your Django application; django-qstash handles the webhook and the message delivery, assuming it is installed correctly.
This functionality is modeled after Celery and works as you'd expect.
```python
hello_world.delay(
    "Tony Stark", age=40, activity="building in a cave with a box of scraps."
)

hello_world.apply_async(
    args=("Tony Stark",),
    kwargs={"activity": "building in a cave with a box of scraps."},
)
```
Use the `countdown` parameter to delay a task by N seconds (always in seconds): `.apply_async(*args, **kwargs, countdown=N)`.
```python
# async task delayed 35 seconds
delay_35_seconds = 35
hello_world.apply_async(
    args=("Tony Stark",),
    kwargs={"activity": "building in a cave with a box of scraps."},
    countdown=delay_35_seconds,
)
```
Arguments to django-qstash-managed functions must be JSON serializable. The way to find out:

```python
import json

data = {
    "args": ("Tony Stark",),
    "kwargs": {"activity": "building in a cave with a box of scraps."},
}
print(json.dumps(data))
# no errors, you're good to go.
```
If you get errors, you'll need to fix them. Here are a few common causes:

- Using a Django queryset directly as an argument
- Using a Django model instance directly as an argument
- Using a datetime object directly as an argument (e.g. `datetime.datetime` or `datetime.date`) instead of a timestamp or date string (e.g. `datetime.datetime.now().timestamp()` or `datetime.datetime.now().strftime("%Y-%m-%d")`)
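As an illustration of the pattern: convert unserializable values (like datetimes or model instances) into primitives before calling the task, and rebuild them inside the task body. The field names below are hypothetical.

```python
import datetime
import json

now = datetime.datetime(2024, 1, 15, 12, 0, 0)

# Datetime objects are not JSON serializable: json.dumps raises TypeError
try:
    json.dumps({"when": now})
except TypeError:
    pass  # expected

# Instead, pass primitives: a timestamp, a date string, or a primary key
payload = {
    "when_ts": now.timestamp(),            # float timestamp
    "when_str": now.strftime("%Y-%m-%d"),  # "2024-01-15"
    "user_id": 42,                         # pass a model's pk, not the instance
}
print(json.dumps(payload))  # serializes without error

# Inside the task, rebuild what you need from the primitives
restored = datetime.datetime.fromtimestamp(payload["when_ts"])
assert restored == now
```

Inside the task you would then re-fetch the model by its primary key instead of passing the instance across the wire.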
```python
import logging

# from celery import shared_task
# becomes
# from django_qstash import shared_task
# or
from django_qstash import stashed_task

logger = logging.getLogger(__name__)


@stashed_task
def math_add_task(a, b, save_to_file=False, *args, **kwargs):
    logger.info(f"Adding {a} and {b}")
    if save_to_file:
        with open("math-add-result.txt", "w") as f:
            f.write(f"{a} + {b} = {a + b}")
    return a + b
```
Calling:

```python
math_add_task.apply_async(args=(12, 454), kwargs={"save_to_file": True})
```

is the same as:

```python
math_add_task.delay(12, 454, save_to_file=True)
```
But if you need to delay the task, use `.apply_async()` with the `countdown` parameter.

```python
five_hours = 5 * 60 * 60

math_add_task.apply_async(
    args=(12, 454), kwargs={"save_to_file": True}, countdown=five_hours
)
```
The `.delay()` method does not support a `countdown` parameter because it simply passes its arguments (`*args`, `**kwargs`) to the `apply_async()` method.
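As a mental model, `.delay()` is roughly this thin forwarding wrapper. This is an illustrative sketch with assumed names, not django-qstash's actual source:

```python
class TaskHandle:
    """Minimal sketch of a task wrapper where .delay() forwards to .apply_async()."""

    def __init__(self, func):
        self.func = func

    def apply_async(self, args=(), kwargs=None, countdown=None):
        # In django-qstash this step would publish a QStash message,
        # optionally delayed by `countdown` seconds.
        return {"args": tuple(args), "kwargs": kwargs or {}, "countdown": countdown}

    def delay(self, *args, **kwargs):
        # There is no room for options like countdown here: everything
        # passed to .delay() becomes task arguments.
        return self.apply_async(args=args, kwargs=kwargs)


handle = TaskHandle(lambda a, b: a + b)
print(handle.delay(1, 2))                             # countdown stays None
print(handle.apply_async(args=(1, 2), countdown=35))  # countdown only works here
```

This is why the docs above reach for `.apply_async()` whenever a delay is needed.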
Run `python manage.py available_tasks` to view all available tasks found by django-qstash. Unlike Celery, django-qstash does not assign tasks to a specific Celery app (e.g. `app = Celery()`).
Requires `django_qstash.schedules` to be installed.

- `python manage.py task_schedules --list`: see all schedules related to the `DJANGO_QSTASH_DOMAIN`
- `python manage.py task_schedules --sync`: sync schedules based on the `DJANGO_QSTASH_DOMAIN` and store them in the Django Admin
django-qstash requires a publicly accessible domain to work (e.g. `https://djangoqstash.com`). There are many ways to do this; we recommend:

- Cloudflare Tunnels with a domain name you control
- ngrok

Once you have a domain name, you can configure the `DJANGO_QSTASH_DOMAIN` setting in your Django settings.
Various options are available to configure django-qstash.

### DJANGO_QSTASH_DOMAIN

- Required: Yes
- Default: `None`
- Description: Must be a valid and publicly accessible domain, for example `https://djangoqstash.com`. Review Development usage for setting up a domain name during development.

### DJANGO_QSTASH_WEBHOOK_PATH

- Required: Yes
- Default: `/qstash/webhook/`
- Description: The path where QStash will send webhooks to your Django application.

### DJANGO_QSTASH_FORCE_HTTPS

- Required: No
- Default: `True`
- Description: Whether to force HTTPS for the webhook.

### DJANGO_QSTASH_RESULT_TTL

- Required: No
- Default: `604800`
- Description: The number of seconds after which task result data can be safely deleted. Defaults to 604800 seconds (7 days, i.e. 7 * 24 * 60 * 60).
For a complete example, review `sample_project/settings.py`, where python-decouple is used to set the environment variables via the `.env` file or system environment variables (for production use).

Using `os.environ`:
```python
import os

###########################
# django settings
###########################
DJANGO_DEBUG = str(os.environ.get("DJANGO_DEBUG")) == "1"
DJANGO_SECRET_KEY = os.environ.get("DJANGO_SECRET_KEY")
ALLOWED_HOSTS = [os.environ.get("ALLOWED_HOST")]
CSRF_TRUSTED_ORIGINS = [os.environ.get("CSRF_TRUSTED_ORIGIN")]

###########################
# qstash-py settings
###########################
QSTASH_TOKEN = os.environ.get("QSTASH_TOKEN")
QSTASH_CURRENT_SIGNING_KEY = os.environ.get("QSTASH_CURRENT_SIGNING_KEY")
QSTASH_NEXT_SIGNING_KEY = os.environ.get("QSTASH_NEXT_SIGNING_KEY")

###########################
# django_qstash settings
###########################
DJANGO_QSTASH_DOMAIN = os.environ.get("DJANGO_QSTASH_DOMAIN")
DJANGO_QSTASH_WEBHOOK_PATH = os.environ.get("DJANGO_QSTASH_WEBHOOK_PATH")
DJANGO_QSTASH_FORCE_HTTPS = True
DJANGO_QSTASH_RESULT_TTL = 604800
```
Run background tasks on a cron schedule.

The `django_qstash.schedules` app schedules tasks using Upstash QStash Schedules via the `@shared_task` or `@stashed_task` decorators along with the `TaskSchedule` model.

Update your `INSTALLED_APPS` setting to include `django_qstash.schedules`.
```python
INSTALLED_APPS = [
    # ...
    "django_qstash",  # required
    "django_qstash.schedules",
    # ...
]
```

Run migrations:

```bash
python manage.py migrate django_qstash_schedules
```
Tasks must exist before you can schedule them. Review Define a Task for more information.

Here's how you can schedule a task:

- Django Admin (`/admin/django_qstash_schedules/taskschedule/add/`)
- Django shell (`python manage.py shell`)
```python
from django_qstash.schedules.models import TaskSchedule
from django_qstash.discovery.utils import discover_tasks

all_available_tasks = discover_tasks(paths_only=True)

desired_task = "django_qstash.results.clear_stale_results_task"
# or desired_task = "example_app.tasks.my_task"

task_to_use = desired_task
if desired_task not in all_available_tasks:
    task_to_use = all_available_tasks[0]

print(f"Using task: {task_to_use}")

TaskSchedule.objects.create(
    name="My Schedule",
    cron="0 0 * * *",
    task_name=task_to_use,
    args=["arg1", "arg2"],
    kwargs={"kwarg1": "value1", "kwarg2": "value2"},
)
```
- `django_qstash.results.clear_stale_results_task` is a built-in task that `django_qstash.results` provides
- `args` and `kwargs` are the arguments to pass to the task
- `cron` is the cron schedule to run the task on. Use crontab.guru for help writing cron expressions.
Retain the results of background tasks in the database, with clear-out functionality.

In `django_qstash.results.models` we have the `TaskResult` model class that can be used to track async task results. These entries are created via the django-qstash webhook view handler (`qstash_webhook_view`).

To install it, just add `django_qstash.results` to your `INSTALLED_APPS` setting.
```python
INSTALLED_APPS = [
    # ...
    "django_qstash",
    "django_qstash.results",
    # ...
]
```

Run migrations:

```bash
python manage.py migrate django_qstash_results
```
Key configuration:
We recommend purging the `TaskResult` model after a certain amount of time.

```bash
python manage.py clear_stale_results --since 604800
```

Args:

- `--since`: the number of seconds ago to clear results for. Defaults to 604800 seconds (7 days, the `DJANGO_QSTASH_RESULT_TTL` setting).
- `--no-input`: a flag to skip the confirmation prompt before deleting the results.
- Background Task: A function or task that is not part of the request/response cycle.
  - Examples include sending an email, running a report, or updating a database.
  - Pro: Background tasks can drastically improve the end-user experience, since users can move on with their day while the task runs in the background.
  - Con: Processes that run background tasks (like Celery) typically have to run 24/7.
- Scale-to-Zero: Depending on the amount of traffic, Django can be effectively turned off. If done right, when more traffic comes in, Django can be turned back on very quickly.
- Serverless: A cloud computing model where code runs without server management, with scaling and billing tied to usage. Often used interchangeably with "scale-to-zero".
TL;DR: Celery cannot be serverless, and I want serverless "Celery" so I only pay for the apps that have attention and traffic. Upstash created QStash to help solve the problem of message queues in a serverless environment. django-qstash is the goldilocks that combines the functionality of Celery with the functionality of QStash, all to unlock fully serverless Django.
I run a lot of side projects with Django. Some are demos for tutorials based on my work at @codingforentrepreneurs, and some are new businesses that haven't found much traction yet.
Most web apps can benefit from async background tasks such as sending emails, running reports, or updating databases.
But how?
Traditionally, I'd reach for Celery, but that can get expensive really quickly. Running a lot of Django projects can add up too; "death by a thousand cuts" if you will. A server for Django, one for the Celery worker, one for the Celery beat scheduler, and so on. It adds up fast.
I think serverless is the answer. Pay for what you use, scale to zero when you don't need it, and scale up when you do -- all automated.
Django can be serverless, and that's pretty easy to do thanks to Docker and the countless hosting options and services out there. Celery cannot be serverless, at least not yet.
Let's face it: Celery is a powerful tool for running async background tasks, but it comes at a cost. It needs at least one server running 24/7; for best performance it needs two (one worker, one beat scheduler). It also needs Redis or RabbitMQ. Most background processes tied to web apps are not serverless; they have to "listen" for their next task.
To make Django truly scale-to-zero and serverless, we need to drop Celery.
Enter django-qstash.
django-qstash is designed to be a near drop-in replacement for Celery's `shared_task` decorator.
It works by leveraging Upstash QStash to deliver messages about your tasks (e.g. the function's arguments) via webhooks to your Django application. The QStash docs describe it as:

> QStash is a serverless messaging and scheduling solution. It fits easily into your existing workflow and allows you to build reliable systems without managing infrastructure.
>
> Instead of calling an endpoint directly, QStash acts as a middleman between you and an API to guarantee delivery, perform automatic retries on failure, and more.
django-qstash has a webhook handler that converts a QStash message into a call to the specific `@shared_task` function that invoked `.delay()` or `.apply_async()`. It's easy, it's cheap, it's effective, and best of all, it unlocks the scale-to-zero potential of Django as a serverless app.
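Conceptually, the webhook handler decodes the message payload, looks up the task function by its import path, and calls it with the stored arguments. Here is an illustrative, self-contained sketch; the payload field names and the `dispatch_task` helper are hypothetical, and the real handler also verifies QStash's request signature before running anything:

```python
import importlib
import json


def dispatch_task(raw_body: bytes):
    """Hypothetical sketch of webhook dispatch: decode the JSON payload,
    import the referenced function by path, and call it with the stored
    args/kwargs. Not django-qstash's actual wire format."""
    payload = json.loads(raw_body)
    module_path, func_name = payload["task"].rsplit(".", 1)
    func = getattr(importlib.import_module(module_path), func_name)
    return func(*payload.get("args", []), **payload.get("kwargs", {}))


# Example: dispatch to a stdlib function to keep the sketch self-contained
body = json.dumps({"task": "math.gcd", "args": [12, 18]}).encode()
print(dispatch_task(body))  # → 6
```

The key idea is that the function's arguments travel through QStash as JSON, which is why task arguments must be JSON serializable.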