The problem is straightforward to state and surprisingly hard to solve correctly.
Celery workers are synchronous. Celery spawns prefork worker processes, and when a task arrives, it calls your task function like this: task_function(*args, **kwargs). It expects a return value. It blocks the worker thread until it gets one. It does not know or care that you wrote async def.
But modern Python services are async. FastAPI is async. SQLAlchemy 2.0 is async. httpx, aiohttp, asyncpg the entire interesting half of the ecosystem has gone async-first. The idea of maintaining two parallel code paths, one async for your web layer, one sync for your task layer is exactly the kind of thing that creates maintenance debt, copy-paste bugs, and the kind of divergence you only notice when something breaks in production.
So you want to write async def task functions and have them work inside a Celery worker. How hard can it be?
Harder than it looks.
Why asyncio.run() doesn't work
The first thing most people try:
def task_wrapper(*args, **kwargs):
return asyncio.run(your_async_function(*args, **kwargs))
This works in isolation. It fails in production for a specific reason: asyncio.run() creates a new event loop, runs the coroutine to completion, then closes the loop. If there is already a running event loop on the current thread, and there frequently is, in test environments, in newer Celery versions, in signal handlers, it raises:
RuntimeError: This event loop is already running.
The fix most people find next is nest_asyncio:
import nest_asyncio
nest_asyncio.apply()
# now asyncio.run() "works" from inside a running loop
nest_asyncio patches the event loop to allow re-entrant calls. It works in simple cases. The subtle failure mode: re-entrant event loops change the execution order of scheduled callbacks and coroutines. Code that was safe under normal scheduling assumptions becomes non-deterministic under concurrent load. Bugs appear only at production concurrency, only under specific timing, and are nearly impossible to reproduce in development.
The prefork complication
Even if you solve the asyncio.run() problem, Celery's prefork concurrency model introduces a second failure that takes longer to diagnose because it manifests as infinite silence rather than an immediate error.
When Celery starts, it forks N worker processes from a single parent. After fork(), the child process inherits the parent's memory including any event loop objects that existed before the fork.
The problem: fork() does not copy threads. A Python asyncio.AbstractEventLoop is driven by a thread calling loop.run_forever(). After fork(), the child has the loop object but not the thread running it. The loop's internal state may indicate it was running; nothing is actually driving it. Any coroutine scheduled onto this loop hangs indefinitely.
@worker_process_init.connect
def bad_init(**kwargs):
loop = asyncio.get_event_loop()
# This loop was inherited from the parent.
# The thread driving it died when the parent forked.
# loop.is_running() → False.
# Scheduling coroutines onto it produces no results and no errors.
future = asyncio.run_coroutine_threadsafe(some_coro(), loop)
future.result() # blocks forever
This is the kind of bug that produces a zero-width failure window. The loop object exists and looks valid. No exception is raised. Work just never completes. I spent the better part of a day convinced the issue was in the Redis client before realizing the loop scheduled to drive it had died at fork time.
The solution: a persistent bridge loop per worker process
The correct approach is to create a brand-new event loop inside each forked worker process and start a dedicated daemon thread to drive it. The bridge loop is the only asyncio runtime in the worker process. All async work runs on it. Celery's synchronous worker threads never touch an event loop directly.
worker_loop: asyncio.AbstractEventLoop | None = None
@worker_process_init.connect
def init_worker_process(**kwargs):
global worker_loop
# Always create a fresh loop in the forked child.
# Never reuse the inherited parent loop object.
worker_loop = asyncio.new_event_loop()
# A daemon thread drives the loop independently of Celery's
# synchronous execution threads.
t = threading.Thread(
target=_run_event_loop,
args=(worker_loop,),
daemon=True
)
t.start()
def _run_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
Now the bridge is asyncio.run_coroutine_threadsafe. When Celery calls the synchronous task wrapper, the wrapper schedules the async orchestration coroutine onto the background loop and blocks the worker thread waiting for the result:
def wrapper(self, *args, **kwargs):
async def _orchestrate():
# Schema migration, idempotency check, Phoenix heartbeat,
# OTel span setup, task execution, fence validation, DLQ quarantine.
result = await your_async_task_function(*args, **kwargs)
return result
# Schedule the coroutine from this synchronous thread onto
# the event loop running on the background thread.
future = asyncio.run_coroutine_threadsafe(_orchestrate(), worker_loop)
# Block the Celery worker thread here. All actual work happens
# on the bridge loop thread.
return future.result(timeout=300)
run_coroutine_threadsafe is the correct API for this pattern. It is thread-safe, it returns a concurrent.futures.Future (not an asyncio Future), and future.result() blocks without touching the event loop. The background loop thread does all the async I/O. The Celery worker thread just waits.
This solves both problems cleanly:
- No
asyncio.run()from inside a running loop. The loop lives on a different thread. - No inherited-but-dead loop. Each worker creates its own after fork.
The push/apush split
Dispatching tasks has its own version of this problem. Celery's send_task is synchronous and blocking, it opens a broker connection and writes a message. If you call it from inside an async FastAPI route handler, you block the event loop during a network round-trip.
This is why Relier has two dispatch methods:
# From async code (FastAPI, async Django):
receipt = await send_invoice.apush(invoice_id)
# From sync code (Flask routes, sync Django views, scripts):
receipt = send_invoice.push(invoice_id)
apush runs the blocking broker send in an executor so the async caller is never blocked:
async def apush(self, *args, **kwargs):
# Admission check, schema wrapping, OTel context injection...
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
None,
lambda: celery_app.send_task(
self.name,
args=(envelope,),
queue=queue,
task_id=task_id,
),
)
push explicitly guards against being called from inside a running loop:
def push(self, *args, **kwargs):
try:
asyncio.get_running_loop()
except RuntimeError:
pass # No running loop on this thread. Safe.
else:
raise RuntimeError(
f"{self.name}.push() was called from inside a running event loop, "
"where it would block and deadlock that loop. "
f"Use `await {self.name}.apush(...)` instead."
)
# Inside a Celery worker: reuse the bridge loop.
if worker_loop and worker_loop.is_running():
future = asyncio.run_coroutine_threadsafe(
self.apush(*args, **kwargs), worker_loop
)
return future.result(timeout=5.0)
# Outside Celery (Flask route, script):
return asyncio.run(self.apush(*args, **kwargs))
The error message in the RuntimeError matters. When someone calls push() from a FastAPI route handler, they get an actionable message telling them exactly what to do instead. Not a silent deadlock. Not a timeout with no context. A specific message at the exact moment the mistake is made.
The check itself, asyncio.get_running_loop() in a try/except RuntimeError is the canonical way to detect whether the current thread is running an event loop. It raises RuntimeError if no loop is running on this thread, which is the safe case for push().
Sync tasks in an async world
What about existing sync task functions? A codebase of def tasks shouldn't require a full rewrite to benefit from Relier's reliability stack.
Inside the orchestration coroutine, execution branches on whether the function is async:
if inspect.iscoroutinefunction(func):
result = await func(*actual_args, **actual_kwargs)
else:
result = await asyncio.to_thread(func, *actual_args, **actual_kwargs)
asyncio.to_thread runs the sync function in Python's default thread pool executor. The orchestration layer awaits it without blocking the bridge loop. All the async infrastructure, heartbeat refreshes, Phoenix registration, OTel span updates, fence validation keeps running concurrently on the bridge loop while the sync function runs on a thread pool thread.
The constraint is honest: two-tier timeouts (soft_timeout, hard_timeout) only work for async def tasks. A sync function running in asyncio.to_thread cannot be cooperatively cancelled from outside. Relier raises ValueError at decoration time if you pass timeout parameters to a sync task, rather than silently providing no protection:
@rl_task(soft_timeout=8, hard_timeout=10) # ValueError at import time
def sync_task(data: str) -> dict:
...
# Fix: convert to async def, or remove the timeout parameters.
Failing loudly at decoration time is better than failing silently at runtime when the timeout fires and nothing happens.
Timeout enforcement without thread kills
Two-tier timeouts deserve their own explanation because they interact with the bridge loop in a non-obvious way.
When a task starts, Relier spawns two watcher coroutines as asyncio tasks alongside the actual work:
task_coro = asyncio.create_task(func(*args, **kwargs))
async def _soft_timeout_handler():
await asyncio.sleep(float(soft))
if not task_coro.done():
# Fire the recovery hook. Task keeps running.
if on_soft:
await on_soft(ctx)
async def _hard_timeout_handler():
await asyncio.sleep(float(hard))
if not task_coro.done():
task_coro.cancel() # Delivers CancelledError at next await point.
soft_watcher = asyncio.create_task(_soft_timeout_handler())
hard_watcher = asyncio.create_task(_hard_timeout_handler())
done, pending = await asyncio.wait(
{task_coro, hard_watcher},
return_when=asyncio.FIRST_COMPLETED,
)
All three coroutines run concurrently on the bridge loop. The soft timeout fires and calls your recovery hook, where you can call ctx.set_partial(state) to checkpoint work in progress while the task keeps running. If the task doesn't finish before the hard deadline, task_coro.cancel() delivers asyncio.CancelledError at the task's next await point.
No thread kills. No SIGALRM. No OS-level signals. Pure cooperative asyncio cancellation. This matters for cleanup: CancelledError propagates through finally blocks. Resources get released. Partial state gets checkpointed. The task gets quarantined to the DLQ with its full payload and resurrection history. None of that happens with a hard OS kill.
The disposable loop case
One edge case worth knowing: outside a Celery worker in a CLI script, a management command, a test, there's no bridge loop. The task wrapper's loop resolution falls through to creating a fresh event loop just for that call:
def _get_worker_loop():
# 1. Check for persistent worker bridge.
if relier.tasks.app.worker_loop is not None:
return relier.tasks.app.worker_loop
# 2. Check for a running loop on this thread (test contexts).
try:
return asyncio.get_running_loop()
except RuntimeError:
pass
# 3. Create a disposable loop for this one call.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
Disposable loops are cleaned up after the call: Redis connections are closed, the loop is stopped and closed, and asyncio.set_event_loop(None) clears the thread-local reference. The persistent worker_loop is specifically excluded from this cleanup path closing the bridge loop mid-execution would kill all in-flight tasks.
What I learned
The prefork problem is the kind of failure that shows up as "nothing happens" rather than an exception. You schedule coroutines, they don't run, no error surfaces. It took a day of debugging the wrong thing before I isolated it to the inherited-but-dead loop. The fix (create a fresh loop in worker_process_init) is obvious in retrospect. Getting there required understanding exactly what fork() does to threads.
asyncio.run_coroutine_threadsafe is underused. Most Python developers never need to cross a thread boundary into a running event loop, so the API is obscure. But for anything that marries a sync framework (Celery, Django ORM, WSGI in general) with async internals, it is the correct and safe way to do it. It appears in the Python docs in a single paragraph. It deserves more.
The two-method dispatch split (push/apush) is the right API surface even though it introduces surface area. The alternative, a single method that auto-detects the context and does the right thing sounds better but produces confusing failures when the auto-detection is wrong. The explicit split makes the contract clear. Async code always uses apush. Sync code always uses push. The guard in push() exists so that misuse produces a useful error immediately rather than a deadlock ten seconds later.
Cooperative timeout cancellation is better than OS-level signals for tasks that care about cleanup. The finally block guarantee is the part that matters: partial state can be persisted, connections can be closed, the DLQ entry gets written with everything needed to re-inspect or re-dispatch. An OS kill gives you none of that.
The whole bridge, bridge loop thread, run_coroutine_threadsafe, push/apush split, disposable loop cleanup is about 200 lines in app.py and decorator.py combined. The complexity is real but contained. Once the pattern is in place, every async def task function just works, without the task author knowing anything about the event loop infrastructure underneath.
GitHub: github.com/getrelier/relier
Docs: getrelier.github.io/relier
pip install relier
Top comments (0)