DEV Community

zhongqiyue

When Your AI API Keeps Timing Out: A Lesson in Async Chunking

I spent last weekend trying to build a simple document summarizer. Nothing fancy—just take a 50-page PDF, send it to an AI API, and get back three bullet points. But the API kept timing out. Every. Single. Time.

I went from frustration to a solid solution, and I want to walk you through my thought process, the dead ends, and what actually worked. No fluff, just the trade-offs I wish someone had told me about.

The Problem: Documents That Don't Fit in a Single Prompt

I had a bunch of long legal documents—think contracts, reports, proposals. The AI API I was using (let's call it the endpoint at https://ai.interwestinfo.com/v1/completions) had a context window of about 4,000 tokens. A single contract could easily be 15,000 tokens.

My first naive attempt? Just send the whole thing and hope for the best:

import requests

url = "https://ai.interwestinfo.com/v1/completions"
payload = {
    "prompt": f"Summarize this: {full_text}",
    "max_tokens": 500
}
response = requests.post(url, json=payload, timeout=60)
print(response.json()["choices"][0]["text"])

It failed with a 504 Gateway Timeout after 60 seconds. I increased the timeout to 300 seconds—still failed. The API wasn't designed to chew through 15k tokens in one go, and frankly, I don't blame it.

What I Tried That Didn't Work

1. Truncating the text

I chopped the document at 4,000 characters (roughly 1,000 tokens) and sent that. The summary was terrible—it summarised only the first page, missing the entire middle and conclusion. So truncation is a non-starter if you care about completeness.

2. Sending multiple sequential requests

I split the document into fixed-size chunks and sent them one after another, then combined the individual summaries. This worked, but it took ages—each chunk took ~3 seconds, and for 15 chunks that's 45 seconds. Plus, if one request failed, the whole pipeline had to restart. Not scalable.

3. Using requests in a loop

I tried tightening this into a plain loop around requests.post, but requests is a blocking library: each call has to finish before the next one starts. Most of the run time was spent waiting on the network.

What Eventually Worked: Async Chunking with asyncio and aiohttp

The key insight was that I can send chunks concurrently, as long as I respect rate limits and handle partial failures gracefully. Most completion-style AI APIs are stateless, so each chunk can be summarised independently and in any order. I just need to combine the results afterwards.

The Technique

  1. Chunk the document intelligently – split by paragraphs, not fixed character count, to avoid cutting sentences mid-thought.
  2. Send all chunks concurrently using asyncio, with a semaphore to throttle requests.
  3. Collect summaries and then run a final merging pass to combine them.

Here's the code I ended up with:

import asyncio
import aiohttp

API_URL = "https://ai.interwestinfo.com/v1/completions"
API_KEY = "sk-your-key"
MAX_TOKENS_PER_CHUNK = 1000  # conservative estimate

async def summarize_chunk(session, semaphore, chunk_text):
    async with semaphore:
        payload = {
            "prompt": f"Summarize the following text in 2-3 sentences:\n\n{chunk_text}",
            "max_tokens": 100
        }
        headers = {"Authorization": f"Bearer {API_KEY}"}
        async with session.post(API_URL, json=payload, headers=headers) as resp:
            if resp.status == 200:
                data = await resp.json()
                return data["choices"][0]["text"].strip()
            else:
                print(f"Chunk failed with {resp.status}: {await resp.text()}")
                return ""  # graceful degradation

async def parallel_summarize(full_text):
    # Split by paragraphs, then group into chunks under token limit
    paragraphs = full_text.split("\n\n")
    chunks = []
    current_chunk = ""
    for para in paragraphs:
        if len(current_chunk) + len(para) < MAX_TOKENS_PER_CHUNK * 4:  # rough char-to-token ratio
            current_chunk += para + "\n\n"
        else:
            if current_chunk:  # avoid appending an empty chunk when the first paragraph is oversized
                chunks.append(current_chunk)
            current_chunk = para + "\n\n"
    if current_chunk:
        chunks.append(current_chunk)

    semaphore = asyncio.Semaphore(5)  # max 5 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [summarize_chunk(session, semaphore, chunk) for chunk in chunks]
        summaries = await asyncio.gather(*tasks)

    # Combine the per-chunk summaries, skipping chunks that failed.
    # Joining with blank lines keeps paragraph boundaries, so a recursive pass can re-chunk the text.
    combined = "\n\n".join(filter(None, summaries))
    # If the combined text is still long, summarise it again (this is the merge pass)
    if len(combined) > 1500:
        combined = await parallel_summarize(combined)  # recursion with smaller text
    return combined

# Usage
async def main():
    with open("contract.txt", "r") as f:
        text = f.read()
    summary = await parallel_summarize(text)
    print("Final summary:")
    print(summary)

asyncio.run(main())

Note the Semaphore(5): without it, every chunk is sent at once, which is likely to trip rate limits and return 429 errors. Tune the number based on your API's docs.
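To see what the semaphore is doing, here's a minimal sketch that swaps the real API call for asyncio.sleep. The names fake_request and run_throttled are made up for this illustration, and the latency is simulated, so treat it as a demonstration of the throttling pattern rather than a benchmark.

```python
import asyncio


async def fake_request(semaphore, index, stats):
    # Hypothetical stand-in for an API call; the sleep simulates network latency.
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(0.01)
        stats["in_flight"] -= 1
        return f"summary {index}"


async def run_throttled(num_chunks=15, limit=5):
    semaphore = asyncio.Semaphore(limit)  # at most `limit` requests in flight
    stats = {"in_flight": 0, "peak": 0}
    tasks = [fake_request(semaphore, i, stats) for i in range(num_chunks)]
    # gather() returns results in the same order as the tasks were passed in
    results = await asyncio.gather(*tasks)
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_throttled())
    print(f"{len(results)} results, peak concurrency {peak}")
```

As a rough estimate, if each chunk really takes about 3 seconds, 15 chunks with a limit of 5 finish in around three waves (about 9 seconds) instead of 45. That assumes the API's latency doesn't get worse under concurrent load.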

Lessons Learned & Trade-offs

This approach isn't perfect. Here are the trade-offs I hit:

  • Context loss: When you summarise each chunk independently, you lose cross-references. A legal clause in chunk 1 might be amended in chunk 5. My final merge pass helps a bit, but it's still lossy. What I built is essentially Map-Reduce summarisation (LangChain implements this pattern), so it shares that technique's weakness. If you need better coherence, you'd need a model with a larger window, or a sequential approach that carries a running summary from chunk to chunk.
  • Token estimation: I used a rough characters-to-tokens ratio (about 4 characters per token). That can be off by a factor of 2 depending on the language. It's better to count tokens with the model's actual tokeniser, but that adds an extra step. I opted for a conservative max.
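  • Error handling: My code only prints the status of a failed chunk and then leaves it out of the summary, and a network exception in any single request aborts the whole run. In production you'd want to retry with exponential backoff and log failures for manual review.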
  • Cost: Parallel requests cost the same per token, but you pay for both the chunk requests and the final merge. For very long documents, you might run multiple levels of merging.
  • Not always async-friendly: Some workflows need ordered processing, for example when each chunk's prompt depends on the output from the previous chunk. In that case, you're stuck with sequential requests. But stateless completion endpoints themselves are order-independent.
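On the error-handling point, one option is to have asyncio.gather hand back exceptions as results, so you can record which chunks failed instead of losing them. This is a minimal sketch with a simulated failure; summarize_or_fail and collect are hypothetical names, and a real version would call the API where this one raises on purpose.

```python
import asyncio


async def summarize_or_fail(index):
    # Hypothetical stand-in for summarize_chunk: every third chunk fails.
    await asyncio.sleep(0)
    if index % 3 == 2:
        raise RuntimeError(f"chunk {index} failed")
    return f"summary {index}"


async def collect(num_chunks):
    tasks = [summarize_or_fail(i) for i in range(num_chunks)]
    # return_exceptions=True puts exceptions into the result list
    # instead of raising the first one to the caller of gather().
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    summaries, failed = [], []
    for index, outcome in enumerate(outcomes):
        if isinstance(outcome, Exception):
            failed.append(index)  # keep the index for retry or manual review
        else:
            summaries.append(outcome)
    return summaries, failed


if __name__ == "__main__":
    summaries, failed = asyncio.run(collect(6))
    print(f"{len(summaries)} summaries, failed chunks: {failed}")
```

Keeping the failed indices means a later pass can retry just those chunks rather than restarting the whole document.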

What I'd Do Differently Next Time

I'd start with a proper tokeniser (like tiktoken for OpenAI models) to avoid the character-count kludge. I'd also implement a retry mechanism with jittered backoff. And I'd set an explicit timeout per request (my code currently relies on aiohttp's default, which is a 5-minute total timeout).
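Here's roughly what the retry and timeout pieces could look like. It's a sketch under assumptions: with_retries is a hypothetical helper, the attempt count and delays are arbitrary, and the flaky function only simulates a transient failure. With aiohttp you'd probably also want to catch aiohttp.ClientError.

```python
import asyncio
import random


async def with_retries(make_call, attempts=4, base_delay=0.5, timeout=30.0):
    """Run make_call() with a per-attempt timeout and jittered exponential backoff."""
    for attempt in range(attempts):
        try:
            # wait_for cancels the call if it exceeds the per-attempt timeout
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError):
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller decide what to do
            # Full jitter: sleep a random time up to base_delay * 2^attempt
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))


async def demo():
    calls = {"count": 0}

    async def flaky():
        # Simulated endpoint: fails twice, then succeeds.
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("simulated transient failure")
        return "ok"

    result = await with_retries(flaky, base_delay=0.01)
    return result, calls["count"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The jitter spreads retries out so that several failed chunks don't all hit the API again at the same moment. For the timeout alone, aiohttp also accepts a ClientTimeout object on the session, e.g. aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)).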

If I were processing hundreds of documents, I'd push this into a queue system (Redis + Celery) rather than running it ad-hoc. But for a weekend project, async Python worked beautifully.

Your mileage may vary. What's your setup for handling large documents with AI APIs? I'd love to hear what's worked for you.
