Dramatiq (using Redis)

We are working on a Windows project, so it would be helpful to find a reliable queuing solution that supports both Linux and Windows. Dramatiq appears to meet these criteria, but we still have work to do before it can be used for projects in production.

Note

We started using Dramatiq for a Windows project because newer versions of Celery are not supported on Windows.

Provision

Add the following to your pillar file (dramatiq: True, scheduler: True and celery: False) e.g.

sites:
  hatherleigh_net:
    profile: django
    celery: False
    dramatiq: True
    scheduler: True
    env:
      redis_host: localhost
      redis_port: 6379

Configure

Add the following to requirements/base.txt:

# linux etc...
django-dramatiq==
# or for windows
kb-django-dramatiq==

dramatiq[redis, watch]==

Tip

For Windows deployments we use kb-django-dramatiq instead of django-dramatiq. For details, see https://github.com/Bogdanp/django_dramatiq/issues/109 and https://github.com/pkimber/django_dramatiq/blob/master/README.rst

Tip

See Requirements for the current version.

Add the following to .env.fish:

set -x DOMAIN "dev"
set -x LOG_FOLDER ""
set -x LOG_SUFFIX "dev"

Add the following to .env.fish (if you are developing with Kubernetes):

set -x REDIS_HOST (kubectl get nodes --namespace default -o jsonpath="{.items[0].status.addresses[0].address}")
set -x REDIS_PORT (kubectl get --namespace default -o jsonpath="{.spec.ports[0].nodePort}" services kb-redis-master)

Add the following to .gitlab-ci.yml:

- export DOMAIN=dev
- export LOG_FOLDER=
- export LOG_SUFFIX=dev

Add the following to settings/base.py:

THIRD_PARTY_APPS = (
    # add django_dramatiq to installed apps before any of your custom apps
    "django_dramatiq",
    # ...your other third party apps...
)

LOGGING = {
    "handlers": {
        "logfile": {
            "level": "WARNING",
            "class": "logging.handlers.RotatingFileHandler",
            "filename": os.path.join(
                get_env_variable("LOG_FOLDER"),
                "{}-{}-logger.log".format(
                    get_env_variable("DOMAIN").replace("_", "-"),
                    get_env_variable("LOG_SUFFIX"),
                ),
            ),
            "maxBytes": 100000000,
            "backupCount": 10,
            "formatter": "standard",
        },
    },
}

Tip

See Logging for a sample LOGGING configuration.

In your user settings e.g. settings/dev_patrick.py:

DATABASE_NAME = get_env_variable("DATABASE_NAME")

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
        #
        # Cleans up db connections on worker shutdown.
        #
        # This middleware is vital in taking care of closing expired
        # connections after each message is processed.
        "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = DRAMATIQ_QUEUE_NAME

In settings/dev_test.py:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure you use a DATABASE_NAME variable in the DATABASES settings.
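
For example (the value and database engine here are placeholders), the same variable feeds both DATABASES and the queue name:

```python
# hypothetical value; in real settings this is derived from the
# DOMAIN environment variable (see settings/production.py)
DATABASE_NAME = "hatherleigh_net"

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",  # placeholder engine
        "NAME": DATABASE_NAME,
    }
}

# the Dramatiq queue name re-uses the same variable
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
```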

In settings/production.py:

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
        #
        # Cleans up db connections on worker shutdown.
        #
        # This middleware is vital in taking care of closing expired
        # connections after each message is processed.
        "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = "{}_pipeline".format(DATABASE_NAME)

Tip

Make sure you use a DATABASE_NAME variable in the DATABASES settings (the DATABASE_NAME is used by APScheduler) e.g:

DOMAIN = get_env_variable("DOMAIN")
DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_")
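
The derivation above can be checked in isolation; for example, dots and hyphens in a domain both become underscores, giving a database-friendly name:

```python
def database_name(domain):
    # mirror the DATABASE_NAME derivation in the settings above
    return domain.replace(".", "_").replace("-", "_")

assert database_name("hatherleigh.net") == "hatherleigh_net"
assert database_name("kb-dev.example.com") == "kb_dev_example_com"
```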

Maintenance / Debug

Flush all / drop all messages from the queue:

import dramatiq
broker = dramatiq.get_broker()
broker.flush_all()

Tip

There is a flush(queue_name) method if you don’t want to clear all of the queues.

Outstanding messages:

import dramatiq
from django.conf import settings

broker = dramatiq.get_broker()
# list the queues
broker.get_declared_queues()
consumer = broker.consume(settings.DRAMATIQ_QUEUE_NAME)
# how many outstanding messages (untested: check this works before relying on it)
consumer.outstanding_message_count

Tasks

import logging

import dramatiq
from django.conf import settings

logger = logging.getLogger(__name__)

# https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME)
def process_steps():
    logger.info("pipeline.tasks.process_steps")
    from pipeline.models import PipelineProcessStep
    count = PipelineProcessStep.objects.process()
    logger.info(f"'pipeline.tasks.process_steps' - {count} records - complete")
    return count

Tip

To add error handling, see Error / Exception Handling (below)

Tip

To limit the retry count:

@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0)

Tip

To extend the timeout:

@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0, time_limit=3600000)

Error / Exception Handling

To handle errors, create an exception_notify_by_email function:

import logging

from mail.models import Notify
from mail.service import queue_mail_message

logger = logging.getLogger(__name__)

def exception_notify_by_email(description, e, system_user):
    email_addresses = [n.email for n in Notify.objects.all()]
    if email_addresses:
        queue_mail_message(
            system_user,
            email_addresses,
            f"Error '{description}'",
            f"Cannot '{description}': {e}",
        )
    else:
        logger.error(
            "Cannot send email notification.  "
            "No email addresses set up in 'mail.models.Notify'"
        )

And catch any exceptions e.g:

from django.contrib.auth import get_user_model
from login.models import SYSTEM_GENERATED
from pipeline.models import PipelineProcessStep
from somewhere.models import exception_notify_by_email

user = get_user_model().objects.get(username=SYSTEM_GENERATED)
try:
    count = PipelineProcessStep.objects.process()
except Exception as e:
    logger.exception(e)
    exception_notify_by_email("pipeline.tasks.process_steps", e, user)
    raise

In your tests, you will need:

from login.models import SYSTEM_GENERATED
from login.tests.factories import UserFactory
UserFactory(username=SYSTEM_GENERATED)

Examples

Here are some example tasks: https://gitlab.com/kb/pipeline/blob/master/pipeline/tasks.py

To call the task:

transaction.on_commit(
    lambda: process_step.send_with_options(
        args=(pipeline_process_step.pk,)
    )
)

Tip

You can add a delay (in milliseconds) e.g. count_words.send_with_options(args=("example",), delay=10000)

or:

transaction.on_commit(lambda: process_mail.send())

or:

process_steps.send()

Note

It is important to set the DRAMATIQ_QUEUE_NAME when defining the actor e.g. @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME).

Note

It doesn’t seem to be possible to select the queue name when calling send_with_options (I tried and failed).

Management Commands

Create a start_dramatiq_workers management command in your project e.g:

# -*- encoding: utf-8 -*-
from django.conf import settings

from base.dramatiq_utils import DramatiqBaseCommandMixin

class Command(DramatiqBaseCommandMixin):
    """Start Dramatiq workers - using the correct queue name."""

    PROCESSES = 3

    def get_queue_name(self):
        return settings.DRAMATIQ_QUEUE_NAME
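
DramatiqBaseCommandMixin lives in base.dramatiq_utils; its exact contents are project specific, but the idea is to build and run the dramatiq CLI invocation for the correct queue. A sketch under those assumptions (the task module path is a placeholder, and in the real command the mixin is combined with django.core.management.base.BaseCommand):

```python
import subprocess

class DramatiqBaseCommandMixin:
    """Sketch only: build the dramatiq CLI invocation for one queue."""

    PROCESSES = 1
    TASK_MODULE = "project.tasks"  # placeholder: module containing your actors

    def get_queue_name(self):
        # subclasses (the Command above) supply the queue name
        raise NotImplementedError

    def build_command(self):
        # the dramatiq CLI supports --processes and --queues options
        return [
            "dramatiq",
            self.TASK_MODULE,
            "--processes",
            str(self.PROCESSES),
            "--queues",
            self.get_queue_name(),
        ]

    def handle(self, *args, **options):
        subprocess.run(self.build_command(), check=True)
```

Keeping build_command separate from handle makes the command line easy to test without starting any workers.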

Tip

To reload the Dramatiq workers when the code changes, add the --reload parameter.