Celery (using Redis)
See also Dramatiq (using Redis), an alternative to Celery which we are using for one of our Windows projects (it still needs scheduling and Salt states).
To use a Celery queue in your project…
Add the following to requirements/base.txt:
celery
redis
Create a celery.py file in the project folder:
# -*- encoding: utf-8 -*-
from celery import Celery
from django.conf import settings
app = Celery('project')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Add the following to your project/__init__.py file:
from .celery import app as celery_app
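Note
The snippet above matches the older Celery API used throughout this document. If you are on Celery 4 or later, the upstream Django integration example also exports the app explicitly, so the same file would typically read:
from .celery import app as celery_app

__all__ = ('celery_app',)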
In your settings/production.py file, you will have the following:
DOMAIN = get_env_variable('DOMAIN')
DATABASE = DOMAIN.replace('.', '_').replace('-', '_')
DATABASES = {
...
Under DATABASES, add the following:
# Celery
from kombu import Exchange, Queue
# transport
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# number of worker processes (will be 3 == controller, worker and beat)
CELERYD_CONCURRENCY = 1
# rate limits
CELERY_DISABLE_RATE_LIMITS = True
# serializer
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# queue
CELERY_DEFAULT_QUEUE = DATABASE
CELERY_QUEUES = (
    Queue(DATABASE, Exchange(DATABASE), routing_key=DATABASE),
)
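For example (using the www.hatherleigh.info site from the pillar example later in this document), the queue name works out as follows:
DOMAIN = 'www.hatherleigh.info'
DATABASE = DOMAIN.replace('.', '_').replace('-', '_')
# DATABASE == 'www_hatherleigh_info', which is used as the queue name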
Note
If you are writing an example application, then just use CELERY_ALWAYS_EAGER (as shown below).
Tip
If you want to test this locally, then copy DOMAIN, DATABASE, and the Celery section into your dev file e.g. dev_patrick.py.
Check your environment settings to make sure DOMAIN is set to the value you expect.
Note
To use RabbitMQ, just remove BROKER_URL and CELERY_RESULT_BACKEND.
In your settings/dev_test.py file (below DATABASES), add the following:
# http://docs.celeryproject.org/en/2.5/django/unit-testing.html
CELERY_ALWAYS_EAGER = True
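With CELERY_ALWAYS_EAGER set, calling a task in a test executes it immediately in the same process, so no broker or worker is needed. A minimal sketch (the test module path is just an illustration, and process_periodic_task is the example task defined in the cron section below):
# app/tests/test_tasks.py (illustrative path)
from django.test import TestCase

from app.tasks import process_periodic_task


class TaskTestCase(TestCase):

    def test_process_periodic_task(self):
        # runs synchronously because CELERY_ALWAYS_EAGER is True
        result = process_periodic_task.delay()
        self.assertTrue(result.successful())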
To start the task queue on your development system:
celery -A project worker --loglevel=info
If you are using an example app, then replace project with the folder of the example app e.g:
celery -A example_xero worker --loglevel=info --logfile="logger-worker.log"
To deploy, add celery and redis to your pillar e.g:
# sites/my.sls
sites:
  www.hatherleigh.info:
    package: hatherleigh_info
    profile: django
    celery: True
Create a redis sls file:
# config/redis.sls
redis:
  True
And add it to the config for the server e.g:
# top.sls
'test-a':
  - config.redis
  - sites.my
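With the pillar and top file in place, deploy as usual by running a highstate on that minion (assuming your normal Salt workflow) e.g:
salt 'test-a' state.highstate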
cron
To create a periodic (cron-like) task, start by creating a function in your app/tasks.py file (where app is an application in your project):
from celery import task

@task()
def process_periodic_task():
    """Nothing to do... just testing."""
    pass
In your settings/base.py file, set up the schedule e.g:
# periodic tasks (requires 'beat')
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'process-every-minute': {
        'task': 'app.tasks.process_periodic_task',
        'schedule': crontab(minute='*/1'),
    },
}
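crontab accepts the usual cron-style fields, so other schedules follow the same pattern; a purely illustrative second entry for a task running every morning at 07:00 would be:
'process-every-morning': {
    'task': 'app.tasks.process_periodic_task',
    'schedule': crontab(hour=7, minute=0),
},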
Warning
If the tasks should only run on a production system, then add the schedule to settings/production.py instead of settings/base.py.
To start the cron queue on your development system:
celery -A project beat --loglevel=info
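For local development you can also run a single worker with an embedded beat scheduler using the --beat option (not recommended for production):
celery -A project worker --beat --loglevel=info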
Development
Tasks
Warning
Remember to use the correct pattern for transactions when adding tasks to the queue. For details, see below…
To create a task, create a function in your app/tasks.py file (where app is an application in your project) e.g:
from celery import task
from django.db import transaction

from app.models import TestModel  # an example model

@task()
def sync_document():
    # some example code
    with transaction.atomic():
        qs = TestModel.objects.select_for_update().filter(complete=True)
To add this task to the queue (if you are not in a transaction):
from workflow.tasks import create_workflows
create_workflows.apply_async(args=[workflow.pk], countdown=1)
# or
create_workflows.delay()
Django provides the on_commit function to register callback functions that should be executed after a transaction is successfully committed:
from django.db import transaction
from dash.tasks import sync_document

with transaction.atomic():
    # if you are in a transaction
    transaction.on_commit(lambda: sync_document.apply_async(
        args=[self.object.pk],
        countdown=2,
    ))
    # or
    transaction.on_commit(lambda: sync_document.delay(self.object.pk))
    # or
    transaction.on_commit(lambda: process_mail.delay())
countdown is a shortcut for setting the estimated time of arrival (eta) a number of seconds into the future.
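In other words, countdown=2 is shorthand for passing an explicit eta two seconds from now (a sketch; pk is just a placeholder value):
from datetime import datetime, timedelta

# equivalent to countdown=2 (Celery uses UTC by default)
sync_document.apply_async(args=[pk], eta=datetime.utcnow() + timedelta(seconds=2))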
To get the ID of the current task (see the Celery FAQ, How do I get the task ID):
from django.core.cache import cache
from project.celery import app

@app.task(bind=True)
def mytask(self):
    # self.request.id is the ID of the current task
    cache.set(self.request.id, "Running")
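The value written to the cache can then be read back elsewhere (for example in a view) using the same task ID; a minimal sketch, assuming the ID has been passed back to the caller:
from django.core.cache import cache

def task_status(task_id):
    # returns whatever 'mytask' last wrote for this ID, or None
    return cache.get(task_id)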
Logging
Just append the --logfile option e.g:
celery -A project worker --loglevel=info --logfile="celery.log"
Monitor
List the queues:
redis-cli keys \*
List the number of messages in a queue:
redis-cli llen www_hatherleigh_info
Purge
To purge existing tasks:
celery -A project purge