DEV Community

Ebram Shehata
Ebram Shehata

Posted on

Checking Whether a Celery Task is Already Running

Ever tried to check whether a Celery task is already running?
Here is the piece of code I created for checking whether a task is running by its name.

from celery import current_app


class CeleryHelper:
    """Contains helper functionalities to be used while interacting with Celery."""
    @staticmethod
    def is_being_executed(task_name: str) -> bool:
        """Returns whether the task with given task_name is already being executed.

        Args:
            task_name: Name of the task to check if it is running currently.
        Returns: A boolean indicating whether the task with the given task name is
            running currently.
        """
        active_tasks = current_app.control.inspect().active()
        for worker, running_tasks in active_tasks.items():
            for task in running_tasks:
                if task["name"] == task_name:
                    return True

        return False
Enter fullscreen mode Exit fullscreen mode

I searchedd for that a while ago and I couldn't really find a direct way to do that in a Django project but the above code is tested and works in Django-based projects.

Hostinger image

Get n8n VPS hosting 3x cheaper than a cloud solution

Get fast, easy, secure n8n VPS hosting from $4.99/mo at Hostinger. Automate any workflow using a pre-installed n8n application and no-code customization.

Start now

Top comments (0)

Billboard image

The Next Generation Developer Platform

Coherence is the first Platform-as-a-Service you can control. Unlike "black-box" platforms that are opinionated about the infra you can deploy, Coherence is powered by CNC, the open-source IaC framework, which offers limitless customization.

Learn more

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay