Celery is a powerful task queue with many great features already built in. Unfortunately, debouncing tasks is not one of them (or at least I didn't find it in the docs), but fortunately, it's not that hard to build yourself.
What is debouncing
What is debouncing, you may ask? Let's start with a simple example that, while not directly applicable to the Python world, should make the concept easier to understand. Say you have an input element on a webpage. To implement auto-completion, you have to query a server-side API with the user's input. If you did that on every keystroke, you would probably fry your server. Instead, you only want to send the request, say, 500 ms after the user has stopped typing.
To do this, you could start a timeout after a keystroke that queries the server API when it expires. But instead of starting a separate timeout for every keystroke, you restart the same timeout over and over again.
This way, the timeout only expires, and the server is only queried, 500 ms after the user has stopped typing.
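In the browser you would typically do this in JavaScript with setTimeout and clearTimeout. To stay in Python, here is a minimal in-process sketch of the same "restart the timeout on every call" idea using threading.Timer. The Debouncer class and its wait parameter are hypothetical names for illustration, and this approach only works inside a single process, which is why the Celery version needs shared state in Redis.

```python
import threading
import time


class Debouncer:
    """Call `func` only after `wait` seconds have passed without a new call."""

    def __init__(self, func, wait):
        self.func = func
        self.wait = wait
        self._timer = None
        self._lock = threading.Lock()

    def __call__(self, *args, **kwargs):
        with self._lock:
            # Cancel the pending timer (if any) and start a fresh one:
            # this is the "restart the same timeout" step.
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self.wait, self.func, args, kwargs)
            self._timer.start()


calls = []
query_server = Debouncer(lambda text: calls.append(text), wait=0.5)

for partial in ["p", "py", "pyt", "pyth", "pytho", "python"]:
    query_server(partial)  # simulated keystrokes, 0.1 s apart
    time.sleep(0.1)

time.sleep(0.7)  # give the last timer time to expire
print(calls)  # ['python']
```

Only the final keystroke survives, because every earlier timer was cancelled before its 0.5 s ran out.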
Why I needed it
Another real-world application is the Downlinks functionality on Datacake. Datacake is an IoT platform that not only lets you receive data from devices ("Uplink") but also send data back to them ("Downlink"). Downlinks on Datacake can be triggered by different events, which could lead to the same Downlink being sent several times within a short period. To avoid this, I implemented debouncing (this time for a Celery task, finally), so that only the last Downlink within a given period is actually sent.
Ingredients used
Aside from Celery itself, I used Redis to store a temporary counter that works as a lock. Since we already used Redis as the broker for Celery, it was available anyway.
Just show me the code
Say you have a task like this that you want to debounce:
```python
@celery_app.task
def send_downlink(device, downlink):
    # Do some magic
    ...
```
First, I renamed the real task to a "private, do not use directly" one:
```python
@celery_app.task
def _send_downlink(device, downlink):
    # Do some magic
    ...
```
I then created a new task under the official name. Every time it is called, it increments a Redis counter whose key is a unique identifier for the task, including its arguments (i.e. whatever you want to debounce), and then schedules the internal task with a countdown (your debounce time):
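```python
@celery_app.task
def send_downlink(device, downlink):
    # Count this call; the key identifies the task *and* its arguments
    redis_con.incr(f"debounce-downlink-{device}-{downlink}")
    # Schedule the real work; countdown is the debounce time in seconds
    _send_downlink.apply_async([device, downlink], countdown=1)
```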
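The key in this snippet is built by interpolating device and downlink into an f-string, which works as long as their str() representation is stable and unique. If your arguments are dicts or other objects, a more robust option might be to serialise them deterministically and hash the result. This is a sketch assuming the arguments are JSON-serialisable (anything else falls back to str()); debounce_key is a hypothetical helper, not part of Celery or redis-py.

```python
import hashlib
import json


def debounce_key(task_name, *args, **kwargs):
    """Build a stable Redis key from a task name and its arguments.

    Hypothetical helper: assumes arguments are JSON-serialisable;
    other types fall back to str().
    """
    # sort_keys makes nested dicts serialise the same regardless of key order
    payload = json.dumps([args, kwargs], sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"debounce-{task_name}-{digest}"


key_a = debounce_key("send_downlink", "device-1", {"port": 1, "payload": "ff"})
key_b = debounce_key("send_downlink", "device-1", {"payload": "ff", "port": 1})
print(key_a == key_b)  # True: dict key order does not matter
```

Both the public and the internal task would have to build the key the same way, e.g. debounce_key("send_downlink", device, downlink), so that the increment and the decrement hit the same counter.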
Finally, in the internal task, I decrement the counter and check whether the new value is <= 0. If it is, this is the "last call" in the debounce period and the task does its work; otherwise, it returns early:
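```python
@celery_app.task
def _send_downlink(device, downlink):
    # decr returns the new value; above 0 means a newer call is still pending
    if redis_con.decr(f"debounce-downlink-{device}-{downlink}") > 0:
        logger.debug(f"Debounce hit for {downlink} on {device}")
        return
    # Last call in the debounce window: do some magic
    ...
```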
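To see why the counter produces exactly one execution per burst of calls, here is a small simulation in which a dict-backed FakeRedis stands in for Redis and an event queue stands in for Celery's countdown. FakeRedis and simulate are hypothetical names for illustration only, and times are plain integers (seconds).

```python
import heapq


class FakeRedis:
    """Tiny in-memory stand-in for the two Redis commands the pattern uses."""

    def __init__(self):
        self.store = {}

    def incr(self, key):
        self.store[key] = self.store.get(key, 0) + 1
        return self.store[key]

    def decr(self, key):
        self.store[key] = self.store.get(key, 0) - 1
        return self.store[key]


def simulate(call_times, countdown):
    """Return the times at which the internal task actually does its work."""
    redis_con = FakeRedis()
    key = "debounce-downlink-device-1-downlink-1"
    # Event queue of (time, kind): kind 0 = public task called,
    # kind 1 = internal task runs after its countdown.
    events = [(t, 0) for t in call_times]
    heapq.heapify(events)
    executed = []
    while events:
        now, kind = heapq.heappop(events)
        if kind == 0:
            redis_con.incr(key)  # public task: count the call...
            heapq.heappush(events, (now + countdown, 1))  # ...and schedule
        elif redis_con.decr(key) <= 0:  # internal task: last pending run?
            executed.append(now)
    return executed


print(simulate([0, 1, 2, 10, 11], countdown=3))  # [5, 14]
```

Calls at 0, 1 and 2 s form one burst and produce a single run at 5 s; the calls at 10 and 11 s produce another at 14 s. In a real deployment the scheduled runs may not execute in exactly the order they were queued; that is fine, because the check only counts how many runs are still outstanding and all of them carry the same arguments.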
The result is a Celery task that you can call as many times as you want, but that only does its work once, after no new calls have arrived for your debounce time.
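One failure mode worth considering: if an internal task is lost before it decrements (for example, because a worker crashed or the queue was purged), the counter never returns to 0, and every later burst for the same key is swallowed. One possible mitigation, sketched here as an assumption rather than part of the original setup, is to put an expiry on the key each time it is incremented. debounce_enter, the 10x margin and the 60-second minimum are hypothetical choices; redis-py's Redis client does provide incr and expire, and a FakeRedis stand-in is used so the sketch runs without a server.

```python
def debounce_enter(redis_con, key, countdown, ttl=None):
    """Count a call and make sure a leaked counter eventually expires.

    Hypothetical helper. `redis_con` is assumed to provide `incr(key)` and
    `expire(key, seconds)`, as redis-py's `redis.Redis` does.
    """
    # Keep the key well past the debounce window (assumed margin: 10x, min 60 s)
    ttl = ttl if ttl is not None else max(60, countdown * 10)
    value = redis_con.incr(key)
    redis_con.expire(key, ttl)  # re-armed on every call
    return value


class FakeRedis:
    """In-memory stand-in recording counters and TTLs."""

    def __init__(self):
        self.store, self.ttls = {}, {}

    def incr(self, key):
        self.store[key] = self.store.get(key, 0) + 1
        return self.store[key]

    def expire(self, key, seconds):
        self.ttls[key] = seconds
        return True


fake = FakeRedis()
print(debounce_enter(fake, "debounce-downlink-d1-x", countdown=1))  # 1
print(fake.ttls)  # {'debounce-downlink-d1-x': 60}
```

In the public task you would call this instead of the bare incr. The TTL should comfortably exceed the countdown plus any queueing delay: if the key expires while runs are still pending, Redis DECR on the missing key starts from 0 and returns -1, so you get an extra execution rather than a lost one.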