APScheduler (with Redis)

Tip

Before you use APScheduler, check that Celery has been removed from your project.

Install Dramatiq (using Redis).

Add the following to requirements/base.txt:

APScheduler==

Tip

See Requirements for the current version.

Create a management command to start the scheduler:

project/management/commands/start_scheduler.py

Warning

The scheduler cannot access the Django database, but it can push tasks to the Dramatiq queue. To work around this, create schedule functions, e.g. schedule_rebuild_contact_index (see below), which only send the task to the queue.

Tip

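I have started using max_instances=1 and replace_existing=True to try to stop multiple copies of a job from running at the same time. max_instances=1 limits a job to one concurrently running instance, and replace_existing=True replaces any job already registered with the same id instead of adding a duplicate.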

This command creates the scheduler and adds jobs, e.g.:

# project/management/commands/start_scheduler.py
from django.core.management.base import BaseCommand
from pytz import utc

from base.scheduler_utils import create_scheduler


class Command(BaseCommand):
    """Start APScheduler scheduler."""

    help = "Start APScheduler..."

    def handle(self, *args, **options):
        self.stdout.write("{}...".format(self.help))
        scheduler = create_scheduler()
        self.stdout.write("Scheduler, adding jobs...")
        # process_mail
        scheduler.add_job(
            "mail.tasks:schedule_process_mail",
            "interval",
            minutes=60,
            id="schedule_process_mail",
            max_instances=1,
            replace_existing=True,
        )
        # rebuild_contact_index
        scheduler.add_job(
            "contact.tasks:schedule_rebuild_contact_index",
            "cron",
            hour=1,
            minute=10,
            id="schedule_rebuild_contact_index",
            max_instances=1,
            replace_existing=True,
        )
        self.stdout.write("Scheduler, starting...")
        scheduler.start()
        self.stdout.write("{} - Complete".format(self.help))

The schedule function only logs the time and sends the task to the Dramatiq queue:

# contact/tasks.py
import logging
from django.utils import timezone

logger = logging.getLogger(__name__)

def schedule_rebuild_contact_index():
    logger.info(
        ">>> schedule_rebuild_contact_index: {}".format(
            timezone.localtime(timezone.now()).strftime("%d/%m/%Y %H:%M")
        )
    )
    rebuild_contact_index.send()
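
The job targets passed to add_job are strings such as "contact.tasks:schedule_rebuild_contact_index". APScheduler accepts textual references of the form package.module:object, so the management command does not have to import the task modules itself. The sketch below shows roughly how such a reference can be resolved with the standard library. resolve_reference is a hypothetical helper written for illustration and is not APScheduler's own code.

```python
from importlib import import_module


def resolve_reference(ref):
    """Resolve a 'package.module:attribute' string to the object it names.

    Hypothetical helper for illustration only (not part of APScheduler).
    """
    module_name, _, attr_path = ref.partition(":")
    if not module_name or not attr_path:
        raise ValueError(f"Expected 'module:attribute', got {ref!r}")
    # Import the module named before the colon...
    obj = import_module(module_name)
    # ...then walk the (possibly dotted) attribute path after the colon.
    for name in attr_path.split("."):
        obj = getattr(obj, name)
    return obj
```

For example, resolve_reference("os.path:join") returns the os.path.join function, and a string without a colon raises ValueError.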

In your task (e.g. rebuild_contact_index) you may want to catch and log exceptions rather than letting Dramatiq retry the task over and over again. Setting max_retries=0 may also be a useful strategy, e.g.:

import dramatiq
from django.conf import settings


@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0)
def rebuild_contact_index():
    count = None
    logger.info("'rebuild_contact_index'...")
    try:
        count = Contact.objects.rebuild_contact_index()
    except Exception as e:
        logger.exception(e)
    logger.info(f"'rebuild_contact_index' - {count} records")
    return count
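
If several tasks need the same catch-and-log behaviour, the try/except can be moved into a small decorator. This is a minimal sketch using only the standard library. log_exceptions and rebuild_index are hypothetical names, and rebuild_index is a stand-in for a real task body.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def log_exceptions(func):
    """Log any exception raised by 'func' and return None instead of re-raising."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # 'logger.exception' records the message and the full traceback.
            logger.exception("'%s' failed", func.__name__)
            return None

    return wrapper


@log_exceptions
def rebuild_index(fail=False):
    # Hypothetical stand-in for the real task body.
    if fail:
        raise RuntimeError("index unavailable")
    return 42
```

Calling rebuild_index() returns 42, while rebuild_index(fail=True) logs the traceback and returns None. With Dramatiq, the assumption is that @log_exceptions would sit directly beneath @dramatiq.actor(...), so that the actor wraps the already guarded function.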

Check Celery has been removed from your project

To remove Celery from your project:

# 1. Remove the following line from 'project/__init__.py':
from .celery import app as celery_app
# 2. Delete 'project/celery.py'.
# 3. Remove Celery from the settings files...
# 4. Remove 'celery' from the requirements.
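
To check that nothing was missed, a short script can search the project for leftover references. This is a rough sketch, not part of the original instructions. find_celery_references is a hypothetical helper, and the list of file suffixes is an assumption to adjust for your project.

```python
from pathlib import Path


def find_celery_references(root, suffixes=(".py", ".txt")):
    """Return (relative path, line number, text) for each leftover Celery reference.

    A line number of 0 means the file name itself contains 'celery'.
    """
    root = Path(root)
    hits = []
    for path in sorted(root.rglob("*")):
        if not path.is_file() or path.suffix not in suffixes:
            continue
        relative = path.relative_to(root).as_posix()
        if "celery" in path.name.lower():
            hits.append((relative, 0, "file name"))
        text = path.read_text(encoding="utf-8", errors="ignore")
        for number, line in enumerate(text.splitlines(), start=1):
            # Case-insensitive match, so 'Celery' and 'CELERY_...' are found too.
            if "celery" in line.lower():
                hits.append((relative, number, line.strip()))
    return hits
```

Running find_celery_references(".") from the project root should return an empty list once all four steps are complete. The search also walks any virtual environment stored inside the project folder, so point it at the source directories if that produces noise.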