I have developed a Python client/library for aria2, called `aria2p`. To test my code, I spawn `aria2c` processes and interact with them to see if everything works correctly. I know I could mock the calls, but I feel like actually testing the interaction between my client and `aria2c` is a better way to make sure my code really works.

Until recently the test suite fed actual downloads to `aria2c` (like Linux distribution ISOs, metalinks, magnets, or torrents), but of course the inevitable happened: some URLs broke. So I decided to go fully local. This post describes my attempt at spawning local HTTP servers to serve fake/virtual files efficiently, files that are then fed to `aria2c`.
The basics: http.server
As you may already know, Python's standard library provides a ready-to-go HTTP server that can serve files from a folder. It lives in the `http.server` module, and you can run it with a single command:

```bash
python -m http.server
```

This will serve the contents of your current working directory on http://0.0.0.0:8000.
I was already using it in some tests when I needed to feed very small files to `aria2c`: I was serving `aria2p`'s code folder in order to use the LICENSE file, which is 759 bytes.
So, the simplest solution that immediately came to mind was: dump text into files with predefined sizes, like 1B, 100B, 1KiB, 100KiB, 1MiB, 10MiB, 100MiB, etc., and do this each time you run the test suite, in a temporary directory that you serve with Python's built-in HTTP server.
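Here is a minimal sketch of that first idea, just for illustration (the file names, sizes and the use of `--directory` are my own choices, not what the test suite actually did):

```python
# Sketch of the "real files on disk" approach: pre-create dummy files
# of fixed sizes in a temporary directory, then serve that directory
# with Python's built-in HTTP server.
import tempfile
from pathlib import Path

SIZES = {"1B": 1, "100B": 100, "1KiB": 2**10, "100KiB": 100 * 2**10, "1MiB": 2**20}


def create_dummy_files(directory: Path) -> None:
    for name, size in SIZES.items():
        (directory / name).write_bytes(b"1" * size)


if __name__ == "__main__":
    tmp_dir = Path(tempfile.mkdtemp())
    create_dummy_files(tmp_dir)
    print(f"Now serve it with: python -m http.server --directory {tmp_dir}")
```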
But that sounds a bit wasteful, right? Wasting write cycles on expensive SSDs is not really attractive, so this led me to the next solution.
Serving files from memory
Instead of creating files on the disk, why not just create them in memory? So I looked at the sources of `http.server`, particularly how `SimpleHTTPRequestHandler` sends file contents to the client, and eventually ended up with this code:
```python
# http_server.py
import shutil
import socketserver
import sys
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from io import BytesIO


class RequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        size = self.path.lstrip("/")
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "application/octet-stream")
        self.send_header("Content-Length", size)
        self.end_headers()
        shutil.copyfileobj(BytesIO(b"1" * int(size)), self.wfile)


def run_http_server(port):
    with socketserver.TCPServer(("", port), RequestHandler) as httpd:
        try:
            httpd.serve_forever()
        finally:
            httpd.server_close()


if __name__ == "__main__":
    print("Running HTTP server on port 8000")
    run_http_server(8000)
```
This snippet can be run directly with `python http_server.py`. The `run_http_server` function can also be imported from somewhere else.

This server accepts GET requests on the path `/{size}`, which lets us choose the size of the file we want to download. For example: `GET http://localhost:8000/1024` for 1 KiB. The files are then simply composed of ones.
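A quick sanity check, assuming the server above is running on port 8000:

```python
# Request a 1 KiB virtual file and verify its size and contents.
from urllib.request import urlopen

with urlopen("http://localhost:8000/1024") as response:
    body = response.read()

assert len(body) == 1024
assert body == b"1" * 1024
```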
There is a problem however: this server will not support a heavy load (lots of tests running in parallel), because it can handle only one request at a time.
Multi-threading
Thankfully, `socketserver` also provides a `ThreadingTCPServer` that we can use. It's as simple as:
```python
PORT = 8000  # module-level default, matching the earlier snippet


def run_http_server(port=PORT):
    with socketserver.ThreadingTCPServer(("", port), RequestHandler) as httpd:
        try:
            httpd.serve_forever()
        finally:
            httpd.server_close()
```
Reusable address
Even with the `finally` clause, I noticed that the address sometimes kept being "in use" for up to a minute after the server was killed. This can be alleviated with the `allow_reuse_address` attribute of the socket server:
```python
class Server(socketserver.ThreadingTCPServer):
    allow_reuse_address = True


def run_http_server(port=PORT):
    with Server(("", port), RequestHandler) as httpd:
        try:
            httpd.serve_forever()
        finally:
            httpd.server_close()
```
Still not enough
But even with all this, many tests were still failing, and the standard error captured by pytest showed a lot of `BrokenPipeError` exceptions coming from the server. I think it's because this server is way too simple, and handles only GET requests. Maybe `aria2c` uses HEAD as well? What about its "pausing" feature, which allows downloads to be paused and resumed? Will the server handle that correctly?
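For reference, supporting HEAD in the simple handler would only take a few more lines; here is a sketch of what that might look like (not something I actually went through with):

```python
# Sketch (not pursued): answer HEAD requests with the same headers as GET,
# but without a body, in case aria2c probes the file before downloading it.
import shutil
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from io import BytesIO


class RequestHandler(BaseHTTPRequestHandler):
    def _send_headers(self) -> str:
        size = self.path.lstrip("/")
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "application/octet-stream")
        self.send_header("Content-Length", size)
        self.end_headers()
        return size

    def do_HEAD(self):
        self._send_headers()

    def do_GET(self):
        size = self._send_headers()
        shutil.copyfileobj(BytesIO(b"1" * int(size)), self.wfile)
```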
So, instead of digging more into this, I decided to try running a proper server.
With FastAPI
I've been using FastAPI recently, so I gave it a try. A very small amount of code was needed:
```python
# http_server.py
from io import BytesIO

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/{size}")
async def get(size: int):
    return StreamingResponse(BytesIO(b"1" * size), media_type="application/octet-stream")
```
I went with a `StreamingResponse` because `FileResponse` only accepts paths to actual files. To run the server, I run this command: `uvicorn http_server:app`. And it seems to work great! All the tests that were previously failing now pass.
Trying to be clever
I'm still not really satisfied with this solution. It feels wasteful to allocate memory for each handled request. I tried storing the `BytesIO` objects in a dictionary, with sizes as keys, but I guess their buffers get consumed when sending the response, so this doesn't work for subsequent requests using the same size.
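That failed attempt looked roughly like this (a reconstruction, the exact code may have differed): the second request for a given size got an empty body, because the cached `BytesIO` had already been read to the end by the first response.

```python
# Reconstruction of the failed attempt: caching BytesIO objects by size.
# Once a cached BytesIO has been streamed, its position is at the end,
# so the next response for the same size sends an empty body.
cached = {}


@app.get("/{size}")
async def get(size: int):
    if size not in cached:
        cached[size] = BytesIO(b"1" * size)
    return StreamingResponse(cached[size], media_type="application/octet-stream")
```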
Instead of storing the `BytesIO` objects, we can store the bytes themselves!
```python
from io import BytesIO

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

allocated = {}


@app.get("/{size}")
async def get(size: int):
    if size not in allocated:
        allocated[size] = b"1" * size
    return StreamingResponse(BytesIO(allocated[size]), media_type="application/octet-stream")
```
But even then, why allocate memory at all? Couldn't we have a really "virtual" file, some kind of generator that produces chunks of data, sent as a streaming response, and that stops at the desired size? Well, `StreamingResponse` accepts async generators, so here's what I tried:
```python
async def virtual_file(size, chunks=4096):
    while size > 0:
        yield b"1" * min(size, chunks)
        size -= chunks


@app.get("/{size}")
async def get(size: int):
    return StreamingResponse(virtual_file(size), media_type="application/octet-stream")
```
It seems to work, and the memory footprint is much, much lower, but I get a lot of `socket.send() raised exception.` messages from FastAPI, and a test occasionally fails. Anyway, I think it's good enough.
Additional niceties
To make it easier to choose a medium-to-big file size, we can accept a unit suffix:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def translate_size(size):
    try:
        return int(size)
    except ValueError:
        pass
    size = size.lower()
    if size.endswith("k"):
        multiplier = 2 ** 10
    elif size.endswith("m"):
        multiplier = 2 ** 20
    elif size.endswith("g"):
        multiplier = 2 ** 30
    else:
        raise ValueError(f"size unit not supported: {size}")
    return int(size.rstrip("kmg")) * multiplier


async def virtual_file(size, chunks=4096):
    while size > 0:
        yield b"1" * min(size, chunks)
        size -= chunks


@app.get("/{size}")
async def get(size: str):
    return StreamingResponse(
        virtual_file(translate_size(size)),
        media_type="application/octet-stream",
    )
```
Note that the route `size` argument is not an integer anymore but a string. It allows us to send GET requests like `/4k` or `/100m`, values that are then translated to bytes.
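As a quick sanity check of the conversion (the values follow from the powers of two above):

```python
>>> translate_size("512")
512
>>> translate_size("4k")
4096
>>> translate_size("100m")
104857600
```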
Pytest fixture
Since I run tests in parallel thanks to the pytest-xdist plugin, I must make sure I run only one instance of the HTTP server. We can accomplish this by checking the worker ID (to handle both parallel and non-parallel cases) and by using a lock. The worker ID will be `master` for non-parallel runs. It can be obtained with the `worker_id` fixture.

For the lock, I chose to use `mkdir`, as it's an atomic operation: if the directory already exists, the operation fails (someone else already got the lock). Once the server is running, we must make sure it's ready, so we send GET requests until it responds.

We put all this in a function and declare it a "fixture" with "session" scope and "automatic use":
```python
# tests/conftest.py
import subprocess
import sys
import time

import pytest
import requests  # or httpx


def spawn_and_wait_server(port=8000):
    process = subprocess.Popen(
        [sys.executable, "-m", "uvicorn", "tests.http_server:app", "--port", str(port)],
    )
    # wait until the server actually responds
    while True:
        try:
            requests.get(f"http://localhost:{port}/1")
        except requests.ConnectionError:
            time.sleep(0.1)
        else:
            break
    return process


# credits to pytest-xdist's README
@pytest.fixture(scope="session", autouse=True)
def http_server(tmp_path_factory, worker_id):
    if worker_id == "master":
        # single worker: just run the HTTP server
        process = spawn_and_wait_server()
        yield process
        process.kill()
        process.wait()
        return

    # get the temp directory shared by all workers
    root_tmp_dir = tmp_path_factory.getbasetemp().parent

    # try to get a lock
    lock = root_tmp_dir / "lock"
    try:
        lock.mkdir(exist_ok=False)
    except FileExistsError:
        yield  # failed, don't run the HTTP server
        return

    # got the lock, run the HTTP server
    process = spawn_and_wait_server()
    yield process
    process.kill()
    process.wait()
```
With this, each time you run your test suite, exactly one instance of the HTTP server will run.
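With the fixture in place, any test can rely on the server being up. Here is a hypothetical example test (not taken from aria2p's actual suite), just to illustrate:

```python
# tests/test_example.py
# Hypothetical test: the autouse session fixture guarantees the server is up,
# so we can download a virtual file directly and check its size.
import requests


def test_virtual_file_has_requested_size():
    response = requests.get("http://localhost:8000/4k")
    assert len(response.content) == 4 * 1024
```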