The problem with run_in_executor
If you've ever needed to offload CPU-bound work from an asyncio event loop, you've written something like this:
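```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy(n: int) -> int:
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy, 10_000_000)
    print(result)

if __name__ == "__main__":  # required where workers are spawned (Windows, macOS)
    asyncio.run(main())
```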
It works. But the moment you need to answer any of these questions, you're stuck:
- What process is running this? No PID exposed.
- How long has it been running? No start time, no duration.
- Can I cancel it cleanly? task.cancel() cancels the Future in Python, but the worker process keeps running.
- What state is the task in right now? You get a bare Future, nothing else.
- Did it fail? With what exception? You have to wrap the await in try/except.
For a short script, this is fine. For production code (background jobs, ML inference pipelines, data processing workers), these gaps are a real pain.
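Without a library, the usual stdlib workaround is to have the worker report its own PID and timings alongside its result. The sketch below is a minimal version of that approach using a ProcessPoolExecutor. `timed_call` and `run_instrumented` are hypothetical helper names and are not part of async_patcher. The PID and timings only arrive together with the result, so they tell you nothing while the job is still running. Cancelling the awaitable also still leaves the worker running, which are exactly the gaps listed above.

```python
import asyncio
import os
import time
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Any, Callable, Dict, Tuple


def heavy(n: int) -> int:
    return sum(i * i for i in range(n))


def timed_call(fn: Callable[..., Any], *args: Any) -> Tuple[Any, int, float, float]:
    # Runs inside the worker, so os.getpid() is the worker's PID, not the parent's.
    start = time.monotonic()
    result = fn(*args)
    end = time.monotonic()
    return result, os.getpid(), start, end


async def run_instrumented(executor: Executor, fn: Callable[..., Any], *args: Any) -> Dict[str, Any]:
    loop = asyncio.get_running_loop()
    # fn and timed_call must be module-level functions so a process pool can pickle them.
    result, pid, start, end = await loop.run_in_executor(executor, timed_call, fn, *args)
    return {"result": result, "pid": pid, "duration": end - start}


async def main() -> None:
    with ProcessPoolExecutor() as pool:
        info = await run_instrumented(pool, heavy, 1_000_000)
    print(f"pid={info['pid']} duration={info['duration']:.3f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The duration is measured inside the worker, so it excludes any time the job spent queued in the pool.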
What I built
async_patcher is a zero-dependency library that monkey-patches asyncio on import to add a to_process() function. It returns a ProcessTask — an asyncio.Task subclass that carries everything run_in_executor never told you.
```bash
pip install async-patcher
```
```python
import asyncio
import async_patcher  # patches asyncio on import

def heavy(n: int) -> int:
    return sum(i * i for i in range(n))

async def main():
    task = asyncio.to_process(heavy, 10_000_000)
    result = await task
    print(task.pid)        # 48271
    print(task.status)     # TaskStatus.DONE
    print(task.duration)   # 0.412 (seconds)
    print(task.func_name)  # heavy

if __name__ == "__main__":
    asyncio.run(main())
```
Same API surface. Same await semantics. Just a richer object coming back.
What ProcessTask gives you
Every ProcessTask carries:
| Attribute | Type | Description |
|---|---|---|
| pid | `int \| None` | Process ID of the worker |
| func_name | str | Name of the offloaded callable |
| status | TaskStatus | PENDING / RUNNING / DONE / FAILED / CANCELLED |
| start_time | float | time.monotonic() at worker start |
| end_time | float | time.monotonic() at worker end |
| duration | float | end_time - start_time, in seconds |
| exception | `BaseException \| None` | Exception raised by the worker, if any |
| args/kwargs | tuple / dict | Arguments the worker was called with |
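The table maps naturally onto a status enum plus a record in which `duration` is derived rather than stored. The sketch below follows that idea. `TaskRecord` is a hypothetical name, and the `TaskStatus` here only mirrors the five states listed above. The library's real ProcessTask is an asyncio.Task subclass instead. Unlike the table, this sketch makes the timestamps optional until they are set. It also records them on the parent side.

```python
from __future__ import annotations

import enum
import time
from dataclasses import dataclass, field
from typing import Any


class TaskStatus(enum.Enum):
    PENDING = "pending"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class TaskRecord:
    func_name: str
    args: tuple[Any, ...] = ()
    kwargs: dict[str, Any] = field(default_factory=dict)
    pid: int | None = None
    status: TaskStatus = TaskStatus.PENDING
    start_time: float | None = None
    end_time: float | None = None
    exception: BaseException | None = None

    @property
    def duration(self) -> float | None:
        # Derived from the two monotonic timestamps; None until the task has finished.
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def mark_running(self, pid: int) -> None:
        self.pid = pid
        self.status = TaskStatus.RUNNING
        self.start_time = time.monotonic()

    def mark_finished(self, exc: BaseException | None = None) -> None:
        self.end_time = time.monotonic()
        self.exception = exc
        self.status = TaskStatus.DONE if exc is None else TaskStatus.FAILED
```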
Graceful cancellation
The most important fix. When you call task.cancel(), async_patcher actually kills the process:
```python
task = asyncio.to_process(heavy, 10_000_000)
await asyncio.sleep(0.1)
task.cancel()
try:
    await task  # SIGTERM sent, then SIGKILL after cancel_timeout (default 5s)
except asyncio.CancelledError:
    pass  # awaiting a cancelled asyncio.Task raises CancelledError
print(task.status)  # TaskStatus.CANCELLED
print(task.pid)     # still available post-cancel
```
The escalation sequence: SIGTERM first, then SIGKILL after cancel_timeout seconds if the process hasn't stopped. Configurable per call:
```python
task = asyncio.to_process(heavy, 10_000_000, cancel_timeout=2.0)
```
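The SIGTERM-then-SIGKILL escalation can be sketched with the stdlib alone. The sketch assumes each worker runs in its own `multiprocessing.Process`, because a ProcessPoolExecutor does not expose its worker processes through its public API. On POSIX, `Process.terminate()` sends SIGTERM and `Process.kill()` sends SIGKILL. `stop_process` and `run_cancellable` are hypothetical names. The sketch returns only the exit code; shipping a result back would need a Queue or Pipe.

```python
from __future__ import annotations

import asyncio
import multiprocessing
import time
from multiprocessing.context import BaseContext
from multiprocessing.process import BaseProcess
from typing import Any, Callable


async def stop_process(proc: BaseProcess, cancel_timeout: float = 5.0) -> None:
    """SIGTERM first; SIGKILL only if the process outlives cancel_timeout seconds."""
    proc.terminate()  # SIGTERM on POSIX
    deadline = time.monotonic() + cancel_timeout
    while proc.is_alive() and time.monotonic() < deadline:
        await asyncio.sleep(0.05)  # poll instead of join() so the event loop keeps running
    if proc.is_alive():
        proc.kill()  # SIGKILL on POSIX
    proc.join()  # reap the child so no zombie is left behind


async def run_cancellable(
    target: Callable[..., Any],
    *args: Any,
    cancel_timeout: float = 5.0,
    mp_context: BaseContext | None = None,
) -> int | None:
    """Run target in its own process; return its exit code, or stop it on cancellation."""
    ctx = mp_context if mp_context is not None else multiprocessing.get_context()
    proc = ctx.Process(target=target, args=args)
    proc.start()
    try:
        while proc.is_alive():
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        # task.cancel() raises CancelledError at the await above: stop the
        # worker process, then let the cancellation propagate to the awaiter.
        await stop_process(proc, cancel_timeout)
        raise
    proc.join()
    return proc.exitcode
```

Polling `is_alive()` keeps the sketch free of extra threads. The trade-off is up to 50 ms of latency in noticing that the worker exited.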
The @run_in_process decorator
Tired of writing asyncio.to_process(fn, ...) at every call site? Use the decorator:
```python
from async_patcher import run_in_process

@run_in_process
def preprocess(data: list[float]) -> list[float]:
    return [x ** 2 for x in data]

async def main():
    task = await preprocess([1.0, 2.0, 3.0])
    print(task.duration)  # 0.003
```
Parameterised form for custom executor or timeout:
```python
@run_in_process(cancel_timeout=10.0, timeout=30.0)
def train_model(config: dict) -> dict:
    ...
```
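Supporting both `@run_in_process` and `@run_in_process(...)` relies on a standard decorator pattern. If the first positional argument is a callable, apply the decorator to it directly; otherwise, return the real decorator. The sketch below shows only that pattern. `in_executor` is a hypothetical name, and the `executor`/`timeout` parameters are illustrative. It returns the plain result rather than a ProcessTask.

```python
from __future__ import annotations

import asyncio
import functools
from concurrent.futures import Executor
from typing import Any, Callable


def in_executor(
    fn: Callable[..., Any] | None = None,
    *,
    executor: Executor | None = None,
    timeout: float | None = None,
) -> Any:
    """Usable both as @in_executor and as @in_executor(executor=..., timeout=...)."""

    def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            loop = asyncio.get_running_loop()
            fut = loop.run_in_executor(executor, functools.partial(func, *args, **kwargs))
            # wait_for(fut, None) waits indefinitely; on expiry it raises
            # asyncio.TimeoutError but cannot stop a worker that is already running.
            return await asyncio.wait_for(fut, timeout)

        return wrapper

    if fn is not None:
        return decorate(fn)  # bare form: fn is the decorated function itself
    return decorate  # parameterised form: return the real decorator
```

There is one caveat if the executor is a ProcessPoolExecutor. Decorating a module-level function rebinds its name to the wrapper, so pickling the original function by reference fails with a "not the same object" PicklingError. A real implementation has to work around this.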
Lifecycle callbacks
Hook into any lifecycle transition without subclassing:
```python
import logging

def on_start(task):
    logging.info("started pid=%s fn=%s", task.pid, task.func_name)

def on_done(task):
    logging.info("done fn=%s duration=%.3fs", task.func_name, task.duration)

def on_error(task):
    logging.error("failed fn=%s err=%s", task.func_name, task.exception)

task = asyncio.to_process(
    heavy, 10_000_000,
    on_start=on_start,
    on_done=on_done,
    on_error=on_error,
)
```
Callback exceptions are caught and logged at WARNING level — they never affect the awaiter.
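Isolating callback failures usually comes down to a single wrapper around every invocation. The sketch below is a minimal version of that idea, using a hypothetical `fire_callback` helper and a hypothetical module-level logger. It catches `Exception` rather than `BaseException`, so `KeyboardInterrupt`, `SystemExit`, and `asyncio.CancelledError` still propagate.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("callbacks_sketch")


def fire_callback(callback: Optional[Callable[[Any], None]], task: Any) -> None:
    """Invoke a lifecycle callback; log and swallow anything it raises."""
    if callback is None:
        return
    try:
        callback(task)
    except Exception:
        # exc_info=True attaches the traceback to the WARNING record.
        logger.warning("lifecycle callback %r failed", callback, exc_info=True)
```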
Timeout support
```python
task = asyncio.to_process(heavy, 10_000_000, timeout=5.0)
try:
    result = await task
except TimeoutError:
    print(task.status)    # TaskStatus.CANCELLED
    print(task.duration)  # ~5.0
```
The timeout fires → SIGTERM → SIGKILL (if the worker is still alive after cancel_timeout) → TimeoutError raised to the caller. No zombie processes.
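The same ordering (deadline, then SIGTERM, then SIGKILL if needed, then TimeoutError) can be sketched with the stdlib. As before, the sketch assumes a dedicated `multiprocessing.Process` per call. `run_with_timeout` is a hypothetical name, and the worker's return value is not shipped back, only its exit code.

```python
from __future__ import annotations

import asyncio
import multiprocessing
import time
from multiprocessing.context import BaseContext
from typing import Any, Callable


async def run_with_timeout(
    target: Callable[..., Any],
    *args: Any,
    timeout: float,
    cancel_timeout: float = 5.0,
    mp_context: BaseContext | None = None,
) -> int | None:
    ctx = mp_context if mp_context is not None else multiprocessing.get_context()
    proc = ctx.Process(target=target, args=args)
    proc.start()
    deadline = time.monotonic() + timeout
    while proc.is_alive():
        if time.monotonic() >= deadline:
            proc.terminate()  # SIGTERM on POSIX
            grace = time.monotonic() + cancel_timeout
            while proc.is_alive() and time.monotonic() < grace:
                await asyncio.sleep(0.05)
            if proc.is_alive():
                proc.kill()  # SIGKILL on POSIX, only if SIGTERM was not enough
            proc.join()  # reap the child: no zombie left behind
            raise TimeoutError(f"worker pid={proc.pid} exceeded {timeout}s")
        await asyncio.sleep(0.05)  # poll without blocking the event loop
    proc.join()
    return proc.exitcode
```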
Managing the executor pool
```python
# Option 1: set a module-level default once at startup
from concurrent.futures import ProcessPoolExecutor
import async_patcher

async_patcher.set_default_executor(ProcessPoolExecutor(max_workers=4))

# Option 2: context manager (creates, sets default, shuts down cleanly)
async with async_patcher.process_pool(max_workers=4):
    results = await asyncio.gather(
        asyncio.to_process(heavy, 1_000_000),
        asyncio.to_process(heavy, 2_000_000),
        asyncio.to_process(heavy, 3_000_000),
    )
```
Full type support
async_patcher ships with a py.typed PEP 561 marker and .pyi stub files for every public module. It passes mypy --strict out of the box. All ProcessTask attributes are fully typed and discoverable in your IDE.
```python
import asyncio

from async_patcher import ProcessTask
from async_patcher.task import TaskStatus

async def run() -> None:
    task: ProcessTask = asyncio.to_process(heavy, 100)  # heavy() as defined earlier
    await task
    assert task.status is TaskStatus.DONE
```
Zero dependencies
The entire library is pure Python stdlib — asyncio, concurrent.futures, multiprocessing, signal, functools, enum. Nothing to pin, nothing to audit, nothing that breaks on a transitive update.
Install & links
```bash
pip install async-patcher
```
- GitHub: https://github.com/satyamsoni2211/async_patcher
- Issues / feature requests: https://github.com/satyamsoni2211/async_patcher/issues
The library has 67 tests across 12 test files covering the full lifecycle, cancellation paths, callbacks, decorator forms, and pool management. Feedback, issues, and PRs are very welcome.
Built this after noticing the same boilerplate appearing in every asyncio + multiprocessing codebase I worked on. The event loop deserves to know what its workers are doing.