DEV Community

Kelvin Wangonya
Kelvin Wangonya

Posted on • Originally published at wangonya.com

Working with celery signals

If you're new to celery, start here.

Sometimes when using celery, you may want to get notified when a task running in the background executes successfully or when it fails. You may also want to run a function each time before the celery task runs or after it completes. These and many others along the same line are all situations where signals would come in handy.

I'll create a simple file tasks.py and set up celery to demonstrate how to use celery signals.

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0') 

@app.task
def add(x, y):
    return x + y
Enter fullscreen mode Exit fullscreen mode

Make sure your redis server is running and start your celery worker:

(env) $ celery -A tasks worker --loglevel=INFO
Enter fullscreen mode Exit fullscreen mode

Then run your tasks.py file and execute the add task:

(env) $ python -i tasks.py

>>> add.delay(4, 4)
<AsyncResult: ce1ee079-6434-4f54-ace2-360ff316546b>
>>> 
Enter fullscreen mode Exit fullscreen mode

By default, what is returned is an AsyncResult instance but that's not what we're interested in. On the terminal with your Celery worker running, you should see something similar to this:

[2020-11-03 07:01:02,024: INFO/ForkPoolWorker-2] Task tasks.add[ce1ee079-6434-4f54-ace2-360ff316546b] succeeded in 0.0005510429999979749s: 8
Enter fullscreen mode Exit fullscreen mode

The task executes successfully, and 8 is the result as expected.

Signals

There are a lot of signals that celery offers but I'll focus on 4 simple ones to demonstrate how signals work in general.

  1. task_prerun
  2. task_postrun
  3. task_success
  4. task_failure

task_prerun

This signal is dispatched before a task is executed.

from celery import Celery
from celery.signals import task_prerun

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")
Enter fullscreen mode Exit fullscreen mode

Sender is the task object being executed (the add function in this case).

Running add.delay(4, 4) like before now gives the following output on the celery terminal:

[2020-11-03 07:23:19,183: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 07:23:19,184: INFO/ForkPoolWorker-2] Task tasks.add[1ef11c46-f461-4eb8-84ca-5c5cdab62a74] succeeded in 0.0016491969999998801s: 8
Enter fullscreen mode Exit fullscreen mode

Just before the task runs, the signal dispatches and prints as expected.

task_postrun

Dispatched after a task has been executed.

from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")
Enter fullscreen mode Exit fullscreen mode

Running this should give the following result:

[2020-11-03 17:03:51,655: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:03:51,656: INFO/ForkPoolWorker-2] Task tasks.add[7da6ee71-1941-4a87-b993-8136d94ac067] succeeded in 0.0017917519999999243s: 8
[2020-11-03 17:03:51,657: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!
Enter fullscreen mode Exit fullscreen mode

task_success

Dispatched when a task succeeds.

from celery import Celery
from celery.signals import task_prerun, task_postrun, task_success

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")

@task_success.connect(sender=add)
def task_success_notifier(sender=None, **kwargs):
    print("From task_success_notifier ==> Task run successfully!")
Enter fullscreen mode Exit fullscreen mode

Result:

[2020-11-03 17:40:47,276: INFO/MainProcess] Received task: tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0]  
[2020-11-03 17:40:47,279: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:40:47,281: WARNING/ForkPoolWorker-2] From task_success_notifier ==> Task run successfully!
[2020-11-03 17:40:47,281: INFO/ForkPoolWorker-2] Task tasks.add[6603eb49-75ab-4653-b32f-ebe760a52de0] succeeded in 0.00201471799999986s: 8
[2020-11-03 17:40:47,282: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!
Enter fullscreen mode Exit fullscreen mode

task_failure

Dispatched when a task fails.

from celery import Celery
from celery.signals import task_prerun, task_postrun, task_failure

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    raise Exception

@task_prerun.connect(sender=add)
def task_prerun_notifier(sender=None, **kwargs):
    print("From task_prerun_notifier ==> Running just before add() executes")

@task_postrun.connect(sender=add)
def task_postrun_notifier(sender=None, **kwargs):
    print("From task_postrun_notifier ==> Ok, done!")

@task_failure.connect(sender=add)
def task_failure_notifier(sender=None, **kwargs):
    print("From task_failure_notifier ==> Task failed successfully! 😅")
Enter fullscreen mode Exit fullscreen mode

Result:

[2020-11-03 17:44:36,082: INFO/MainProcess] Received task: tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d]  
[2020-11-03 17:44:36,085: WARNING/ForkPoolWorker-2] From task_prerun_notifier ==> Running just before add() executes
[2020-11-03 17:44:36,096: WARNING/ForkPoolWorker-2] From task_failure_notifier ==> Task failed successfully! 😅
[2020-11-03 17:44:36,096: ERROR/ForkPoolWorker-2] Task tasks.add[da4a03e8-5530-4c9e-afeb-75f8e0b1be5d] raised unexpected: Exception()
Traceback (most recent call last):
  ...
   in add
    raise Exception
Exception
[2020-11-03 17:44:36,097: WARNING/ForkPoolWorker-2] From task_postrun_notifier ==> Ok, done!
Enter fullscreen mode Exit fullscreen mode

Top comments (1)

Collapse
 
steelwolf180 profile image
Max Ong Zong Bao

Interesting cause I thought there was async.io for Python in recent versions but great that it provides a good place to learn how it was used.