DEV Community

satyamsoni2211


What asyncio.run_in_executor doesn't tell you (and how I fixed it)

The problem with run_in_executor

If you've ever needed to offload CPU-bound work from an asyncio event loop, you've written something like this:

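import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy(n: int) -> int:
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy, 10_000_000)
    print(result)

if __name__ == "__main__":  # guard required when worker processes are spawned
    asyncio.run(main())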

It works. But the moment you need to answer any of these questions, you're stuck:

  • What process is running this? No PID exposed.
  • How long has it been running? No start time, no duration.
  • Can I cancel it cleanly? Cancelling the awaiting task (or the returned future) only cancels the asyncio side; once the call has started, the worker process keeps running it.
  • What state is the task in right now? You get a bare Future, nothing else.
  • Did it fail? With what exception? You have to try/except the await.

For a short script, this is fine. For production code (background jobs, ML inference pipelines, data processing workers), these gaps cause real pain.
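To see the cancellation gap concretely, here is a small stdlib-only sketch. It uses a ThreadPoolExecutor so it runs anywhere without pickling concerns; the same thing happens with a ProcessPoolExecutor, because cancelling the asyncio future does not stop a call that has already started in the executor. The `slow_job` function and the `finished` list are made up for illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

finished: list[str] = []  # records work that actually completed in the worker

def slow_job() -> None:
    """Hypothetical blocking job: sleeps, then records that it finished."""
    time.sleep(0.3)
    finished.append("slow_job")

async def main() -> bool:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        fut = loop.run_in_executor(pool, slow_job)
        await asyncio.sleep(0.05)  # give the job time to start in the worker
        fut.cancel()               # cancels the asyncio-side future only
        try:
            await fut
        except asyncio.CancelledError:
            pass
        cancelled = fut.cancelled()
    # Leaving the `with` block calls pool.shutdown(wait=True), which blocks
    # until slow_job returns: the "cancelled" work ran to completion anyway.
    return cancelled

if __name__ == "__main__":
    was_cancelled = asyncio.run(main())
    print(was_cancelled, finished)  # True ['slow_job']
```

The future reports itself as cancelled, yet the worker still finished the job, which is exactly the gap the list above describes.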


What I built

async_patcher is a zero-dependency library that monkey-patches asyncio on import to add a to_process() function. It returns a ProcessTask, an asyncio.Task subclass that carries everything run_in_executor never told you.

pip install async-patcher
import asyncio
import async_patcher  # patches asyncio on import

def heavy(n: int) -> int:
    return sum(i * i for i in range(n))

async def main():
    task = asyncio.to_process(heavy, 10_000_000)
    result = await task

    print(task.pid)        # e.g. 48271
    print(task.status)     # TaskStatus.DONE
    print(task.duration)   # e.g. 0.412 (seconds)
    print(task.func_name)  # heavy

if __name__ == "__main__":
    asyncio.run(main())

Same API surface. Same await semantics. Just a richer object coming back.
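If you are curious how a richer object can keep ordinary await semantics: asyncio.Task can be subclassed, so extra attributes simply ride along with the task. The sketch below is not async_patcher's implementation. `MetaTask` and `to_executor` are hypothetical names, and the timestamps are taken in the parent around the executor call rather than inside the worker. Pass a ProcessPoolExecutor as `executor` for CPU-bound work; the demo uses the loop's default thread pool (`None`).

```python
import asyncio
import time
from concurrent.futures import Executor
from typing import Any, Callable, Optional

class MetaTask(asyncio.Task):
    """Hypothetical Task subclass that carries metadata about the call."""

    def __init__(self, coro: Any, *, func_name: str, **kwargs: Any) -> None:
        super().__init__(coro, **kwargs)
        self.func_name = func_name
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None

    @property
    def duration(self) -> Optional[float]:
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

def to_executor(executor: Optional[Executor], fn: Callable[..., Any], *args: Any) -> MetaTask:
    """Hypothetical helper: run fn(*args) in `executor`, return a MetaTask.

    Must be called while an event loop is running.
    """
    loop = asyncio.get_running_loop()

    async def runner() -> Any:
        # `task` is bound below before this coroutine first runs.
        task.start_time = time.monotonic()
        try:
            return await loop.run_in_executor(executor, fn, *args)
        finally:
            task.end_time = time.monotonic()

    task = MetaTask(runner(), loop=loop, func_name=getattr(fn, "__name__", repr(fn)))
    return task

def square(n: int) -> int:
    return n * n

async def demo() -> None:
    task = to_executor(None, square, 7)  # None = the loop's default thread pool
    result = await task                  # awaiting works like any other Task
    print(result, task.func_name, task.duration is not None)  # 49 square True

if __name__ == "__main__":
    asyncio.run(demo())
```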


What ProcessTask gives you

Every ProcessTask carries:

| Attribute | Type | Description |
|---|---|---|
| pid | `int \| None` | PID of the worker process, or None |
| func_name | str | Name of the offloaded callable |
| status | TaskStatus | PENDING / RUNNING / DONE / FAILED / CANCELLED |
| start_time | float | time.monotonic() at worker start |
| end_time | float | time.monotonic() at worker end |
| duration | float | end_time - start_time, in seconds |
| exception | `BaseException \| None` | Exception raised by the offloaded callable, or None |
| args / kwargs | tuple / dict | Arguments the worker was called with |
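The relationships in this table (duration derived from the two timestamps, exception populated only on failure, a single terminal status) can be modelled as a small state record. This is a hypothetical sketch, not the library's ProcessTask: `TaskRecord`, `Status` and the `mark_*` methods are illustrative names that only reuse the status values listed above.

```python
import time
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional

class Status(Enum):
    PENDING = auto()
    RUNNING = auto()
    DONE = auto()
    FAILED = auto()
    CANCELLED = auto()

TERMINAL = {Status.DONE, Status.FAILED, Status.CANCELLED}

@dataclass
class TaskRecord:
    func_name: str
    status: Status = Status.PENDING
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    exception: Optional[BaseException] = None

    @property
    def duration(self) -> Optional[float]:
        """end_time - start_time, or None until both are known."""
        if self.start_time is None or self.end_time is None:
            return None
        return self.end_time - self.start_time

    def mark_running(self) -> None:
        if self.status is not Status.PENDING:
            raise RuntimeError(f"cannot start from {self.status.name}")
        self.status = Status.RUNNING
        self.start_time = time.monotonic()

    def _finish(self, status: Status, exc: Optional[BaseException] = None) -> None:
        if self.status in TERMINAL:
            raise RuntimeError(f"already finished as {self.status.name}")
        self.status = status
        self.exception = exc  # only FAILED passes an exception
        self.end_time = time.monotonic()

    def mark_done(self) -> None:
        self._finish(Status.DONE)

    def mark_failed(self, exc: BaseException) -> None:
        self._finish(Status.FAILED, exc)

    def mark_cancelled(self) -> None:
        self._finish(Status.CANCELLED)
```

A task cancelled while still PENDING ends up with an end_time but no start_time, so its duration stays None rather than reporting a misleading number.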

Graceful cancellation

This is the most important fix: when you call task.cancel(), async_patcher actually terminates the worker process.

async def main():
    task = asyncio.to_process(heavy, 10_000_000)
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task  # SIGTERM sent, then SIGKILL after cancel_timeout (default 5s)
    except asyncio.CancelledError:
        pass        # awaiting a cancelled Task raises CancelledError

    print(task.status)   # TaskStatus.CANCELLED
    print(task.pid)      # still available post-cancel

The escalation sequence: SIGTERM first, then SIGKILL after cancel_timeout seconds if the process hasn't stopped. Configurable per call:

task = asyncio.to_process(heavy, 10_000_000, cancel_timeout=2.0)
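The same SIGTERM-then-SIGKILL escalation can be expressed with the standard multiprocessing API, where Process.terminate() sends SIGTERM and Process.kill() sends SIGKILL on POSIX. This is a minimal sketch of the pattern, not async_patcher's code; `stop_process` is a hypothetical helper and its `cancel_timeout` parameter merely mirrors the option described above.

```python
import multiprocessing
import time
from multiprocessing.process import BaseProcess
from typing import Optional

def stop_process(proc: BaseProcess, cancel_timeout: float = 5.0) -> Optional[int]:
    """Terminate `proc`, escalating to a hard kill if it does not exit.

    Returns the exit code; on POSIX a process ended by a signal reports
    the negated signal number (for example -15 for SIGTERM).
    """
    if not proc.is_alive():
        return proc.exitcode
    proc.terminate()           # polite request: SIGTERM on POSIX
    proc.join(cancel_timeout)  # give the worker a chance to clean up
    if proc.is_alive():
        proc.kill()            # escalate: SIGKILL cannot be caught or ignored
        proc.join()
    return proc.exitcode

if __name__ == "__main__":
    p = multiprocessing.Process(target=time.sleep, args=(60,))
    p.start()
    print(stop_process(p, cancel_timeout=2.0))  # typically -15 on Linux/macOS
```

Joining with a timeout between the two signals is what gives a well-behaved worker the chance to flush or clean up before the uncatchable SIGKILL.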

The @run_in_process decorator

Tired of writing asyncio.to_process(fn, ...) at every call site? Use the decorator:

from async_patcher import run_in_process

@run_in_process
def preprocess(data: list[float]) -> list[float]:
    return [x ** 2 for x in data]

async def main():
    task = await preprocess([1.0, 2.0, 3.0])
    print(task.duration)  # 0.003

The parameterised form lets you pass a custom executor or timeouts:

@run_in_process(cancel_timeout=10.0, timeout=30.0)
def train_model(config: dict) -> dict:
    ...
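Supporting both `@decorator` and `@decorator(...)` is a common Python pattern: the decorator checks whether it received the function directly or only keyword options. The sketch below illustrates that pattern only; `offload` is a hypothetical name, and it runs the function on the loop's default thread pool. Sending a decorated module-level function to a ProcessPoolExecutor needs extra care, because pickle looks functions up by qualified name and would find the wrapper instead of the original.

```python
import asyncio
import functools
from typing import Any, Callable, Optional

def offload(func: Optional[Callable[..., Any]] = None, *, timeout: Optional[float] = None) -> Any:
    """Hypothetical decorator usable as @offload or @offload(timeout=...).

    Calling the decorated function (inside a running loop) returns an
    asyncio.Task that runs the original function in the default executor.
    """
    def decorate(fn: Callable[..., Any]) -> Callable[..., "asyncio.Task[Any]"]:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> "asyncio.Task[Any]":
            loop = asyncio.get_running_loop()
            call = functools.partial(fn, *args, **kwargs)
            fut = loop.run_in_executor(None, call)
            return loop.create_task(asyncio.wait_for(fut, timeout))
        return wrapper

    if func is not None:  # bare form: @offload
        return decorate(func)
    return decorate       # parameterised form: @offload(timeout=...)

@offload
def square(x: int) -> int:
    return x * x

@offload(timeout=1.0)
def cube(x: int) -> int:
    return x ** 3

async def main() -> None:
    print(await square(4), await cube(2))  # 16 8

if __name__ == "__main__":
    asyncio.run(main())
```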

Lifecycle callbacks

Hook into any lifecycle transition without subclassing:

import logging

def on_start(task):
    logging.info("started pid=%s fn=%s", task.pid, task.func_name)

def on_done(task):
    logging.info("done fn=%s duration=%.3fs", task.func_name, task.duration)

def on_error(task):
    logging.error("failed fn=%s err=%s", task.func_name, task.exception)

task = asyncio.to_process(
    heavy, 10_000_000,
    on_start=on_start,
    on_done=on_done,
    on_error=on_error,
)

Callback exceptions are caught and logged at WARNING level — they never affect the awaiter.
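Isolating user callbacks like this usually comes down to one try/except around each call. A minimal sketch, assuming a hypothetical `fire` helper and logger name (not the library's internals): an exception inside a callback becomes a WARNING record with its traceback and never propagates to whoever awaits the task.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("callbacks_sketch")  # hypothetical logger name

def fire(callback: Optional[Callable[[Any], None]], task: Any) -> None:
    """Invoke a lifecycle callback, logging (not raising) any exception."""
    if callback is None:
        return
    try:
        callback(task)
    except Exception:
        # exc_info=True attaches the traceback to the WARNING record;
        # KeyboardInterrupt and SystemExit still propagate.
        logger.warning("lifecycle callback %r failed", callback, exc_info=True)

if __name__ == "__main__":
    logging.basicConfig()
    fire(lambda t: 1 / 0, "task-1")  # logs a warning, returns normally
```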


Timeout support

task = asyncio.to_process(heavy, 10_000_000, timeout=5.0)
try:
    result = await task
except TimeoutError:
    print(task.status)   # TaskStatus.CANCELLED
    print(task.duration) # ~5.0

When the timeout fires, the worker receives SIGTERM, then SIGKILL if it is still running after cancel_timeout, and TimeoutError is raised to the caller. No zombie processes.
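One way to wire "time out, clean up, re-raise" with the standard library is asyncio.wait_for plus an except clause. This is an illustrative sketch: `run_with_timeout` and its `on_timeout` hook are hypothetical names, and in the article's setting the hook is where a process would be stopped.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def run_with_timeout(
    aw: Awaitable[T], timeout: float, on_timeout: Callable[[], None]
) -> T:
    """Await `aw` for at most `timeout` seconds.

    On expiry, wait_for cancels `aw`, the cleanup hook runs, and the
    TimeoutError propagates to the caller.
    """
    try:
        return await asyncio.wait_for(aw, timeout)
    except asyncio.TimeoutError:  # same class as built-in TimeoutError on 3.11+
        on_timeout()
        raise

async def demo() -> None:
    cleaned: list[bool] = []
    try:
        await run_with_timeout(asyncio.sleep(10), 0.05, lambda: cleaned.append(True))
    except asyncio.TimeoutError:
        print("timed out; cleanup ran:", cleaned == [True])

if __name__ == "__main__":
    asyncio.run(demo())
```

Catching asyncio.TimeoutError rather than the built-in keeps the sketch working on Python versions before 3.11, where the two were distinct classes.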


Managing the executor pool

# Option 1 — set a module-level default once at startup
from concurrent.futures import ProcessPoolExecutor
import async_patcher

async_patcher.set_default_executor(ProcessPoolExecutor(max_workers=4))

# Option 2 — context manager (creates, sets default, shuts down cleanly)
async with async_patcher.process_pool(max_workers=4):
    results = await asyncio.gather(
        asyncio.to_process(heavy, 1_000_000),
        asyncio.to_process(heavy, 2_000_000),
        asyncio.to_process(heavy, 3_000_000),
    )
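The "create, set as default, shut down cleanly" behaviour of Option 2 maps naturally onto contextlib.asynccontextmanager. The sketch below is a hypothetical stand-in, not async_patcher.process_pool: `pool`, `get_default_executor` and the module-level `_default_executor` are illustrative names. The demo passes a ThreadPoolExecutor factory so it runs anywhere; the default factory is ProcessPoolExecutor for CPU-bound work.

```python
import asyncio
import contextlib
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import AsyncIterator, Callable, Optional

_default_executor: Optional[Executor] = None  # hypothetical module-level default

def get_default_executor() -> Optional[Executor]:
    return _default_executor

@contextlib.asynccontextmanager
async def pool(factory: Callable[[], Executor] = ProcessPoolExecutor) -> AsyncIterator[Executor]:
    """Create an executor, make it the default, then restore and shut down."""
    global _default_executor
    executor = factory()
    previous = _default_executor
    _default_executor = executor
    try:
        yield executor
    finally:
        _default_executor = previous  # restore even if the body raised
        # shutdown(wait=True) blocks, so run it off the event loop thread
        await asyncio.get_running_loop().run_in_executor(None, executor.shutdown)

async def demo() -> None:
    loop = asyncio.get_running_loop()
    async with pool(lambda: ThreadPoolExecutor(max_workers=2)) as ex:
        results = await asyncio.gather(*(loop.run_in_executor(ex, pow, n, 2) for n in (1, 2, 3)))
    print(results, get_default_executor())  # [1, 4, 9] None

if __name__ == "__main__":
    asyncio.run(demo())
```

Restoring the previous default instead of resetting it to None lets such pools nest without clobbering an outer one.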

Full type support

async_patcher ships with a py.typed PEP 561 marker and .pyi stub files for every public module. It passes mypy --strict out of the box. All ProcessTask attributes are fully typed and discoverable in your IDE.

from async_patcher import ProcessTask
from async_patcher.task import TaskStatus

async def run() -> None:
    task: ProcessTask = asyncio.to_process(heavy, 100)
    await task
    assert task.status is TaskStatus.DONE

Zero dependencies

The entire library is pure Python stdlib — asyncio, concurrent.futures, multiprocessing, signal, functools, enum. Nothing to pin, nothing to audit, nothing that breaks on a transitive update.


Install & links

pip install async-patcher

The library has 67 tests across 12 test files covering the full lifecycle, cancellation paths, callbacks, decorator forms, and pool management. Feedback, issues, and PRs are very welcome.


Built this after noticing the same boilerplate appearing in every asyncio + multiprocessing codebase I worked on. The event loop deserves to know what its workers are doing.
