Когда у вас 10 запросов в LLM — синхронный for нормально. Когда 1000 — он становится бутылочным горлышком, и пайплайн крутится часами. Когда 100 000 — обычный API становится дорогим, и расходы на токены съедают юнит-экономику. Два классических решения: async-параллельность (asyncio + aiohttp для 50–500 запросов в секунду) и Batch API (off-line режим со скидкой 50% на input/output).
Этот гайд — рабочий код обоих паттернов через единый шлюз Promptra (Claude Opus 4.7, GPT-5.5, Gemini 3.1 Pro, DeepSeek V4 Pro), расчёт реальной экономии на типовых сценариях, паттерны очередей и retry для production, и чёткие правила «когда что брать». оплата в рублях по договору, полный пакет закрывающих документов, цены в рублях по курсу ЦБ.
TL;DR — два режима, две экономии
Async (через asyncio.gather + AsyncOpenAI):
- Real-time, ответ за секунды
- Throughput до 500 RPS на ключ
- Та же цена, что у обычного API
- Когда: UI, агенты, real-time чаты
Batch API (через client.batches.create):
- Offline, SLA до 24 часов (обычно час-два)
- Скидка 50% на input и output
- Лимита на размер нет (миллионы запросов в одном файле)
- Когда: разметка, классификация архива, summary всей базы
Производительный production-стек использует оба.
Часть 1: Async-вызовы через asyncio
Базовый паттерн — параллельное выполнение N запросов через asyncio.gather:
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
api_key="sk-promptra-...",
base_url="https://api.promptra.ru/v1",
)
async def call_one(prompt: str) -> str:
response = await client.chat.completions.create(
model="claude-sonnet-4-6",
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
async def main:
prompts = [f"Расскажи короткий факт про число {i}" for i in range(100)]
results = await asyncio.gather(*[call_one(p) for p in prompts])
for p, r in zip(prompts, results):
print(p, "→", r[:80])
asyncio.run(main)
100 запросов в секунду — обычно реально (упирается в rate limit ключа, не в SDK). Сравнение со синхронным циклом:
| Сценарий | Время | Throughput |
|---|---|---|
Sync for (100 запросов) |
~180 сек | 0.5 RPS |
asyncio.gather(100) |
~3.5 сек | ~28 RPS |
asyncio.gather + Semaphore(20) |
~6 сек | ~17 RPS |
Async ускоряет в 30–50 раз на типовом latency 1–2 секунды на запрос. Но без контроля параллельности вы быстро упрётесь в rate limit.
Rate limit через Semaphore
Если параметр N в gather слишком большой — ловите 429 от API. Решение — asyncio.Semaphore:
async def call_with_semaphore(sem: asyncio.Semaphore, prompt: str) -> str:
async with sem:
return await call_one(prompt)
async def main:
sem = asyncio.Semaphore(20) # максимум 20 параллельных запросов
prompts = [f"Запрос {i}" for i in range(1000)]
results = await asyncio.gather(*[call_with_semaphore(sem, p) for p in prompts])
Semaphore(20) означает: всегда не больше 20 параллельных, как только одна задача завершилась — следующая стартует. Это даёт постоянную нагрузку без всплесков.
Производительный лимит на ключ через Promptra — обычно 600 RPM (10 RPS), при росте трафика — поднимается через дашборд. Semaphore — это контроль на вашей стороне, чтобы не пропускать 429 в код приложения.
Retry с exponential backoff
Даже с Semaphore периодически прилетят 429 (всплески), 503 (временные сбои API), таймауты. Стандарт — tenacity:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError, APIConnectionError, APITimeoutError
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=30),
retry=retry_if_exception_type((RateLimitError, APIConnectionError, APITimeoutError)),
reraise=True,
)
async def call_with_retry(prompt: str) -> str:
response = await client.chat.completions.create(
model="claude-sonnet-4-6",
messages=[{"role": "user", "content": prompt}],
timeout=120,
)
return response.choices[0].message.content
Что важно:
- 5 attempts — больше обычно бесполезно, проблема не разрешится.
- Exponential backoff 2 → 4 → 8 → 16 → 30 сек — даёт API время восстановиться.
- Только на retry-able ошибках — 400 (bad request) или 401 (auth) не retry'ить, это ваши ошибки.
-
reraise=True— после 5 неудачных попыток ошибка пробрасывается в код, не глотается.
Полный production-шаблон async
Собираем всё вместе — паттерн для обработки тысяч задач:
import asyncio
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
client = AsyncOpenAI(
api_key="sk-promptra-...",
base_url="https://api.promptra.ru/v1",
)
@retry(stop=stop_after_attempt(5), wait=wait_exponential(min=2, max=30))
async def classify(text: str, sem: asyncio.Semaphore) -> dict:
async with sem:
response = await client.chat.completions.create(
model="gpt-5-4",
messages=[
{"role": "system", "content": "Классифицируй текст: positive/neutral/negative."},
{"role": "user", "content": text},
],
temperature=0,
timeout=60,
)
return {
"text": text[:100],
"label": response.choices[0].message.content.strip.lower,
"tokens": response.usage.total_tokens,
}
async def process_dataset(texts: list[str], concurrency: int = 20) -> list[dict]:
sem = asyncio.Semaphore(concurrency)
results = await asyncio.gather(
*[classify(t, sem) for t in texts],
return_exceptions=True,
)
# отделяем успехи от ошибок
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
print(f"Успешно: {len(successes)}, ошибок: {len(failures)}")
return successes
asyncio.run(process_dataset(my_texts, concurrency=30))
return_exceptions=True — критично: одна упавшая задача не валит весь батч. Анализируете ошибки отдельно, ретраите при необходимости.
Часть 2: Batch API — −50% за оффлайн режим
Async помогает с throughput, но цену за токены не меняет. Batch API даёт скидку 50% на input и output, если согласны ждать до 24 часов. Эта статья — часть pillar-гида: полный технический гид по LLM API на Python — токены, function calling, streaming, RAG, batch.
Архитектура: вы готовите JSONL-файл с тысячами запросов, загружаете на сервер, ждёте окончания, скачиваете JSONL с результатами.
Шаг 1. Готовим JSONL
Каждая строка — один запрос:
import json
def make_batch_file(texts: list[str], model: str, path: str):
with open(path, "w", encoding="utf-8") as f:
for i, text in enumerate(texts):
request = {
"custom_id": f"task-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": model,
"messages": [
{"role": "system", "content": "Классифицируй текст."},
{"role": "user", "content": text},
],
"temperature": 0,
},
}
f.write(json.dumps(request, ensure_ascii=False) + "\n")
make_batch_file(texts, "gpt-5-4", "/tmp/batch.jsonl")
custom_id — ваш идентификатор для сопоставления результата с исходным запросом. Обычно — id записи в БД или индекс.
Шаг 2. Загружаем и стартуем batch
from openai import OpenAI
client = OpenAI(
api_key="sk-promptra-...",
base_url="https://api.promptra.ru/v1",
)
# загружаем файл
upload = client.files.create(file=open("/tmp/batch.jsonl", "rb"), purpose="batch")
print(f"File ID: {upload.id}")
# стартуем batch
batch = client.batches.create(
input_file_id=upload.id,
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"task": "classify_tickets", "version": "v3"},
)
print(f"Batch ID: {batch.id}, status: {batch.status}")
completion_window="24h" — SLA, обычно реально завершается за час-два.
Шаг 3. Опрос статуса и скачивание результата
import time
def wait_for_batch(batch_id: str, poll_interval: int = 60) -> dict:
while True:
batch = client.batches.retrieve(batch_id)
print(f"Status: {batch.status}, completed: {batch.request_counts.completed}/{batch.request_counts.total}")
if batch.status in ("completed", "failed", "expired", "cancelled"):
return batch
time.sleep(poll_interval)
batch = wait_for_batch(batch.id)
if batch.status == "completed":
output = client.files.content(batch.output_file_id)
with open("/tmp/batch_results.jsonl", "wb") as f:
f.write(output.content)
Шаг 4. Парсим результаты
results = {}
with open("/tmp/batch_results.jsonl") as f:
for line in f:
record = json.loads(line)
custom_id = record["custom_id"]
if record.get("error"):
results[custom_id] = {"error": record["error"]}
else:
answer = record["response"]["body"]["choices"][0]["message"]["content"]
results[custom_id] = {"answer": answer}
# теперь по custom_id сопоставляете с исходными данными
Подробности API — в официальной документации Batch у OpenAI и в Message Batches у Anthropic.
Экономика: реальные числа
Пример сценария: классификация 100 000 тикетов, средний 1500 input + 500 output токенов.
Через обычный async API
| Модель | Цена | Стоимость |
|---|---|---|
| Claude Opus 4.7 | 350/1790 ₽ | 142 000 ₽ |
| Claude Sonnet 4.6 | 210/1070 ₽ | 85 000 ₽ |
| GPT-5.5 | 350/2150 ₽ | 160 000 ₽ |
| GPT-5.4 | 170/1070 ₽ | 78 500 ₽ |
| Gemini 3.1 Pro | 140/860 ₽ | 64 000 ₽ |
| DeepSeek V4 Pro | 30/60 ₽ | 7 500 ₽ |
Через Batch API (−50%)
| Модель | Цена batch | Стоимость | Экономия |
|---|---|---|---|
| Claude Opus 4.7 | 175/895 ₽ | 71 000 ₽ | 71 000 ₽ |
| Claude Sonnet 4.6 | 105/535 ₽ | 42 500 ₽ | 42 500 ₽ |
| GPT-5.5 | 175/1075 ₽ | 80 000 ₽ | 80 000 ₽ |
| GPT-5.4 | 85/535 ₽ | 39 250 ₽ | 39 250 ₽ |
Если у вас есть оффлайн-процессинг — Batch это просто бесплатные −50% к расходам. На больших объёмах это миллионы рублей экономии в год.
Когда брать async, когда batch — дерево решений
Задача поступила:
┌─────────────────────────────────┐
│ Нужен ответ за секунды? │
└────┬────────────────────┬───────┘
│ да │ нет
▼ ▼
┌──────┐ ┌─────────────────┐
│ async│ │ Объём > 1000 │
│ (UI, │ │ запросов? │
│ агент,│ └─────┬──────┬────┘
│ чат) │ │ да │ нет
└──────┘ ▼ ▼
┌──────┐ ┌──────┐
│batch │ │async │
│−50% │ │быстро│
└──────┘ └──────┘
Правила:
- Real-time UI (чат, ассистент) → async + streaming
- Агент с tool calls → async (нужно несколько roundtrips)
- Embedding большой базы → batch (вместо 100K параллельных async)
- Ночная переклассификация → batch
- A/B тест промтов на датасете → batch
- Если хочется и того, и того → async с фоновой очередью + batch для архивных задач
Production-паттерн: async + batch в одной системе
Архитектура зрелого LLM-сервиса:
# real-time эндпоинт — async
@app.post("/chat")
async def chat(req: ChatRequest):
return await async_chat_via_streaming(req)
# background задачи через очередь (Celery/RQ/Dramatiq)
@celery.task
def reclassify_all_tickets:
tickets = db.query("SELECT id, text FROM tickets WHERE status='new'").all
batch_id = submit_to_batch(tickets, model="claude-sonnet-4-6")
schedule_check_batch(batch_id, after_minutes=60)
@celery.task
def check_batch(batch_id: str):
batch = client.batches.retrieve(batch_id)
if batch.status == "completed":
results = download_and_parse(batch.output_file_id)
save_to_db(results)
elif batch.status in ("in_progress", "validating"):
# ещё не готов — перепланировать
schedule_check_batch(batch_id, after_minutes=30)
else:
alert_team(f"Batch {batch_id} failed: {batch.status}")
Та же база токенов, тот же ключ Promptra, один счёт от юр.лица. Через единый шлюз биллинг един — async расходы и batch расходы видны в одном дашборде.
Распространённые ошибки
1. Использовать sync for на 1000+ запросов. На latency 1.5 сек — это 25 минут вместо 30 секунд через async.
2. Делать async без Semaphore. Получаете шторм 429-х, ваши попытки retry усугубляют ситуацию.
3. Использовать обычный API там, где нужен Batch. На 100K запросов это лишние 40-80K ₽ за прогон. На 10 прогонов в месяц — почти 1М ₽ в год.
4. Не парсить ошибки batch results. Каждая запись может быть либо response, либо error. Если игнорировать error — потеряете 0.1-1% записей молча.
5. Не использовать custom_id осмысленно. Если это просто индекс — после реорганизации файла теряете связь с исходными данными. Используйте id из вашей БД.
6. Запускать batch без оценки стоимости заранее. Считайте ожидаемый input/output через tokenizer («Как считать токены в LLM»), умножайте на batch-ставки, сверяйте с бюджетом — до запуска.
Оплата и закрывающие документы
Async и Batch — это одни и те же модели через тот же шлюз Promptra. Юрлицо-исполнитель — российское юр.лицо , резидент РФ. Сервисная комиссия 5% берётся только при пополнении баланса, на токены наценки нет. Batch-скидка 50% видна непосредственно в дашборде по статье «Batch usage». Полный пакет закрывающих документов (договор-оферта, счёт на оплату, акт оказанных услуг, счёт-фактура, УПД) приходит через ЭДО — Диадок, СБИС, Контур. Подробнее — на странице «Тарифы».
Что дальше
Async-вызовы — это переход с 0.5 RPS до 30+ на одном ключе и Semaphore'е. Batch API — это −50% к цене токенов за готовность подождать час-два. Зрелый production-стек использует оба: real-time async для пользовательских интерфейсов, batch для оффлайн-обработки и архивных задач. На объёме 100K+ запросов в месяц экономия от Batch измеряется сотнями тысяч рублей. Полезные следующие шаги: «Function calling и tool use» для async-агентов, «Embeddings и векторный поиск» для batch-индексации больших баз, «Streaming LLM-ответов» для real-time UI. Если нужно прикинуть стоимость на вашем трафике или подключить ключ через юрлицо — напишите команде Promptra в Telegram.
📚 Главный гайд по теме: Лучшая нейросеть 2026: какую LLM выбрать под задачу — связанные материалы и обзор всей категории.
Promptra — Russian LLM API aggregator. One OpenAI-compatible endpoint to all flagship models: OpenAI (GPT-5.5, GPT-5.4), Anthropic (Claude Opus 4.7, Sonnet 4.6), Google (Gemini 3.1 Pro, 3.5 Flash), DeepSeek V4 Pro, Qwen 3.6 Plus.
Provider prices 1-to-1 at CBR rate — no markup on tokens. Ruble billing per contract, full closing documents through EDI. No VPN — legal B2B service in Russia.
Try: promptra.ru · model catalog · docs





Top comments (0)