Ever tried to check whether a Celery task is already running?
Here is the piece of code I created for checking whether a task is running by its name.
from celery import current_app
class CeleryHelper:
"""Contains helper functionalities to be used while interacting with Celery."""
@staticmethod
def is_being_executed(task_name: str) -> bool:
"""Returns whether the task with given task_name is already being executed.
Args:
task_name: Name of the task to check if it is running currently.
Returns: A boolean indicating whether the task with the given task name is
running currently.
"""
active_tasks = current_app.control.inspect().active()
for worker, running_tasks in active_tasks.items():
for task in running_tasks:
if task["name"] == task_name:
return True
return False
I searchedd for that a while ago and I couldn't really find a direct way to do that in a Django project but the above code is tested and works in Django-based projects.
Top comments (0)