This article can also be found on my blog maked.io
Concurrency and parallelism can sound really similar, but in programming there is an important difference.
Imagine you are writing a book while cooking. Even if it seems like you are doing both tasks at the same time, what you are actually doing is switching between the two: while you wait for the water to boil you write your book, but while you chop some vegetables you pause your writing. This is called concurrency. The only way to do these two tasks in parallel is to have two people, one writing and one cooking, which is what multicore CPUs do.
Async programming allows you to write concurrent code that runs in a single thread. The first advantage compared to multiple threads is that you decide where the scheduler will switch from one task to another, which means that sharing data between tasks is safer and easier.
    def queue_push_back(x):
        if len(list) < max_size:
            list.append(x)
If we run the code above in a multithreaded program, it's possible that two threads evaluate the len(list) < max_size check on line 2 at the same time. Both see room for one more item and both append, potentially making the queue size bigger than max_size.
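For comparison, here is a minimal sketch of the same check-then-append logic written with asyncio. The names MAX_SIZE, queue and fill_queue are made up for this example. The point is that there is no await between the check and the append, so the event loop has no opportunity to switch tasks in between.

```python
import asyncio

MAX_SIZE = 3   # hypothetical capacity, chosen for the example
queue = []     # shared between all the tasks

async def queue_push_back(x):
    # No await between the check and the append, so the event loop
    # cannot switch to another task in the middle of these two lines.
    if len(queue) < MAX_SIZE:
        queue.append(x)
    # Switching is only possible here, when the queue is consistent.
    await asyncio.sleep(0)

def fill_queue(n_tasks):
    queue.clear()
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(queue_push_back(i)) for i in range(n_tasks)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    return list(queue)

print(fill_queue(10))  # [0, 1, 2]
```

Ten tasks try to push an item, but the queue never grows past MAX_SIZE and no lock is needed.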
Another advantage of async programming is memory usage. Every time a new thread is created, some memory is used to allow context switching. With async programming this is not a problem, since the code runs in a single thread.
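The sketch below does not measure memory. It only illustrates the single-thread claim: many coroutines run to completion while sharing one thread. The names tiny_task and run_many are invented for this example.

```python
import asyncio
import threading

async def tiny_task(i, thread_ids):
    # record which thread is running this coroutine
    thread_ids.add(threading.get_ident())
    await asyncio.sleep(0)
    return i

def run_many(n):
    thread_ids = set()
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(tiny_task(i, thread_ids)) for i in range(n)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    total = sum(task.result() for task in tasks)
    # second value: how many distinct threads ran the coroutines
    return total, len(thread_ids)

print(run_many(10_000))  # (49995000, 1)
```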
Asyncio has 3 main components: coroutines, the event loop, and futures.
A coroutine is the result of an asynchronous function, which can be declared using the keyword async before def.
    async def my_task(args):
        pass

    my_coroutine = my_task(args)
When we call a function declared with the async keyword, the function is not run; instead, a coroutine object is returned.
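A small sketch to make this visible: the list calls (a name invented for this example) stays empty after the async function is called, and is only filled once the coroutine is actually run by an event loop.

```python
import asyncio

calls = []  # filled only when the body of my_task really runs

async def my_task(x):
    calls.append(x)
    return x * 2

def demo():
    calls.clear()
    coroutine = my_task(21)               # nothing is executed yet
    is_coroutine = asyncio.iscoroutine(coroutine)
    calls_before = len(calls)             # still 0 at this point
    loop = asyncio.new_event_loop()
    result = loop.run_until_complete(coroutine)
    loop.close()
    return is_coroutine, calls_before, result, list(calls)

print(demo())  # (True, 0, 42, [21])
```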
There are two ways to read the output of an async function from a coroutine.
The first way is to use the await keyword. This is possible only inside async functions; it waits for the coroutine to terminate and returns the result.
result = await my_task(args)
The second way is to add it to an event loop as we will see in the next sections.
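Here is a minimal sketch of the first way. The functions add, compute and run_compute are hypothetical. Note that the outermost coroutine still has to be handed to an event loop, because await cannot be used outside an async function.

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0)
    return a + b

async def compute():
    # await suspends compute() until add() has finished,
    # then evaluates to the value returned by add()
    first = await add(1, 2)
    second = await add(first, 10)
    return second

def run_compute():
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(compute())
    finally:
        loop.close()

print(run_compute())  # 13
```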
The event loop is the object which executes our asynchronous code and decides how to switch between async functions. After creating an event loop we can add multiple coroutines to it. These coroutines will all run concurrently when run_until_complete or run_forever is called.
    # create loop
    loop = asyncio.new_event_loop()
    # add coroutine to the loop
    future = loop.create_task(my_coroutine)
    # block the program and run all the coroutines added
    # to the loop concurrently until future is done
    loop.run_until_complete(future)
    loop.close()
A future is an object that works as a placeholder for the output of an asynchronous function and it gives us information about the function state.
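To make the idea of a placeholder more concrete, the sketch below creates a bare future by hand with loop.create_future() and fills it in manually. In normal code the event loop does this for us; the function name future_states is made up for the example.

```python
import asyncio

def future_states():
    loop = asyncio.new_event_loop()
    future = loop.create_future()   # empty placeholder, no result yet
    done_before = future.done()
    future.set_result('hello')      # fill in the placeholder
    done_after = future.done()
    value = future.result()
    loop.close()
    return done_before, done_after, value

print(future_states())  # (False, True, 'hello')
```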
A future is created when we add a coroutine to an event loop. There are two ways to do this:
    future1 = loop.create_task(my_coroutine)
    # or
    future2 = asyncio.ensure_future(my_coroutine)
The first method adds a coroutine to the loop and returns a task, which is a subtype of future. The second method is very similar: it takes a coroutine and adds it to the default loop. The only difference is that it can also accept a future, in which case it does nothing and returns the future unchanged.
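Both statements can be checked with a short sketch. The names my_task and compare are only illustrative.

```python
import asyncio

async def my_task():
    return 'done'

def compare():
    loop = asyncio.new_event_loop()
    task = loop.create_task(my_task())
    # a task is a future...
    is_future = isinstance(task, asyncio.Future)
    # ...so ensure_future hands it back unchanged
    same_object = asyncio.ensure_future(task) is task
    loop.run_until_complete(task)
    loop.close()
    return is_future, same_object, task.result()

print(compare())  # (True, True, 'done')
```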
    import asyncio

    async def my_task(args):
        pass

    def main():
        loop = asyncio.new_event_loop()
        # calling the async function only creates the coroutine
        coroutine1 = my_task(1)
        coroutine2 = my_task(2)
        # adding the coroutine to the loop creates a task
        task1 = loop.create_task(coroutine1)
        task2 = loop.create_task(coroutine2)
        # run the loop until both tasks are completed
        loop.run_until_complete(asyncio.wait([task1, task2]))
        print('task1 result:', task1.result())
        print('task2 result:', task2.result())
        loop.close()

    main()
As you can see, to run an asynchronous function we first need to create a coroutine, then we add it to the event loop, which creates a future/task. Up to this point none of the code inside our async function has been executed. Only when we call loop.run_until_complete does the event loop start executing all the coroutines that have been added to the loop with loop.create_task.
loop.run_until_complete will block your program until the future you gave as argument is completed. In the example we used asyncio.wait() to create an awaitable which completes only when all the futures passed in the argument list are completed.
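As a small addition, asyncio.wait() also gives something back: a pair of sets, the tasks that are done and the ones still pending. The sketch below uses a made-up square coroutine to show it.

```python
import asyncio

async def square(x):
    await asyncio.sleep(0)
    return x * x

def wait_for_all():
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(square(i)) for i in range(3)]
    # by default asyncio.wait returns once every task is done
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    # done is a set, so sort the results to get a stable order
    return sorted(task.result() for task in done), len(pending)

print(wait_for_all())  # ([0, 1, 4], 0)
```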
One thing to keep in mind while writing asynchronous functions in Python is that just because you used async def, it doesn't mean that your function will run concurrently. If you take a normal function and add async in front of it, the event loop will run your function without interruption, because you didn't specify where the loop is allowed to interrupt your function to run another coroutine. Specifying where the event loop is allowed to switch coroutines is really simple: every time you use the keyword await, the event loop can stop running your function and run another coroutine registered to the loop.
    import asyncio

    async def print_numbers_async1(n, prefix):
        for i in range(n):
            print(prefix, i)

    async def print_numbers_async2(n, prefix):
        for i in range(n):
            print(prefix, i)
            if i % 5 == 0:
                # give the event loop a chance to run another coroutine
                await asyncio.sleep(0)

    loop1 = asyncio.new_event_loop()
    count1_1 = loop1.create_task(print_numbers_async1(10, 'c1_1'))
    count2_1 = loop1.create_task(print_numbers_async1(10, 'c2_1'))
    loop1.run_until_complete(asyncio.wait([count1_1, count2_1]))
    loop1.close()

    loop2 = asyncio.new_event_loop()
    count1_2 = loop2.create_task(print_numbers_async2(10, 'c1_2'))
    count2_2 = loop2.create_task(print_numbers_async2(10, 'c2_2'))
    loop2.run_until_complete(asyncio.wait([count1_2, count2_2]))
    loop2.close()
If we execute this code, we will see that the first loop prints all the numbers with prefix c1_1 and then all the numbers with prefix c2_1, while in the second loop the tasks alternate: each task hands control to the other whenever i is a multiple of 5.
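The same effect can be checked without reading printed output. This sketch records the order of execution in a list. The names count and run_pair and the cooperative flag are invented for the example.

```python
import asyncio

async def count(n, prefix, log, cooperative):
    for i in range(n):
        log.append(prefix)
        if cooperative:
            # explicit switching point for the event loop
            await asyncio.sleep(0)

def run_pair(cooperative):
    log = []
    loop = asyncio.new_event_loop()
    tasks = [
        loop.create_task(count(3, 'a', log, cooperative)),
        loop.create_task(count(3, 'b', log, cooperative)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    return ''.join(log)

print(run_pair(False))  # aaabbb
print(run_pair(True))   # ababab
```

Without an await the first task runs to the end before the second one starts; with an await after each step the two tasks take turns.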
Now that we know the basics of asynchronous programming in Python, let's write some more realistic code which downloads a list of pages from the internet and prints a preview containing the first 3 lines of each page.
    import aiohttp
    import asyncio

    async def print_preview(url):
        # connect to the server
        async with aiohttp.ClientSession() as session:
            # create get request
            async with session.get(url) as response:
                # wait for response
                text = await response.text()
        # print first 3 not empty lines
        lines = list(filter(lambda x: len(x) > 0, text.split('\n')))
        print('-'*80)
        for line in lines[:3]:
            print(line)
        print()

    def print_all_pages():
        pages = [
            'http://textfiles.com/adventure/amforever.txt',
            'http://textfiles.com/adventure/ballyhoo.txt',
            'http://textfiles.com/adventure/bardstale.txt',
        ]
        tasks = []
        loop = asyncio.new_event_loop()
        for page in pages:
            tasks.append(loop.create_task(print_preview(page)))
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

    print_all_pages()
This code should be pretty easy to understand. We start by creating an asynchronous function which downloads a URL and prints the first 3 non-empty lines. Then we create a function which, for each page in a list of pages, calls print_preview, adds the coroutine to the loop, and stores the future inside a list of tasks. Finally, we run the event loop, which runs the coroutines we added to it and prints the preview of all the pages.
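If you want to try the same structure without a network connection, here is a rough offline sketch. Everything in it is an assumption made for illustration: FAKE_PAGES and fake_fetch stand in for the real download and simply sleep for a moment, and preview is a small helper that keeps the first non-empty lines.

```python
import asyncio

# Hypothetical in-memory "pages" standing in for real downloads.
FAKE_PAGES = {
    'page-a': 'Title A\n\nfirst line\nsecond line\nthird line\n',
    'page-b': '\n\nTitle B\nonly line\n',
}

async def fake_fetch(name):
    await asyncio.sleep(0.01)  # pretend to wait for the network
    return FAKE_PAGES[name]

def preview(text, n_lines=3):
    # keep the first n_lines lines that are not empty
    return [line for line in text.split('\n') if line][:n_lines]

async def get_preview(name):
    return name, preview(await fake_fetch(name))

def preview_all(names):
    loop = asyncio.new_event_loop()
    tasks = [loop.create_task(get_preview(name)) for name in names]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    return dict(task.result() for task in tasks)

print(preview_all(['page-a', 'page-b']))
```

Returning the previews instead of printing them inside the coroutine makes the result easier to check.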
The last feature I want to talk about is asynchronous generators. Implementing an asynchronous generator is quite simple.
    import asyncio
    import random

    async def is_prime(n):
        # 0 and 1 are not prime numbers
        if n < 2:
            return False
        for i in range(2, n):
            # allow event_loop to run other coroutine
            await asyncio.sleep(0)
            if n % i == 0:
                return False
        return True

    async def prime_generator(n_prime):
        counter = 0
        n = 0
        while counter < n_prime:
            n += 1
            # wait for is_prime to finish
            prime = await is_prime(n)
            if prime:
                yield n
                counter += 1

    async def check_email(limit):
        for i in range(limit):
            if random.random() > 0.8:
                print('1 new email')
            else:
                print('0 new email')
            await asyncio.sleep(2)

    async def print_prime(n):
        async for prime in prime_generator(n):
            print('new prime number found:', prime)

    def main():
        loop = asyncio.new_event_loop()
        prime = loop.create_task(print_prime(3000))
        email = loop.create_task(check_email(10))
        loop.run_until_complete(asyncio.wait([prime, email]))
        loop.close()

    main()
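Stripped down to the essentials, an asynchronous generator is an async def function that contains yield, and it is consumed with async for. The countdown example below is a minimal sketch with made-up names, not part of the prime number program.

```python
import asyncio

async def countdown(start):
    for i in range(start, 0, -1):
        # the generator can await between two values
        await asyncio.sleep(0)
        yield i

async def collect(start):
    # async for also works inside a list comprehension
    return [i async for i in countdown(start)]

def run_countdown(start):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(collect(start))
    finally:
        loop.close()

print(run_countdown(3))  # [3, 2, 1]
```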
When an unhandled exception is raised inside a coroutine, it doesn't break our program as in normal synchronous programming. Instead, it's stored inside the future, and if you don't handle the exception before the program exits you will get the following error:
Task exception was never retrieved
There are two ways to fix this: catch the exception when you access the future's result, or call the future's exception method.
    try:
        # this will raise the exception raised during the coroutine execution
        my_promise.result()
    except Exception:
        pass

    # this will return the exception raised during the coroutine execution
    my_promise.exception()
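Here is a complete sketch of both approaches, using a hypothetical failing_task coroutine that always raises a ValueError.

```python
import asyncio

async def failing_task():
    await asyncio.sleep(0)
    raise ValueError('something went wrong')

def run_failing_task():
    loop = asyncio.new_event_loop()
    task = loop.create_task(failing_task())
    # asyncio.wait does not raise, the exception stays inside the task
    loop.run_until_complete(asyncio.wait([task]))
    # option 1: ask the task for the stored exception
    error = task.exception()
    # option 2: result() raises the stored exception again
    try:
        task.result()
        reraised = False
    except ValueError:
        reraised = True
    loop.close()
    return type(error).__name__, str(error), reraised

print(run_failing_task())  # ('ValueError', 'something went wrong', True)
```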
If you have read everything up to this point you should know how to use asyncio to write concurrent code, but if you wish to go deeper and understand how asyncio works I suggest you watch the following video
If you would like to see more complex uses of asyncio or if you have any questions, leave a comment and I will reply to you as soon as possible.