Leapcell

Mastering Python Async IO with FastAPI

Python is an interpreted language, so a back end built on it, such as Python + Django, will typically respond somewhat more slowly than a comparable Java + Spring stack. With reasonable code, the gap is usually not significant. Concurrency is the bigger concern: even when Django runs in multi-process mode, its ability to handle concurrent requests is still much weaker. Python does offer ways to improve this. One is to use an asynchronous framework such as FastAPI, whose asynchronous capabilities can greatly increase throughput for I/O-intensive tasks. FastAPI is one of the fastest Python frameworks.

FastAPI as an Example

Let's first take a brief look at how to use FastAPI.

Example 1: Default Network Asynchronous IO

Installation:

pip install fastapi uvicorn

Simple Server-side Code:

# app.py
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def read_root():
    return {"Hello": "World"}

Startup:

uvicorn app:app --reload

Compared with a handler in most other frameworks, the FastAPI handler differs only by the async keyword, which declares the route handler as asynchronous. The response alone does not reveal any difference between FastAPI and other Python frameworks; the difference shows up under concurrent access. When FastAPI is handling a request, such as http://127.0.0.1:8000/, and the handler reaches awaited network I/O, the server thread does not sit idle waiting for it. It goes on to handle other requests and resumes the original handler once the I/O completes. This improves throughput for I/O-intensive workloads.
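
To make the difference concrete, here is a minimal sketch that does not use FastAPI at all. It simulates three requests arriving together with plain asyncio. The handler names and the 0.2-second delay are made up for illustration. A handler that awaits its I/O lets the others proceed, while a handler that blocks the thread (time.sleep) holds everything up.

```python
import asyncio
import time

async def nonblocking_handler():
    # Awaiting hands control back to the event loop while the "I/O" is pending
    await asyncio.sleep(0.2)
    return {"Hello": "World"}

async def blocking_handler():
    # time.sleep blocks the whole thread, so no other request can make progress
    time.sleep(0.2)
    return {"Hello": "World"}

async def serve_three(handler):
    # Simulate three requests arriving at the same time
    start = time.perf_counter()
    await asyncio.gather(handler(), handler(), handler())
    return time.perf_counter() - start

async_elapsed = asyncio.run(serve_three(nonblocking_handler))
blocking_elapsed = asyncio.run(serve_three(blocking_handler))
print(f"awaiting I/O: {async_elapsed:.2f}s")
print(f"blocking I/O: {blocking_elapsed:.2f}s")
```

The awaited version should finish in roughly the time of one request, while the blocking version takes roughly three times as long.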

Example 2: Explicit Network Asynchronous IO

Let's look at another example, in which the business code itself issues an explicit asynchronous network request. FastAPI handles this network I/O asynchronously too, just as it does for incoming route requests.

# app.py
from fastapi import FastAPI, HTTPException
import httpx

app = FastAPI()

# Example of an asynchronous GET request
@app.get("/external-api")
async def call_external_api():
    # Note: the target URL must return a JSON body for response.json() to succeed
    url = "https://leapcell.io"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Failed to fetch data")
        return response.json()

If you also want database I/O to be asynchronous, the database driver or ORM must support asynchronous operations.
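
When only a synchronous driver is available, one common workaround (not something the examples above rely on) is to run the blocking call in a worker thread so the event loop stays free. The sketch below assumes Python 3.9+ for asyncio.to_thread and uses the standard-library sqlite3 module purely as a stand-in for a blocking driver. The table name and function names are hypothetical.

```python
import asyncio
import sqlite3

def query_blocking():
    # A synchronous driver call; sqlite3 stands in for any blocking database driver.
    # The connection is created and used inside the same worker thread.
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE greetings (word TEXT)")
        conn.execute("INSERT INTO greetings VALUES ('Hello'), ('World')")
        cursor = conn.execute("SELECT word FROM greetings ORDER BY rowid")
        return [row[0] for row in cursor]
    finally:
        conn.close()

async def read_greetings():
    # Offload the blocking call to a thread; the event loop can serve others meanwhile
    return await asyncio.to_thread(query_blocking)

rows = asyncio.run(read_greetings())
print(rows)
```

This keeps the event loop responsive, but it is still thread-based concurrency. A driver with native async support avoids the extra threads.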

Asynchronous IO

FastAPI's asynchrony is built on asynchronous I/O. We can start a server with asynchronous processing capabilities by using asynchronous I/O directly, without FastAPI.

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(1)  # Simulate I/O operation
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

async def init():
    app = web.Application()
    app.router.add_get('/', index)
    # Start the server; the running event loop monitors and processes web requests
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '127.0.0.1', 8000)
    await site.start()
    print('Server started at http://127.0.0.1:8000...')

# Explicitly create an event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Start the event loop
loop.run_until_complete(init())
loop.run_forever()

When this example is started, http://127.0.0.1:8000/ returns the same result as in Example 1. Asynchronous I/O is built on two mechanisms: "coroutines" and "event loops".

Coroutines

async def index(request):
    await asyncio.sleep(1)  # Simulate I/O operation
    return web.Response(text='{"Hello": "World"}', content_type='application/json')

The function index is defined with async def, which makes it a coroutine. Placing await before an I/O operation tells the executing thread not to block on that operation. Ordinary function calls are implemented with a stack, so functions can only be called and run to completion one after another. A coroutine is a special kind of function (it is not a thread): it lets the thread pause execution at an await point and switch to other tasks, then continue from that point once the I/O operation completes.
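
One consequence worth seeing in code: calling a coroutine function does not run its body. It only creates a coroutine object, which does nothing until an event loop drives it. The small sketch below uses a made-up fetch_greeting function to show this.

```python
import asyncio
import inspect

async def fetch_greeting():
    await asyncio.sleep(0.1)  # Simulate I/O operation
    return "Hello"

# Calling the function only creates a coroutine object; the body has not run yet
coro = fetch_greeting()
is_coro = inspect.iscoroutine(coro)

# The event loop drives the coroutine to completion and returns its result
result = asyncio.run(coro)
print(is_coro, result)
```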

Let's take a look at the effect of multiple coroutines executing concurrently.

import asyncio
from datetime import datetime

async def coroutine3():
    print(f"Coroutine 3 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 3 finished at {datetime.now()}")

async def coroutine2():
    print(f"Coroutine 2 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 2 finished at {datetime.now()}")

async def coroutine1():
    print(f"Coroutine 1 started at {datetime.now()}")
    await asyncio.sleep(1)  # Simulate I/O operation
    print(f"Coroutine 1 finished at {datetime.now()}")

async def main():
    print("Main started")

    # Create tasks to make coroutines execute concurrently
    task1 = asyncio.create_task(coroutine1())
    task2 = asyncio.create_task(coroutine2())
    task3 = asyncio.create_task(coroutine3())

    # Wait for all tasks to complete
    await task1
    await task2
    await task3

    print("Main finished")

# Run the main coroutine
asyncio.run(main())

Output:

Main started
Coroutine 1 started at 2024-12-27 12:28:01.661251
Coroutine 2 started at 2024-12-27 12:28:01.661276
Coroutine 3 started at 2024-12-27 12:28:01.665012
Coroutine 1 finished at 2024-12-27 12:28:02.665125
Coroutine 2 finished at 2024-12-27 12:28:02.665120
Coroutine 3 finished at 2024-12-27 12:28:02.665120
Main finished

We can see that the thread does not run the three tasks one after another. When a task reaches an I/O operation, the thread switches to another task, and it returns to the first one after the I/O completes. The three coroutines start waiting for I/O at almost the same moment, so they also finish at almost the same time. Although no event loop appears explicitly here, asyncio.run creates and uses one implicitly.
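
Note that the concurrency comes from scheduling the coroutines together, not from async def alone. As a rough sketch (fake_io and the 0.2-second delays are made up for illustration), awaiting coroutines one after another is still sequential, while asyncio.gather, like create_task above, lets them overlap.

```python
import asyncio
import time

async def fake_io(name, delay):
    await asyncio.sleep(delay)  # Simulate I/O operation
    return name

async def sequential():
    # Each await finishes before the next coroutine even starts
    start = time.perf_counter()
    results = [await fake_io("a", 0.2), await fake_io("b", 0.2), await fake_io("c", 0.2)]
    return results, time.perf_counter() - start

async def concurrent():
    # gather schedules all three at once, so their waits overlap
    start = time.perf_counter()
    results = await asyncio.gather(fake_io("a", 0.2), fake_io("b", 0.2), fake_io("c", 0.2))
    return results, time.perf_counter() - start

seq_results, seq_elapsed = asyncio.run(sequential())
con_results, con_elapsed = asyncio.run(concurrent())
print(f"sequential: {seq_elapsed:.2f}s, concurrent: {con_elapsed:.2f}s")
```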

Generators

In Python, coroutines grew out of generators. A generator can pause a function's execution and later resume it, which is exactly the behavior coroutines need.

def simple_generator():
    print("First value")
    yield 1
    print("Second value")
    yield 2
    print("Third value")
    yield 3

# simple_generator is a generator function, gen is a generator
gen = simple_generator() 

print(next(gen))  # Output: First value \n 1
print(next(gen))  # Output: Second value \n 2
print(next(gen))  # Output: Third value \n 3

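When a generator is run with next(), it pauses at each yield. The next call to next() resumes execution from the yield where it last paused. Before Python 3.5, coroutines were also written this way, using the @asyncio.coroutine decorator together with yield from. Since Python 3.5, async def and await are used instead. The decorator-based style below is shown for historical reference: @asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11, so the example only runs on older interpreters.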

import asyncio
from datetime import datetime

@asyncio.coroutine
def my_coroutine():
    print("Start coroutine", datetime.now())
    # Asynchronous call to asyncio.sleep(1):
    yield from asyncio.sleep(1)
    print("End coroutine", datetime.now())

# Get the EventLoop
loop = asyncio.get_event_loop()
# Execute the coroutine
loop.run_until_complete(my_coroutine())
loop.close()

The pause-and-resume behavior of generators is useful for much more than coroutines. For example, a generator can compute values lazily as it is iterated, storing the algorithm rather than the results. The example below generates Pascal's triangle, in which both ends of each row are 1 and every other number is the sum of the two numbers above it.

def pascal_triangle():
    row = [1]
    while True:
        yield row
        new_row = [1]  # The first element of each row is always 1
        for i in range(1, len(row)):
            new_row.append(row[i - 1] + row[i])
        new_row.append(1)  # The last element of each row is always 1
        row = new_row

# Generate and print the first 5 rows of Pascal's triangle
triangle = pascal_triangle()
for _ in range(5):
    print(next(triangle))

Output:

[1]
[1, 1]
[1, 2, 1]
[1, 3, 3, 1]
[1, 4, 6, 4, 1]
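
To connect generators back to coroutines, here is a toy sketch of cooperative multitasking built only on yield. The worker and run_round_robin names are hypothetical, and this is not how asyncio is implemented. It only illustrates that pausing at yield is enough for a scheduler to interleave several tasks on one thread.

```python
from collections import deque

def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # Pause here and let the scheduler run something else

def run_round_robin(generators):
    queue = deque(generators)
    while queue:
        gen = queue.popleft()
        try:
            next(gen)          # Resume the task until its next yield
            queue.append(gen)  # Not finished yet: put it at the back of the queue
        except StopIteration:
            pass               # Finished: drop it from the queue

log = []
run_round_robin([worker("A", 2, log), worker("B", 3, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B step 1', 'B step 2']
```

This scheduler resumes tasks blindly in turn. A real event loop resumes a task only when the I/O it is waiting on is ready.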

Event Loops

Since a coroutine can be paused, when does it resume? That is the job of the event loop, which tells the executing thread when a coroutine is ready to continue.

# Get the EventLoop
loop = asyncio.get_event_loop()
# The event loop executes the coroutine
loop.run_until_complete(my_coroutine())
loop.close()

The event loop uses I/O multiplexing: it cycles continuously, watching for events that signal a coroutine can continue. When such an event arrives, the thread resumes that coroutine.
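
A small sketch of this mechanism, assuming a Unix-like system (loop.add_reader is not supported by the default event loop on Windows): the loop is asked to watch a socket, and when the socket becomes readable it runs a callback that wakes the waiting coroutine. The socket pair and the on_readable name are made up for illustration.

```python
import asyncio
import socket

async def main():
    loop = asyncio.get_running_loop()
    rsock, wsock = socket.socketpair()
    rsock.setblocking(False)
    received = loop.create_future()

    def on_readable():
        # Called by the event loop once rsock has data to read
        data = rsock.recv(100)
        loop.remove_reader(rsock)
        received.set_result(data)

    # Ask the event loop to monitor rsock for readability
    loop.add_reader(rsock, on_readable)
    # Simulate I/O completing a little later
    loop.call_later(0.1, wsock.send, b"ping")

    # The coroutine pauses here until the callback sets the result
    data = await received
    rsock.close()
    wsock.close()
    return data

result = asyncio.run(main())
print(result)
```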

I/O Multiplexing Technology

A simple way to understand I/O multiplexing: imagine I run a courier station. I don't have to keep asking each courier whether their task is done. Instead, the couriers report to me on their own when they finish. This frees me up to do other things and increases how much I can handle.

select, poll, and epoll can all implement I/O multiplexing. epoll performs better than select and poll. Linux generally uses epoll by default; macOS uses kqueue, which works similarly and has comparable performance.
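
The courier analogy can be sketched with the standard-library selectors module, which picks the best mechanism available on the platform. In this illustration three socket pairs play the couriers (the names are made up). All three are registered, only one writes, and a single select() call reports just the one that is ready.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
# Typically EpollSelector on Linux and KqueueSelector on macOS
print("Selector in use:", type(sel).__name__)

pairs = {}
for name in ("courier-1", "courier-2", "courier-3"):
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    sel.register(reader, selectors.EVENT_READ, data=name)
    pairs[name] = (reader, writer)

# Only courier-2 "finishes its task"
pairs["courier-2"][1].send(b"done")

# One call reports every registered socket that is ready, without polling each one
ready = [key.data for key, _ in sel.select(timeout=1)]
print(ready)

for reader, writer in pairs.values():
    sel.unregister(reader)
    reader.close()
    writer.close()
sel.close()
```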

Socket Server Using Event Loops

import selectors
import socket

# Create a selectors object, equivalent to the implementation of epoll, when running on Linux
sel = selectors.DefaultSelector()

# Request reception event handling function. Accept new connections and register read events
def accept(sock, mask):
    conn, addr = sock.accept()  # Accept the connection
    print('Accepted connection from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)  # Register the read event

# Request reading event handling function. Read request data and send an HTTP response, then close the connection.
def read(conn, mask):
    data = conn.recv(100)  # Read data from the connection
    print('response to')
    response = "HTTP/1.1 200 OK\r\n" \
            "Content-Type: application/json\r\n" \
            "Content-Length: 18\r\n" \
            "Connection: close\r\n" \
            "\r\n" \
            "{\"Hello\": \"World\"}"
    conn.send(response.encode())  # Send the HTTP response
    print('Closing connection')
    sel.unregister(conn)  # Unregister the event
    conn.close()  # Close the connection

# Create a server socket
sock = socket.socket()
sock.bind(('localhost', 8000))
sock.listen()
sock.setblocking(False)

# Register the accept event
sel.register(sock, selectors.EVENT_READ, accept)

print("Server is running on port 8000...")

# Event loop
while True:
    # This will block when there are no requests
    events = sel.select()  # Select the file descriptors (events) that are ready
    print("events length: ", len(events))
    for key, mask in events:
        callback = key.data  # Get the event handling function
        print("handler_name:", callback.__name__)
        callback(key.fileobj, mask)  # Call the event handling function

The server socket listens on the specified port. On Linux, selectors uses epoll as its default implementation. The code registers a request reception event (accept event) with the selector. When a new connection arrives, the selector reports it and the accept handler runs, which in turn registers a read event to process and respond to the request data. Accessing http://127.0.0.1:8000/ from a browser returns the same result as in Example 1. Server running log:

Server is running on port 8000...
events length:  1
handler_name: accept
Accepted connection from ('127.0.0.1', 60941)
events length:  1
handler_name: read
response to
Closing connection
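
The server above hardcodes Content-Length: 18, which is only correct for that exact body. A small helper can compute the header from the payload instead. This is a sketch, and build_response is a hypothetical name, not part of the original server.

```python
import json

def build_response(payload):
    # Encode the body first so Content-Length counts bytes, not characters
    body = json.dumps(payload).encode("utf-8")
    headers = (
        "HTTP/1.1 200 OK\r\n"
        "Content-Type: application/json\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    )
    return headers.encode("ascii") + body

response = build_response({"Hello": "World"})
print(response.decode())
```

For the payload used in this article, the computed length is still 18, so the response is identical to the handwritten one.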

Socket Server

Use a socket directly to start a server. Unlike the event-loop version, this server blocks on accept() and handles one connection at a time. When accessed in a browser at http://127.0.0.1:8001/ or with curl http://127.0.0.1:8001/, it returns {"Hello": "World"}.

import socket
from datetime import datetime

# Create a TCP socket
server_socket = socket.socket()

# Bind the socket to the specified IP address and port number
server_socket.bind(('127.0.0.1', 8001))

# Start listening for incoming connections
server_socket.listen(5)

# Loop to accept client connections
while True:
    print("%s Waiting for a connection..." % datetime.now())
    client_socket, addr = server_socket.accept() # This will block, waiting for client connections
    print(f"{datetime.now()} Got connection from {addr}")

    # Receive client data
    data = client_socket.recv(1024)
    print(f"Received: {data.decode()}")

    # Send response data
    response = "HTTP/1.1 200 OK\r\n" \
               "Content-Type: application/json\r\n" \
               "Content-Length: 18\r\n" \
               "Connection: close\r\n" \
               "\r\n" \
               "{\"Hello\": \"World\"}"

    client_socket.sendall(response.encode())

    # Close the client socket
    client_socket.close()

When accessed with curl http://127.0.0.1:8001/, the server log is:

2024-12-27 12:53:36.711732 Waiting for a connection...
2024-12-27 12:54:30.715928 Got connection from ('127.0.0.1', 64361)
Received: GET / HTTP/1.1
Host: 127.0.0.1:8001
User-Agent: curl/8.4.0
Accept: */*
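
To see why this blocking design limits concurrency, the sketch below runs a similar accept loop in a background thread and connects three clients at once. The 0.2-second time.sleep stands in for slow request handling, and the function names and the plain "ping"/"ok" messages are made up for illustration. Because the server handles one connection at a time, the total time is roughly the sum of the delays.

```python
import socket
import threading
import time

def blocking_server(server_socket, n_requests, delay):
    # Handle requests strictly one at a time, like the loop above
    for _ in range(n_requests):
        client_socket, _ = server_socket.accept()
        client_socket.recv(1024)
        time.sleep(delay)  # Simulated slow work; every waiting client is held up
        client_socket.sendall(b"ok")
        client_socket.close()

def client(port, finished_at):
    with socket.create_connection(("127.0.0.1", port)) as conn:
        conn.sendall(b"ping")
        conn.recv(1024)
    finished_at.append(time.perf_counter())

server_socket = socket.socket()
server_socket.bind(("127.0.0.1", 0))  # Port 0: let the OS pick a free port
server_socket.listen(5)
port = server_socket.getsockname()[1]

n, delay = 3, 0.2
server = threading.Thread(target=blocking_server, args=(server_socket, n, delay))
server.start()

start = time.perf_counter()
finished_at = []
clients = [threading.Thread(target=client, args=(port, finished_at)) for _ in range(n)]
for t in clients:
    t.start()
for t in clients:
    t.join()
server.join()
server_socket.close()

total = time.perf_counter() - start
print(f"{n} concurrent clients took {total:.2f}s")
```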

Summary

Asynchronous I/O is built on "coroutines" and "event loops". Coroutines ensure that when a thread reaches a marked (awaited) I/O operation, it does not block until the I/O completes; the coroutine pauses and the thread goes on to run other tasks. Event loops use I/O multiplexing, cycling continuously to monitor I/O events. When an I/O event completes, the corresponding callback fires and the paused coroutine continues execution.


Leapcell: The Ideal Platform for FastAPI and Other Python Applications

Finally, let me introduce the ideal platform for deploying Flask/FastAPI: Leapcell.

Leapcell is a cloud computing platform designed specifically for modern distributed applications. Its pay-as-you-go pricing model ensures no idle costs, meaning users only pay for the resources they actually use.

The unique advantages of Leapcell for WSGI/ASGI applications:

1. Multi-Language Support

  • Supports development in JavaScript, Python, Go, or Rust.

2. Free Deployment of Unlimited Projects

  • Only charge based on usage. No charge when there are no requests.

3. Unmatched Cost-Effectiveness

  • Pay-as-you-go, with no idle fees.
  • For example, $25 can support 6.94 million requests, with an average response time of 60 milliseconds.

4. Simplified Developer Experience

  • Intuitive user interface for easy setup.
  • Fully automated CI/CD pipelines and GitOps integration.
  • Real-time metrics and logs, providing actionable insights.

5. Effortless Scalability and High Performance

  • Automatic scaling to handle high concurrency with ease.
  • Zero operation overhead, allowing developers to focus on development.

Learn more in the documentation!

Leapcell Twitter: https://x.com/LeapcellHQ
