OpenAI's Batch API lets you run large asynchronous workloads (evaluations, RAG, embeddings, and more) at a 50% cost discount. But checking batch status still uses normal API requests, so naive polling across many batches can create unnecessary traffic and increase your chances of hitting rate limits (429s) or crowding out interactive calls.
Simple polling with a fixed delay doesn't scale: batch sizes vary, you need backoff on errors, pause controls, and clean cancellation. The rate-limits guide shows Python backoff patterns; this article walks through a corresponding solution for Node/TypeScript using RxJS and rxjs-poll - a polling operator that keeps status checks adaptive and predictable.
The Problem: Simple Polling
Here's a typical first attempt at polling the status of a batch:
async function pollBatchStatus(batchId: string): Promise<BatchResult> {
while (true) {
const res = await fetch(`https://api.openai.com/v1/batches/${batchId}`, {
headers: { Authorization: `Bearer ${API_KEY}` },
});
const batch = await res.json();
if (batch.status === 'completed') {
return batch;
}
if (['failed', 'expired', 'cancelled'].includes(batch.status)) {
throw new Error(batch.errors?.message ?? batch.status);
}
await sleep(5000); // Fixed delay: too aggressive?
}
}
Problems can pile up quickly:
- Fixed delay: 5 seconds might hammer the API. You want different intervals for "normal" polling vs. error retries.
- No retry strategy: A transient 5xx or network blip kills the loop. You need exponential backoff and a retry limit.
- No pause: You can't throttle or suspend polling. With many batches or concurrent pollers you burn request quota and risk 429s.
- Hard to cancel: Composing with other streams is awkward.
Introducing rxjs-poll
To fix those issues without hand-rolling loops and timers, use a single operator: rxjs-poll. It turns any completing observable into a polling stream and gives you:
- Separate control over normal poll delays and error retry timing
- Built-in strategies: constant, linear, exponential, random, or dynamic (custom logic)
-
Pause controls: an external notifier (
Observable<boolean>) and/or automatic pause when the tab is hidden - Composability: it's just an operator - drop it into any pipe
Install it:
npm install rxjs-poll --save
A minimal example:
import { poll } from 'rxjs-poll';
import { takeWhile } from 'rxjs';
request$
.pipe(
poll(),
takeWhile(({ status }) => status !== 'done', true)
)
.subscribe({ next: console.log });
The source observable should emit once and complete (as HTTP requests typically do); poll then re-subscribes after a delay. You stop with takeWhile, takeUntil, or similar.
How to Wire It Up
You need an RxJS-capable HTTP client and rxjs-poll. In the following example, createBatch$(inputFileId) and getBatchStatus$(id) are single-shot HTTP observables that emit once and complete; we use them to create a batch, poll its status with a dynamic delay driven by the last response, retry with backoff on errors, and support pausing via a notifier.
const MAX_REQUESTS = 50_000;
const TERMINAL = ['completed', 'failed', 'expired', 'cancelled'] as const;
const pause$ = new Subject<boolean>();
const calculateDelay = (n: number, min: number, max: number): number => {
const normalized = Math.max(1, Math.min(MAX_REQUESTS, n));
const logRatio = Math.log(normalized) / Math.log(MAX_REQUESTS);
return Math.round(min + logRatio * (max - min));
};
createBatch$(inputFileId)
.pipe(
switchMap((batch) =>
getBatchStatus$(batch.id).pipe(
poll({
type: 'repeat',
delay: {
strategy: 'dynamic',
time: ({ value }) => {
const status = value.status;
const { total } = value.request_counts;
// Validation/preparation phases (quick checks)
if (status === 'validating' || status === 'finalizing') {
return calculateDelay(total, 3_000, 30_000);
}
// Cancelling (may take up to 10 minutes)
if (status === 'cancelling') {
return calculateDelay(total, 10_000, 120_000);
}
// Active processing and fallback
return calculateDelay(total, 15_000, 300_000);
},
},
retry: {
strategy: 'dynamic',
// Exponential backoff + jitter
time: ({ consecutiveRetryCount }) => {
const delay = 15_000;
const offset = 1_000;
const exponential =
Math.pow(2, consecutiveRetryCount - 1) * delay;
return [exponential - offset, exponential + offset];
},
limit: 5,
consecutiveOnly: true,
},
// Pause on demand
pause: { notifier: pause$ },
})
)
),
takeWhile((batch) => !TERMINAL.includes(batch.status), true)
)
.subscribe({
next: (batch) => console.log(batch.status, batch.request_counts),
});
Above, getBatchStatus$(id) calls GET /v1/batches/:id and returns the batch object. These polling requests count against your normal request and token rate limits (RPM/TPM), while batch execution uses separate limits (Batch API, Rate limits). The configuration works as follows:
-
Delay:
value.statusandvalue.request_counts.total(batch size) determine how long to wait, with delays scaled smoothly over 1-50,000 requests so small batches are checked more often than large ones. -
Retry: errors use exponential backoff with jitter and a limit of 5 consecutive attempts (
consecutiveOnly: true). -
Pause:
pause: { notifier: pause$ }accepts anObservable<boolean>(e.g. aSubject); when it emitstrue, polling pauses; when it emitsfalse, it resumes.
DISCLAIMER: Treat the delay, retry, and pause settings here as a template, not prescriptions: adjust the numbers for your traffic patterns and tier, and use rate-limit headers (for example, x-ratelimit-remaining-requests and x-ratelimit-reset-requests) to slow down before you hit hard limits.
Coordinate pauses across multiple batches
Multiple pause streams let you stop specific groups of batches, or everything at once, without changing any polling code.
const globalPause$ = new BehaviorSubject<boolean>(false);
const rateLimitPause$ = new BehaviorSubject<boolean>(false);
const maintenancePause$ = new BehaviorSubject<boolean>(false);
type BatchGroup = 'rate-sensitive' | 'maintenance' | 'normal';
type TrackedBatch = {
id: string; // real OpenAI batch.id
group: BatchGroup; // your app's classification
};
function getPausers(group: BatchGroup): BehaviorSubject<boolean>[] {
const base = [globalPause$];
if (group === 'rate-sensitive') {
base.push(rateLimitPause$);
}
if (group === 'maintenance') {
base.push(maintenancePause$);
}
return base;
}
function pollBatch$(batch: TrackedBatch): Observable<Batch> {
const pausers = getPausers(batch.group);
return getBatchStatus$(batch.id).pipe(
poll({
// Reuse delay/retry from the main example
pause: {
notifier: combineLatest(pausers).pipe(
map((values) => values.some(Boolean))
),
},
}),
takeWhile((batch) => !TERMINAL.includes(batch.status), true)
);
}
const batches: TrackedBatch[] = [
{ id: 'batch_abc123', group: 'rate-sensitive' },
{ id: 'batch_def456', group: 'maintenance' },
{ id: 'batch_ghi789', group: 'normal' },
];
const streams = batches.map(pollBatch$);
merge(...streams).subscribe((batch) => {
console.log(batch.id, batch.status);
});
rateLimitPause$.next(true); // pause only rate-sensitive
maintenancePause$.next(true); // pause only maintenance
globalPause$.next(true); // pause all
Conclusion
Adaptive delays and exponential backoff keep polling within rate limits and reduce the risk of 429s crowding out interactive traffic. For small or short-lived batches a simple fixed delay may be enough; dynamic timing and backoff shine when you have many concurrent batches, large request counts, or strict shared quotas.
The same pattern applies to batch embeddings, moderations, and other long-running ML pipelines, where rxjs-poll keeps polling predictable and batch usage under control.
If you found this post helpful, feel free to share it with others who might benefit.
Thanks for reading!
Top comments (0)