Dramatiq (using Redis)
We are working on a Windows project and it would be great if we could find a reliable queuing solution which supports Linux and Windows. I think Dramatiq meets this criteria, but we still have work to do before it can be used for projects in production.
Also see Celery (using Redis) for our current queuing solution for Linux based projects.
Note
We started using dramatiq
for a Windows project…
(because the new version of Celery isn’t supported on Windows).
Provision
Add the following to your pillar
file
(dramatiq: True
, scheduler: True
and celery: False
) e.g.
sites:
hatherleigh_net:
profile: django
celery: False
dramatiq: True
scheduler: True
env:
redis_host: localhost
redis_port: 6379
Configure
Add the following to requirements/base.txt
:
# linux etc...
django-dramatiq==
# or for windows
kb-django-dramatiq==
dramatiq[redis, watch]==
Tip
For windows deployments we use kb-django-dramatiq
instead of
django-dramatiq
.
For details, see
https://github.com/Bogdanp/django_dramatiq/issues/109
and
https://github.com/pkimber/django_dramatiq/blob/master/README.rst
Tip
See Requirements for the current version.
Add the following to .env.fish
:
set -x DOMAIN "dev"
set -x LOG_FOLDER ""
set -x LOG_SUFFIX "dev"
Add the following to .env.fish
(if you are Development with Kubernetes):
set -x REDIS_HOST (kubectl get nodes --namespace default -o jsonpath="{.items[0].status.addresses[0].address}")
set -x REDIS_PORT (kubectl get --namespace default -o jsonpath="{.spec.ports[0].nodePort}" services kb-redis-master)
Add the following to .gitlab-ci.yml
:
- export DOMAIN=dev
- export LOG_FOLDER=
- export LOG_SUFFIX=dev
Add the following to settings/base.py
:
THIRD_PARTY_APPS = (
# add django_dramatiq to installed apps before any of your custom apps
"django_dramatiq",
LOGGING = {
"handlers": {
"logfile": {
"level": "WARNING",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(
get_env_variable("LOG_FOLDER"),
"{}-{}-logger.log".format(
get_env_variable("DOMAIN").replace("_", "-"),
get_env_variable("LOG_SUFFIX"),
),
),
"maxBytes": 100000000,
"backupCount": 10,
"formatter": "standard",
},
},
Tip
See Logging for a sample LOGGING
configuration.
In your user settings e.g. settings/dev_patrick.py
:
DATABASE_NAME = get_env_variable("DATABASE_NAME")
REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.redis.RedisBroker",
"OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
"MIDDLEWARE": [
# drops messages that have been in the queue for too long
"dramatiq.middleware.AgeLimit",
# cancels actors that run for too long
"dramatiq.middleware.TimeLimit",
# lets you chain success and failure callbacks
"dramatiq.middleware.Callbacks",
# automatically retries failed tasks with exponential backoff
"dramatiq.middleware.Retries",
#
# Cleans up db connections on worker shutdown.
#
# This middleware is vital in taking care of closing expired
# connections after each message is processed.
"django_dramatiq.middleware.DbConnectionsMiddleware",
],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = DRAMATIQ_QUEUE_NAME
In settings/dev_test.py
:
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.stub.StubBroker",
"OPTIONS": {},
"MIDDLEWARE": [
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Pipelines",
"dramatiq.middleware.Retries",
],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
Tip
Make sure you use a DATABASE_NAME
variable in the DATABASES
settings.
In settings/production.py
:
REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.redis.RedisBroker",
"OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
"MIDDLEWARE": [
# drops messages that have been in the queue for too long
"dramatiq.middleware.AgeLimit",
# cancels actors that run for too long
"dramatiq.middleware.TimeLimit",
# lets you chain success and failure callbacks
"dramatiq.middleware.Callbacks",
# automatically retries failed tasks with exponential backoff
"dramatiq.middleware.Retries",
#
# Cleans up db connections on worker shutdown.
#
# This middleware is vital in taking care of closing expired
# connections after each message is processed.
"django_dramatiq.middleware.DbConnectionsMiddleware",
],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = "{}_pipeline".format(DATABASE_NAME)
Tip
Make sure you use a DATABASE_NAME
variable in the DATABASES
settings (the DATABASE_NAME
is used by APScheduler) e.g:
DOMAIN = get_env_variable("DOMAIN")
DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_")
Maintenance / Debug
Flush all / drop all messages from the queue:
import dramatiq
broker = dramatiq.get_broker()
broker.flush_all()
Tip
There is a flush(queue_name)
method if you don’t want to clear all
of the queues.
Outstanding messages:
import dramatiq
from django.conf import settings
broker = dramatiq.get_broker()
# list the queues
broker.get_declared_queues()
consumer = broker.consume(settings.DRAMATIQ_QUEUE_NAME)
# how many outstanding messages (does this work)
consumer.outstanding_message_count
Tasks
import dramatiq
import logging
logger = logging.getLogger(__name__)
# https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME)
def process_steps():
logger.info("pipeline.tasks.process_steps")
from pipeline.models import PipelineProcessStep
count = PipelineProcessStep.objects.process()
logger.info(f"'pipeline.tasks.process_steps' - {count} records - complete")
return count
Tip
To add error handling, see Error / Exception Handling (below)
Tip
To limit the retry count:
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0)
Tip
To extend the timeout:
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE, max_retries=0, time_limit=3600000)
Error / Exception Handling
To handle errors, create an exception_notify_by_email
function:
from mail.models import Notify
from mail.service import queue_mail_message
logger = logging.getLogger(__name__)
def exception_notify_by_email(decscription, e, system_user):
email_addresses = [n.email for n in Notify.objects.all()]
if email_addresses:
queue_mail_message(
system_user,
email_addresses,
f"Error '{decscription}'",
f"Cannot '{decscription}': {e}",
)
else:
logger.error(
"Cannot send email notification. "
"No email addresses set-up in 'mail.models.Notify'"
)
And catch any exceptions e.g:
from django.contrib.auth import get_user_model
from login.models import SYSTEM_GENERATED
from somewhere.models import exception_notify_by_email
user = get_user_model().objects.get(username=SYSTEM_GENERATED)
try:
count = PipelineProcessStep.objects.process()
except Exception as e:
logger.exception(e)
exception_notify_by_email("pipeline.tasks.process_steps", e, user)
raise
In your tests, you will need:
from login.models import SYSTEM_GENERATED
from login.tests.factories import UserFactory
UserFactory(username=SYSTEM_GENERATED)
Examples
Here are some example tasks: https://gitlab.com/kb/pipeline/blob/master/pipeline/tasks.py
To call the task:
transaction.on_commit(
lambda: process_step.send_with_options(
args=(pipeline_process_step.pk,)
)
)
Tip
You can add a delay… e.g.
count_words.send_with_options(args=("example",), delay=10000)
or:
transaction.on_commit(lambda: process_mail.send())
or:
process_steps.send()
Note
Copied from https://gitlab.com/kb/pipeline/blob/master/pipeline/models.py
Note
It is important to set the DRAMATIQ_QUEUE_NAME
when defining the
actor
e.g.
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME)
.
Note
It doesn’t seem to be possible to select the queue name when calling
send_with_options
(I tried and failed).
Management Commands
Create a start_dramatiq_workers
management command in your project e.g:
# -*- encoding: utf-8 -*-
from django.conf import settings
from base.dramatiq_utils import DramatiqBaseCommandMixin
class Command(DramatiqBaseCommandMixin):
"""Start Dramatiq workers - using the correct queue name."""
PROCESSES = 3
def get_queue_name(self):
return settings.DRAMATIQ_QUEUE_NAME
Tip
To reload the Dramatiq workers when the code changes,
add the --reload
parameter.