DEV Community

Debouncing a Celery task

Lukas Klein ・2 min read

Celery is a powerful task queue with many great features built in. Unfortunately, debouncing tasks is not one of them (or at least I didn't find it in the docs), but fortunately, it's not that hard to build yourself.

What is debouncing

What is debouncing, you may ask? Let's start with a simple example that, while not directly applicable to the Python world, should make the concept easier to understand. Say you have an input element on a webpage. To implement auto-completion, you have to query a server-side API with the user's input. If you did that on every keystroke, you would probably fry your server. Instead, you probably only want to send the query once the user has stopped typing for, say, 500 ms.

To do this, you could start a timeout after each keystroke that queries the server API when it expires. Rather than starting a separate timeout for every keystroke, you restart the same timeout over and over again.
This way, the timeout only expires, and the server is only queried, 500 ms after the user has stopped typing.
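Since this post is about Python, here is a minimal sketch of the same idea using `threading.Timer` from the standard library. The `Debouncer` class, the `query_server` name and the 0.2 s wait are made up for illustration; a real front end would do this in JavaScript.

```python
import threading
import time


class Debouncer:
    """Call `func` only after `wait` seconds have passed without a new call."""

    def __init__(self, func, wait):
        self.func = func
        self.wait = wait
        self._timer = None
        self._lock = threading.Lock()

    def __call__(self, *args, **kwargs):
        with self._lock:
            # Restart the one shared timeout instead of starting another one.
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(
                self.wait, self.func, args=args, kwargs=kwargs
            )
            self._timer.daemon = True
            self._timer.start()


queries = []  # stands in for the requests that reach the server
query_server = Debouncer(queries.append, wait=0.2)

# Simulated keystrokes that arrive much faster than the wait time.
for text in ["c", "ce", "cel", "celery"]:
    query_server(text)
    time.sleep(0.01)

time.sleep(0.5)  # give the last timeout a chance to expire
print(queries)  # ['celery']
```

Only the final input reaches the "server", because every earlier timeout was cancelled before it could fire.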

Why I needed it

Another real-world application is the so-called Downlinks functionality on Datacake. Datacake is an IoT platform that not only allows you to receive data from devices ("Uplink") but also to send data back to devices ("Downlink"). Downlinks on Datacake can be triggered by different events, which could lead to the same Downlink being sent several times within a short period. To avoid this, I implemented debouncing (this time for a Celery task, finally) so that only the last Downlink within a given period is sent.

Ingredients used

Aside from Celery itself, I used Redis to store a temporary counter that acts as a lock. Since we already used Redis as our Celery broker, it was available anyway.

Just show me the code

Say you have a task like this that you want to debounce:

@celery_app.task
def send_downlink(device, downlink):
    # Do some magic
    ...

First, I renamed the real task to a "private, do not use directly" one:

@celery_app.task
def _send_downlink(device, downlink):
    # Do some magic
    ...

I then created a new task with the official name. Every time it is called, it increments a Redis counter whose key is a unique identifier for the task including its arguments (whatever you want to debounce on), and then schedules the internal task with a countdown (your debounce time, here 1 second):

@celery_app.task
def send_downlink(device, downlink):
    redis_con.incr(f"debounce-downlink-{device}-{downlink}")
    _send_downlink.apply_async([device, downlink], countdown=1)

Finally, in the internal task, I decrement the counter and check whether the new value is <= 0. If it is, this is the "last call" within the debounce period and the task does its work; otherwise a newer call is still pending and the task returns early:

@celery_app.task
def _send_downlink(device, downlink):
    # decr() returns the new counter value; > 0 means a newer call is still pending
    if redis_con.decr(f"debounce-downlink-{device}-{downlink}") > 0:
        logger.debug(f"Debounce hit for {downlink} on {device}")
        return
    # Do some magic
    ...

The result is a Celery task that you can call as many times as you want, but that only executes once, after calls have paused for at least your debounce time.
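To see the counter logic in isolation, here is a small simulation that runs without Celery or a Redis server. `FakeRedis` is a hypothetical in-memory stand-in that only mimics `incr` and `decr`, and the `pending` list stands in for tasks waiting on their countdown; neither is part of the real setup.

```python
from collections import defaultdict


class FakeRedis:
    """Hypothetical in-memory stand-in for the two Redis commands used here."""

    def __init__(self):
        self._values = defaultdict(int)

    def incr(self, key):
        self._values[key] += 1
        return self._values[key]

    def decr(self, key):
        self._values[key] -= 1
        return self._values[key]


redis_con = FakeRedis()
pending = []  # stands in for tasks scheduled with a countdown
sent = []     # records the calls that actually do the work


def _send_downlink(device, downlink):
    if redis_con.decr(f"debounce-downlink-{device}-{downlink}") > 0:
        return  # a newer call is still pending, so skip this one
    sent.append((device, downlink))


def send_downlink(device, downlink):
    redis_con.incr(f"debounce-downlink-{device}-{downlink}")
    # The real task would call _send_downlink.apply_async(..., countdown=1).
    pending.append((device, downlink))


# Three triggers arrive within the debounce period.
for _ in range(3):
    send_downlink("device-1", "reboot")

# The countdowns expire and the delayed tasks run in order.
for args in pending:
    _send_downlink(*args)

print(sent)  # [('device-1', 'reboot')]
```

The first two delayed calls see a counter of 2 and 1 and return early; only the third brings it down to 0 and sends the Downlink.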
