I came into a network I/O bound optimization problem and manage to solve it using Multi-threading solution here. In the middle of research, I came into Asyncio — Asynchronous I/O library in Python, which brings into the question it may be a better solution.
What is Asyncio
There are a lot of articles out there explaining what is Asyncio. The core concept to bring away is that Asyncio provides us an event loop. The event loop tracks different I/O events and switches to tasks that are ready and pauses the ones which are waiting on I/O. Thus we don’t waste time on tasks that are not ready to run right now.
Sounds like a thread? What are their similarities and differences?
They're both limited by the Global Interpreter Lock and are both single process, multi-threaded. They are both forms of concurrency but not parallelism - Concurrency makes progress together and parallelism makes progress in parallel. Parallelism is in fact a form of concurrency but concurrency doesn't imply parallelism.
Threading, via concurrent.futures
It employs time-slicing of CPU. All threads are given a slot of CPU time to do work. If the thread is blocking (sleeping or blocked on sockets), then off it goes to the next thread. The issue of threading is when many threads that are blocked for long periods, it begins to degrade into polling (polling vs. interrupt)
Asyncio
Asyncio uses an event loop and you can imagine it as a pub-sub - a push notification model. Threads will announce they're blocked by using Asyncio methods. The next available thread at the top of the queue is then processed on until it completes or is blocked again. This has reduced concurrency and can allow one thread to starve out the others. If the access pattern is threads that are blocked for a long time, this model will ensure that you don't bother checking a thread, you wait for it to announce it's available.
What about Multi-processing?
Multi-processing is indeed useful when it comes to CPU-bounded kind of optimization problem, as it is the only real way to achieve true parallelism without the restriction of Global Interpreter Lock. For our problem which is I/O bound operations, GIL usually doesn’t harm much.
Benchmark
I have modified my code from my previous blog into Asyncio implementation. Both implementations are shown below.
from urllib3 import HTTPConnectionPool
import concurrent.futures
import Retry
import json
url = "www.example.com/employees/"
# Assume you have obtained the total number of pages
total_page = N
result = []
number_of_workers = W
retry = Retry(
total=3,
read=3,
connect=3,
backoff_factor=0.1,
status_forcelist=(500, 502, 504))
pool = HTTPSConnectionPool(url, maxsize=1, headers={'API-Auth-Secret-Key':API_SECRET_KEY}, retries=retry)
def get_data_from_api(pool, page_number):
return pool.request('GET',
url='/v1/employee',
fields={'page': page_number}
)
with concurrent.futures.ThreadPoolExecutor(max_workers=worker) as executor:
future_result = {executor.submit(get_data_from_api, pool, i): i for i in range(2, total_page+1)}
for future in concurrent.futures.as_completed(future_result):
result += json.loads(future.result().data.decode('utf-8'))
from aiohttp.connector import TCPConnector
from aiohttp_retry import RetryClient
import asyncio
import json
url = "www.example.com/employees/v1/employee"
# Assume you have obtained the total number of pages
total_page = N
result = []
number_of_workers = W
conn = TCPConnector(limit=number_of_workers)
async def execute(url, session, page_number):
async with session.get(url+'?page='+str(page_number), headers={'API-Auth-Secret-Key':API_SECRET_KEY}, retry_attempts=3, retry_factor=0.1, retry_for_statuses=(500, 502, 504)) as response:
return await response.read()
async def bound_fetch(sem, url, session, page_number):
# Getter function with semaphore.
async with sem:
return await execute(url, session, page_number)
async def run():
url = 'https://cms.lifestyle.rea-asia.com/wp-json/rea-content/v1/contents'
tasks = []
# Limits to 50 async tasks to compare the performance of 50 threads
sem = asyncio.Semaphore(50)
global result
# Fetch all responses within 50 sessions,
# keep connection alive for all requests.
async with RetryClient(connector=conn) as session:
for i in range (total_page+1):
task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session, i))
tasks.append(task)
responses = await asyncio.gather(*tasks)
# you now have all response bodies in this variable
for res in responses:
result += json.loads(res.decode("utf-8"))
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())
loop.run_until_complete(future)
Result
The time taken for 6000 pages(calls) of API took around 6 minutes for both approaches. The improvement is not significant. Asyncio is helpful when the thread is blocked for a long time however I believe that the server has a prompt response therefore multi-threading is fine. As a rule of thumb, always benchmark the performance when it comes to an optimization problem.
Conclusion
Should I use Asyncio then? The purpose of many applications is to act on network packets entering an interface, timeouts expiring, mouse clicks, or other types of events. Such applications are often very well suited to use an event loop.
Applications that need to churn massively parallel algorithms are more suitable for running multiple (independent) threads on several CPU cores. However, threaded applications must deal with the side effects of concurrency, like race conditions, deadlocks, live locks, etc. Writing error-free threaded applications is hard, debugging them can be even harder.
We can follow this pseudo code to make an intuitive approach:
#IO Bound
if io_bound:
if io_very_slow:
print("Use Asyncio")
else:
print("Use Multi-threading")
# CPU bound
else:
print("Multi-processing")
Top comments (0)