DEV Community

Serpent7776

Launching Python's async functions in a deterministic way

Recently I had to run multiple tasks in Python. Each task consisted of smaller operations, one of which was waiting for an external service.

Let's say we have three tasks:

   task_A:           task_B:           task_C:
+------------+    +------------+    +------------+
|            |    |            |    |            |
|    A_1     |    |    B_1     |    |    C_1     |
|            |    |            |    |            |
+------------+    +------------+    +------------+
|            |    |            |    |            |
|   A_wait   |    |   B_wait   |    |   C_wait   |
|            |    |            |    |            |
+------------+    +------------+    +------------+
|            |    |            |    |            |
|    A_2     |    |    B_2     |    |    C_2     |
|            |    |            |    |            |
+------------+    +------------+    +------------+

The steps A_1, B_1, C_1, A_2, B_2, C_2 are the work we actually want to execute, but each task also has to wait for some period of time between its two steps. That wait is represented as A_wait, B_wait, C_wait.

If we ran the tasks one after another, the timeline would look like this:

+-------+  +-----------------------+  +-------+
|       |  |                       |  |       |
|  A_1  |  |         A_wait        |  |  A_2  |
|       |  |                       |  |       |
+-------+  +-----------------------+  +-------+

                                                 +-------+  +-----------------------+  +-------+
                                                 |       |  |                       |  |       |
                                                 |  B_1  |  |         B_wait        |  |  B_2  |
                                                 |       |  |                       |  |       |
                                                 +-------+  +-----------------------+  +-------+

                                                                                                  +-------+  +-----------------------+  +-------+
                                                                                                  |       |  |                       |  |       |
                                                                                                  |  C_1  |  |         C_wait        |  |  C_2  |
                                                                                                  |       |  |                       |  |       |
                                                                                                  +-------+  +-----------------------+  +-------+

Here's the gist with the source:

import asyncio
from time import sleep

# Work steps: each one just prints its name.
def do_stuff_a_1():
    print('a-1')

def do_stuff_a_2():
    print('a-2')

def do_stuff_b_1():
    print('b-1')

def do_stuff_b_2():
    print('b-2')

def do_stuff_c_1():
    print('c-1')

def do_stuff_c_2():
    print('c-2')

# Blocking waits: nothing else can run while sleep() is in progress.
def a_wait():
    sleep(1)

def b_wait():
    sleep(1)

def c_wait():
    sleep(1)

def task_a():
    do_stuff_a_1()
    a_wait()
    do_stuff_a_2()

def task_b():
    do_stuff_b_1()
    b_wait()
    do_stuff_b_2()

def task_c():
    do_stuff_c_1()
    c_wait()
    do_stuff_c_2()

# Run the tasks strictly one after another.
def run_tasks():
    task_a()
    task_b()
    task_c()

if __name__ == "__main__":
    run_tasks()

That means that if each wait lasts, say, one minute, the total waiting time is three minutes. If we add another task, it grows to four minutes.

What we could do instead is start executing task_B while we're waiting for A_wait to complete, and start task_C when we hit B_wait. Because the waits then overlap, the total waiting time drops to roughly one minute, and it stays there even after adding another task (assuming the work steps are short compared to the wait).
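The arithmetic behind that claim can be sketched in a few lines. The function names below are hypothetical and only illustrate the reasoning; the overlapped figure assumes every task waits equally long and that the work steps take negligible time.

```python
def sequential_wait_minutes(n_tasks: int, wait_minutes: float) -> float:
    """Total waiting time when each task finishes before the next one starts."""
    return n_tasks * wait_minutes


def overlapped_wait_minutes(n_tasks: int, wait_minutes: float) -> float:
    """Total waiting time when all waits run at the same time.

    Assumption: equal waits and negligible work steps, so the waits
    overlap almost completely.
    """
    return wait_minutes if n_tasks > 0 else 0.0


if __name__ == "__main__":
    for n in (3, 4):
        print(n, sequential_wait_minutes(n, 1), overlapped_wait_minutes(n, 1))
```

With one-minute waits, three tasks wait three minutes sequentially and four tasks wait four, while the overlapped figure stays at one minute in both cases.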

This is exactly what Python's asyncio module lets us do: it allows us to write asynchronous code in which one task runs while another is waiting.

The order of execution that I wanted to achieve was:

+-------+  +-----------------------+  +-------+
|       |  |                       |  |       |
|  A_1  |  |         A_wait        |  |  A_2  |
|       |  |                       |  |       |
+-------+  +-----------------------+  +-------+

           +-------+  +-----------------------+  +-------+
           |       |  |                       |  |       |
           |  B_1  |  |         B_wait        |  |  B_2  |
           |       |  |                       |  |       |
           +-------+  +-----------------------+  +-------+

                      +-------+  +-----------------------+  +-------+
                      |       |  |                       |  |       |
                      |  C_1  |  |         C_wait        |  |  C_2  |
                      |       |  |                       |  |       |
                      +-------+  +-----------------------+  +-------+

Here's my first attempt:

import asyncio

# Work steps: each one just prints its name.
def do_stuff_a_1():
    print('a-1')

def do_stuff_a_2():
    print('a-2')

def do_stuff_b_1():
    print('b-1')

def do_stuff_b_2():
    print('b-2')

def do_stuff_c_1():
    print('c-1')

def do_stuff_c_2():
    print('c-2')

# Non-blocking waits: awaiting asyncio.sleep() lets other tasks run.
async def a_wait():
    await asyncio.sleep(1)

async def b_wait():
    await asyncio.sleep(1)

async def c_wait():
    await asyncio.sleep(1)

async def task_a():
    do_stuff_a_1()
    await a_wait()
    do_stuff_a_2()

async def task_b():
    do_stuff_b_1()
    await b_wait()
    do_stuff_b_2()

async def task_c():
    do_stuff_c_1()
    await c_wait()
    do_stuff_c_2()

def run_tasks():
    loop = asyncio.get_event_loop()
    # Plain coroutine objects; gather() wraps them in tasks itself.
    tasks = [task_a(), task_b(), task_c()]
    future = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(future)

if __name__ == "__main__":
    run_tasks()

Basically, we define our tasks as async functions and await some other async call inside them. In our case that is the wait call, which the demo shortens to asyncio.sleep(1), one second, in place of the one-minute wait. Each await suspends the current task and lets the event loop start, or resume, the next one.
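To make the suspension visible, here is a minimal sketch that records events in a list instead of printing them. The helper names (task, main, WAIT_SECONDS) are made up for this illustration, the wait is shortened to a fraction of a second, and the sketch uses asyncio.run, which is available from Python 3.7 on, rather than the loop calls from the gist.

```python
import asyncio

WAIT_SECONDS = 0.05  # short stand-in for the external wait (assumption)


async def task(name: str, events: list) -> None:
    events.append(f"{name}-1")         # first step runs up to the await
    await asyncio.sleep(WAIT_SECONDS)  # task is suspended; the loop runs the others
    events.append(f"{name}-2")         # second step runs once the sleep is over


async def main() -> list:
    events = []
    await asyncio.gather(task("a", events), task("b", events), task("c", events))
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Every first step is recorded before any second step, because each task gives up control at its await before any of the sleeps has finished.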

The issue with this code is that the order in which the tasks start is not something we can rely on when plain coroutines are handed to asyncio.gather. The first task to execute could be task_C; we just can't know.
Fortunately, there's a way to specify the order in which our tasks are executed. All we need to do is create the Task objects ourselves, with loop.create_task, in the order in which we want them to be scheduled for execution.

import asyncio

# Work steps: each one just prints its name.
def do_stuff_a_1():
    print('a-1')

def do_stuff_a_2():
    print('a-2')

def do_stuff_b_1():
    print('b-1')

def do_stuff_b_2():
    print('b-2')

def do_stuff_c_1():
    print('c-1')

def do_stuff_c_2():
    print('c-2')

# Non-blocking waits: awaiting asyncio.sleep() lets other tasks run.
async def a_wait():
    await asyncio.sleep(1)

async def b_wait():
    await asyncio.sleep(1)

async def c_wait():
    await asyncio.sleep(1)

async def task_a():
    do_stuff_a_1()
    await a_wait()
    do_stuff_a_2()

async def task_b():
    do_stuff_b_1()
    await b_wait()
    do_stuff_b_2()

async def task_c():
    do_stuff_c_1()
    await c_wait()
    do_stuff_c_2()

def run_tasks():
    loop = asyncio.get_event_loop()
    # Create the Task objects explicitly, in the order they should be scheduled.
    tasks = []
    tasks.append(loop.create_task(task_a()))
    tasks.append(loop.create_task(task_b()))
    tasks.append(loop.create_task(task_c()))
    future = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(future)

if __name__ == "__main__":
    run_tasks()

With that, I have asynchronous execution of tasks with a predictable start order.
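On newer Python versions the same idea can be written with asyncio.run and asyncio.create_task instead of fetching the loop by hand. The sketch below is not the code from the gist: the names task, main and WAIT_SECONDS are hypothetical. It also relies on the event loop running ready callbacks in the order they were registered, so treat the start order as an observed property of the default loop, not a formal guarantee of the Task API.

```python
import asyncio

WAIT_SECONDS = 0.05  # short stand-in for the external wait (assumption)


async def task(name: str, started: list) -> None:
    started.append(name)               # record the moment this task first runs
    await asyncio.sleep(WAIT_SECONDS)  # suspend so the next task can start


async def main() -> list:
    started = []
    # Tasks are created, and therefore scheduled, in this explicit order.
    tasks = [asyncio.create_task(task(name, started)) for name in ("a", "b", "c")]
    await asyncio.gather(*tasks, return_exceptions=True)
    return started


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Changing the order of the names in the tuple changes the order in which the tasks start, which is the same lever the create_task calls in the gist provide.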
