The problem
Suppose you have the following task:
import time

from django_project.celery import app  # set up according to https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#using-celery-with-django


@app.task
def my_task():
    print("I'm running!")
    time.sleep(10)
    print("I've finished!")
Suppose further my_task runs once in several days using django-celery-beat in a single worker process. The question is: how can my_task get the last time it was run?
Take number one: Bind and Get
The first thing that comes to mind is to find where django-celery-beat puts the last run time of a PeriodicTask and take that value. So first you need to bind the task in order to get its name, then fetch the appropriate PeriodicTask and read its last_run_at attribute, like so:
from django_celery_beat.models import PeriodicTask

@app.task(bind=True)
def my_task(self):
    last_run = PeriodicTask.objects.get(
        task=self.name  # notice PeriodicTask.name and self.name are different things
    ).last_run_at
    print(f"I'm running! Last time I ran at {last_run}")
    time.sleep(10)
    print("I've finished!")
The problem: PeriodicTasks will accumulate each time you restart the Celery process. Soon you'll be getting:
django_celery_beat.models.PeriodicTask.MultipleObjectsReturned: get() returned more than one PeriodicTask -- it returned 16!
Take number two: Bind and Filter
In order to avoid the MultipleObjectsReturned exception, we can filter out all irrelevant periodic tasks:
@app.task(bind=True)
def my_task(self):
    last_run = PeriodicTask.objects.filter(
        task=self.name
    ).order_by('last_run_at').last().last_run_at
    print(f"I'm running! Last time I ran at {last_run}")
    time.sleep(10)
    print("I've finished!")
The first problem: if we run my_task more often than once in five minutes, last_run_at is going to get duplicated:
[2020-02-13 20:15:41,884: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:13:00.616557+00:00
[2020-02-13 20:15:51,886: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:15:51,956: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[1e413b12-455c-4740-8438-d6d1f8b18a35] succeeded in 10.409016128978692s: None
[2020-02-13 20:16:01,787: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:13:00.616557+00:00
[2020-02-13 20:16:11,788: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:16:11,825: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[80a4467f-d0b2-4839-a0a4-1270bf02c2e8] succeeded in 10.18468949093949s: None
[2020-02-13 20:16:21,734: INFO/MainProcess] Received task: integration.tasks.test_task[5d4c126e-da9d-4986-8d37-d2d9a072bf60]
[2020-02-13 20:16:21,906: WARNING/ForkPoolWorker-2] I'm running! Last time I ran at 2020-02-13 17:16:01.635268+00:00
[2020-02-13 20:16:31,907: WARNING/ForkPoolWorker-2] I've finished!
[2020-02-13 20:16:31,939: INFO/ForkPoolWorker-2] Task integration.tasks.test_task[5d4c126e-da9d-4986-8d37-d2d9a072bf60] succeeded in 10.203773486078717s: None
Fortunately, the problem does not arise if the task runs only once in several days. So this would be a valid solution, as long as you don't need to change the schedule of the periodic task.
The second problem: suppose you do need to change the schedule of the periodic task. You actually cannot do that without losing the value, or at least I haven't found a way to do it in code. So after the PeriodicTask gets reinstantiated with the new schedule, last_run_at becomes None.
The third problem: last_run_at also becomes None if you want to change the time zone.
So what I'm saying is: last_run_at is not a reliable solution for a long-running task.
Take number three: Store it somewhere else
By this time I knew what I wanted: a reliable way to store the last running time of a task that gets executed rarely in a single process. So I've decided to just store it in the database:
# models.py
class PeriodicTaskRun(models.Model):
    task = models.CharField(max_length=200, verbose_name='Task Name')
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        get_latest_by = 'created_at'  # lets .latest() work without arguments

# tasks.py
@app.task(bind=True)
def my_task(self):
    last_run = PeriodicTaskRun.objects.filter(task=self.name).latest()
    PeriodicTaskRun.objects.create(task=self.name)
    print(f"I'm running! Last time I ran at {last_run.created_at}")
    time.sleep(10)
    print("I've finished!")
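One caveat worth guarding against (my addition, not part of the original code): on the very first run there is no PeriodicTaskRun row yet, so .latest() raises PeriodicTaskRun.DoesNotExist. The fix is a plain try/except; here is the shape of that logic, sketched with a list standing in for the queryset so it runs outside Django:

```python
from datetime import datetime, timezone


class DoesNotExist(Exception):
    """Stands in for PeriodicTaskRun.DoesNotExist."""


def latest(runs):
    """Mimics QuerySet.latest(): newest timestamp, or DoesNotExist if empty."""
    if not runs:
        raise DoesNotExist
    return max(runs)


def describe_last_run(runs):
    # Same shape as the task body: fall back gracefully on the first run.
    try:
        return f"I'm running! Last time I ran at {latest(runs)}"
    except DoesNotExist:
        return "I'm running! This is my first run"
```

In the real task you would wrap the .latest() call the same way, catching PeriodicTaskRun.DoesNotExist instead of the stand-in exception.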
If you know any caveats to this solution I should know about, or if you have a better one, please let me know in the comments!
Top comments (5)
My application has 2 periodic tasks which run for a couple of seconds, and run once per day.
Do you recommend using this method? It's the most natural thing that comes to mind.
Yeah! Especially if you don't really need to scale this solution to thousands of different tasks that launch millions of times.
Since writing this article I've moved on to a different project with a big load of users. And you know what? It uses the same solution! Except on this project we store last_run in Redis, not Postgres, since we don't need to persist it forever.
Sorry for the late reply and thank you so much for your reply. I guess I will adopt this strategy for my task. Right now it is just the one!
Happy new year ^^
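For the curious, the Redis variant mentioned above boils down to serializing a timestamp to a string and parsing it back, since Redis stores strings. A minimal sketch of that round-trip (a plain dict stands in for the Redis connection, and the key naming is my own assumption):

```python
from datetime import datetime, timezone

store = {}  # stands in for a Redis connection; real code would call r.set/r.get


def record_run(task_name, now=None):
    """Store the current run time under a per-task key (hypothetical naming)."""
    now = now or datetime.now(timezone.utc)
    store[f"last_run:{task_name}"] = now.isoformat()  # Redis values are strings
    return now


def get_last_run(task_name):
    """Return the stored run time as a datetime, or None before the first run."""
    raw = store.get(f"last_run:{task_name}")
    return datetime.fromisoformat(raw) if raw is not None else None
```

With a real client the dict operations become r.set(key, value) and r.get(key), and an expiry on the key covers the "no need to persist it forever" point.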
Curious what is the scale of your tasks? Django celery beat was not necessarily built to handle load, though if you have separated this database to only be used for beat, that may help.
My tasks are network-intensive, and are supposed to run for a month or so.
A separate database is a great idea for this kind of task, thanks!