APScheduler (with Redis)
Tip
Before you use APScheduler, Check Celery has been removed from your project …
Install Dramatiq (using Redis).
Add the following to requirements/base.txt:
APScheduler==
Tip
See Requirements for the current version.
Create a management command to start the scheduler:
project/management/commands/start_scheduler.py
Warning
The scheduler cannot access the Django database, but it can push
tasks to the Dramatiq queue.
To work around this, create schedule functions,
e.g. schedule_rebuild_contact_index (see below).
Tip
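I have started using max_instances=1 (at most one running instance of each job)
and replace_existing=True (a job with the same id is replaced rather than added again)
to try and stop the same job from running more than once at the same time.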
This command will create the scheduler and add jobs, e.g.:
# project/management/commands/start_scheduler.py
from django.core.management.base import BaseCommand
from pytz import utc

from base.scheduler_utils import create_scheduler


class Command(BaseCommand):
    """Start APScheduler scheduler."""

    help = "Start APScheduler..."

    def handle(self, *args, **options):
        self.stdout.write("{}...".format(self.help))
        scheduler = create_scheduler()
        self.stdout.write("Scheduler, adding jobs...")
        # process_mail
        scheduler.add_job(
            "mail.tasks:schedule_process_mail",
            "interval",
            minutes=60,
            id="schedule_process_mail",
            max_instances=1,
            replace_existing=True,
        )
        # rebuild_contact_index
        scheduler.add_job(
            "contact.tasks:schedule_rebuild_contact_index",
            "cron",
            hour=1,
            minute=10,
            id="schedule_rebuild_contact_index",
            max_instances=1,
            replace_existing=True,
        )
        self.stdout.write("Scheduler, starting...")
        scheduler.start()
        self.stdout.write("{} - Complete".format(self.help))
# from contact/tasks.py
import logging

from django.utils import timezone

logger = logging.getLogger(__name__)


def schedule_rebuild_contact_index():
    logger.info(
        ">>> schedule_rebuild_contact_index: {}".format(
            timezone.localtime(timezone.now()).strftime("%d/%m/%Y %H:%M")
        )
    )
    rebuild_contact_index.send()
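The jobs in the management command are registered as textual references in "module:attribute" form (e.g. "contact.tasks:schedule_rebuild_contact_index") rather than as function objects. The sketch below shows roughly how such a string can be turned back into a callable using only the standard library. It is an illustration of the idea, not APScheduler's own code, and resolve_ref is a hypothetical name.

```python
import importlib
from functools import reduce


def resolve_ref(ref):
    """Resolve a "package.module:attr.path" string to the object it names.

    Hypothetical helper for illustration only; APScheduler performs its own
    lookup internally when it runs a job added by textual reference.
    """
    module_name, sep, attr_path = ref.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError("expected 'module:attribute', got {!r}".format(ref))
    module = importlib.import_module(module_name)
    # Walk dotted attribute paths, e.g. "SomeClass.method".
    return reduce(getattr, attr_path.split("."), module)


if __name__ == "__main__":
    join = resolve_ref("os.path:join")
    print(join("contact", "tasks.py"))
```

One consequence of referring to jobs by string is that the target must be importable at module level, so a schedule function defined inside another function or a lambda would not be reachable this way.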
In your task (e.g. rebuild_contact_index) you may want to catch and log
exceptions rather than letting Dramatiq retry them over and over again.
Setting max_retries=0 may also be a useful strategy, e.g.:
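import dramatiq
from django.conf import settings

# Assumption: Contact is this app's model, and logger is the module-level
# logger defined at the top of contact/tasks.py.
from .models import Contact


@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0)
def rebuild_contact_index():
    count = None
    logger.info("'rebuild_contact_index'...")
    try:
        count = Contact.objects.rebuild_contact_index()
    except Exception as e:
        # Log the traceback instead of re-raising, so Dramatiq does not retry.
        logger.exception(e)
    logger.info(f"'rebuild_contact_index' - {count} records")
    return count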
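If several tasks need the same catch-and-log behaviour, the try/except could be factored into a decorator. The following is a minimal sketch using only the standard library. The name log_exceptions and the stand-in task rebuild_index are hypothetical, and in a real task the decorator would presumably sit underneath the Dramatiq actor decorator so that the actor wraps the already-protected function.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def log_exceptions(func):
    """Log any exception raised by ``func`` and return None instead of raising.

    Hypothetical helper, not part of Dramatiq or APScheduler.
    """

    @functools.wraps(func)  # keep the original name and docstring
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("'%s' failed", func.__name__)
            return None

    return wrapper


@log_exceptions
def rebuild_index(fail=False):
    """Stand-in for a real task body."""
    if fail:
        raise RuntimeError("index unavailable")
    return 42


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(rebuild_index())           # normal result is passed through
    print(rebuild_index(fail=True))  # exception is logged, None is returned
```

Because the exception is swallowed, the queue sees the message as processed, so the log entry becomes the only record of the failure.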
Check Celery has been removed from your project
To remove Celery from your project:
# 1. Remove the following line from 'project/__init__.py'
from .celery import app as celery_app
# 2. Delete 'project/celery.py'
# 3. Remove Celery from settings files...
# 4. Remove 'celery' from requirements.
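To double-check that nothing was missed, a small script can scan the project for leftover references. This is a rough sketch using only the standard library. The function name, the list of file suffixes and the case-insensitive match on "celery" are all assumptions to adjust for your project.

```python
from pathlib import Path

# Assumed set of file types worth checking; extend as needed.
SUFFIXES = {".py", ".txt", ".cfg", ".ini", ".toml"}


def find_celery_references(root):
    """Return sorted (path, line_number, line) tuples that mention Celery."""
    hits = []
    for path in Path(root).rglob("*"):
        if not path.is_file() or path.suffix not in SUFFIXES:
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue  # unreadable or binary file, skip it
        for number, line in enumerate(text.splitlines(), start=1):
            if "celery" in line.lower():
                hits.append((str(path), number, line.strip()))
    return sorted(hits)


if __name__ == "__main__":
    for path, number, line in find_celery_references("."):
        print("{}:{}: {}".format(path, number, line))
```

Run it from the project root. Note that it will also report matches inside a virtualenv or other vendored directory if one lives under that root.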