DEV Community

Mukunda Rao Katta
Mukunda Rao Katta

Posted on

Single-Flight LLM Calls: Coalesce 50 Concurrent Identical Requests Into One API Call

Fifty users submitted their daily summaries at 9:00 AM. All fifty requests ask the same follow-up question in the same way. Your system sends fifty identical LLM calls. You pay for fifty calls. All fifty complete at roughly the same time with the same response.

One call would have been enough.

llm-batch-coalesce is a single-flight deduplication layer: while an LLM call is in-flight, any identical call that arrives waits for the first result and shares it.


The Shape of the Fix

from llm_batch_coalesce import LLMCoalesce

coalesce = LLMCoalesce()

async def call_llm(messages: list[dict], model: str = "claude-sonnet-4-6") -> dict:
    return await coalesce.call(
        fn=anthropic_client.messages.create,
        model=model,
        messages=messages,
        max_tokens=1024,
    )
Enter fullscreen mode Exit fullscreen mode

If 50 concurrent requests arrive with the same messages and model, only one reaches the LLM. The other 49 wait. When the first completes, all 50 get the same result. You paid for one call.


What It Does NOT Do

llm-batch-coalesce does not deduplicate across time. It deduplicates in-flight requests only. If the first call completes and a second identical call arrives 100ms later, the second call goes to the API. For cross-time deduplication, use llm-cache-mem.

It does not handle partial failures gracefully for waiters. If the first call errors, all waiters get the same error. You cannot retry just one of the 50 waiters — they all see the failure from the first call. This is appropriate for transient errors (all should retry) but less ideal for quota errors where you might want to queue.

It does not preserve per-request context for waiters. All 50 requesters get the same response object. If they need different behavior based on per-request metadata (different user IDs, different logging contexts), that differentiation happens after the shared call completes.


Inside the Library

The coalescer uses asyncio.Future to share results:

import asyncio
import hashlib
import json

class LLMCoalesce:
    def __init__(self):
        self._in_flight: dict[str, asyncio.Future] = {}
        self._lock = asyncio.Lock()

    def _key(self, kwargs: dict) -> str:
        canonical = json.dumps(kwargs, sort_keys=True, default=str)
        return hashlib.sha256(canonical.encode()).hexdigest()

    async def call(self, fn, **kwargs) -> dict:
        key = self._key(kwargs)

        async with self._lock:
            if key in self._in_flight:
                # Wait for the existing call
                future = self._in_flight[key]
                # Return early while holding lock just briefly to get the future

        if key in self._in_flight:
            return await asyncio.shield(future)

        async with self._lock:
            if key in self._in_flight:
                return await asyncio.shield(self._in_flight[key])

            # Create future before releasing lock
            future = asyncio.get_event_loop().create_future()
            self._in_flight[key] = future

        try:
            result = await fn(**kwargs)
            future.set_result(result)
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            async with self._lock:
                self._in_flight.pop(key, None)
Enter fullscreen mode Exit fullscreen mode

The asyncio.Future is the coordination primitive. The first caller creates the future and starts the real API call. Subsequent callers find the future already in _in_flight and await it. When the first caller completes, future.set_result() wakes all waiters simultaneously.

The lock window is narrow: it is held only to check and update the _in_flight dict, not while waiting for the API call. This prevents lock contention from becoming a bottleneck under high concurrency.


When to Use It

Use it for stateless operations that may receive burst traffic with identical inputs. Report generation endpoints, daily digest processing, batch evaluation jobs — anything where multiple users or workers may simultaneously submit the same request.

Use it for external API calls that have quota limits. If you have 50 concurrent requests and a rate limit of 100/min, coalescing identical requests leaves quota headroom for diverse requests.

Use it alongside llm-cache-mem for both in-flight and historical deduplication. The coalescer handles concurrent duplicates; the cache handles repeated identical calls over time. Together they minimize API spend significantly.

Skip it for streaming responses. The coalescer waits for the complete response before sharing. If you need to stream output to users as it arrives, coalescing is incompatible — each requester would need their own stream.


Install

pip install git+https://github.com/MukundaKatta/llm-batch-coalesce

# Or from PyPI
pip install llm-batch-coalesce
Enter fullscreen mode Exit fullscreen mode
from llm_batch_coalesce import LLMCoalesce

coalesce = LLMCoalesce()

# Wrap any async LLM function
async def generate_daily_summary(user_id: str, date: str) -> str:
    messages = [{"role": "user", "content": build_summary_prompt(date)}]

    # Multiple users asking for the same date's summary
    # Only one LLM call happens for each unique messages/model combo
    response = await coalesce.call(
        fn=anthropic_client.messages.create,
        model="claude-sonnet-4-6",
        messages=messages,
        max_tokens=512,
    )

    return response.content[0].text

# Test: 10 concurrent requests for the same date
async def test_coalescing():
    results = await asyncio.gather(*[
        generate_daily_summary(f"user-{i}", "2026-05-24")
        for i in range(10)
    ])

    # All results are identical (same LLM response)
    assert all(r == results[0] for r in results)
    print(f"10 requests coalesced into 1 API call")
Enter fullscreen mode Exit fullscreen mode

Sibling Libraries

Library What it solves
llm-cache-mem Cross-time deduplication (LRU+TTL for non-concurrent identical calls)
agentidemp-py Request-level idempotency for non-concurrent duplicate agent runs
llm-rate-limit-bucket Token-bucket rate limiter for outbound LLM calls
llm-retry Exponential backoff retry when calls fail
token-budget-pool Thread-safe concurrent token/USD budget tracking

The concurrency stack: llm-batch-coalesce for in-flight dedup, llm-cache-mem for cross-time dedup, llm-rate-limit-bucket for rate limiting, llm-retry for failure recovery.


What's Next

Waiter count metrics: coalesce.stats() returning how many times a request was coalesced (waiter count per key), how much the coalescing saved in estimated API cost, and the current in-flight count. Makes the value of the coalescer visible in dashboards.

Configurable key function: LLMCoalesce(key_fn=...) for cases where you want to coalesce on a subset of the kwargs (e.g., coalesce on messages content only, ignoring model differences). Some use cases allow sharing responses across model variants.

Sync version: SyncLLMCoalesce using threading.Event instead of asyncio.Future for sync contexts. The async version requires an event loop; the sync version would work in traditional multi-threaded WSGI apps.


Built as part of the agent-stack family: composable Python primitives for production LLM agents.

Top comments (0)