Anthropic and OpenAI count what every customer uses down to the token, broken out by model and token type, and turn those counts into daily, weekly, and monthly caps without adding inference latency. If you're building on top of their APIs, you end up solving the same problem on your side. If you're building something like them, you solve it for everyone at once. Here's one way to do it in Python, using Unimeter (an open-source metering engine) and a working example.
Every snippet below comes from runnable code in unimeter/llm-token-metering; the file paths in the snippets match the paths in the repo.
The data model
The data is small. Each call to Anthropic produces a single usage report, and there are five things you need from it:
| Field | Example | Why |
|---|---|---|
| account | customer_42 | Who to charge |
| model | claude-opus-4 | Different rates |
| token_type | input / output / cached | Different rates again |
| count | 1247 | How many tokens |
| timestamp | 2026-05-12T14:30:00Z | For period boundaries and caps |
Pricing, plan tiers, invoice line items, dunning: none of that belongs in the metering layer, which only counts. Turning those counts into invoices is downstream work, in your pricing config and in Stripe Billing.
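To make the five fields concrete, here is a minimal sketch of one usage record as a plain dataclass. `UsageRecord` is a hypothetical name for illustration only; it is not a type from the Unimeter SDK, which uses its own `Event` type, shown later in this article.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

VALID_TOKEN_TYPES = {"input", "output", "cached"}


# Hypothetical record type for illustration; not part of the Unimeter SDK.
@dataclass(frozen=True)
class UsageRecord:
    account: str         # who to charge
    model: str           # different rates per model
    token_type: str      # "input", "output", or "cached"
    count: int           # how many tokens
    timestamp: datetime  # timezone-aware, used for period boundaries

    def __post_init__(self) -> None:
        if self.token_type not in VALID_TOKEN_TYPES:
            raise ValueError(f"unknown token_type: {self.token_type!r}")
        if self.count < 0:
            raise ValueError("count must be non-negative")
        if self.timestamp.tzinfo is None:
            raise ValueError("timestamp must be timezone-aware")


record = UsageRecord(
    account="customer_42",
    model="claude-opus-4",
    token_type="input",
    count=1247,
    timestamp=datetime(2026, 5, 12, 14, 30, tzinfo=timezone.utc),
)
```

Rejecting naive timestamps up front keeps the day, week, and month boundaries unambiguous later on.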
Setup
One container, one Python package:
docker run -d \
  --security-opt seccomp=unconfined \
  -p 7001:7001 -p 9090:9090 \
  -e MY_ADDR=localhost:7001 \
  -v unimeter-data:/data \
  ghcr.io/unimeter/unimeter:latest \
  billing --data-dir=/data
pip install unimeter-python
Port 7001 carries the binary protocol the SDK uses; 9090 exposes Prometheus metrics. MY_ADDR is how the node advertises itself in the partition map the SDK reads on connect. The seccomp flag is required because Unimeter uses io_uring for I/O, which Docker's default profile blocks. The unimeter-data named volume keeps WAL and aggregate state across container restarts.
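If a script starts the container and connects right away, the port may not be accepting connections yet. A small readiness check can help. The sketch below uses only the standard library and tests TCP reachability, nothing Unimeter-specific; `wait_for_port` is a hypothetical helper name.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect only proves the port is open,
            # not that the node has finished starting up.
            with socket.create_connection((host, port), timeout=0.5):
                return True
        except OSError:
            time.sleep(0.1)  # not listening yet; retry shortly
    return False


# Usage, assuming the port mapping from the docker run command above:
#   if not wait_for_port("localhost", 7001):
#       raise RuntimeError("Unimeter is not reachable on port 7001")
```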
Defining the metric
Declare one metric with the dimensions you'll group by and the thresholds you want the server to watch. Daily buckets, plus two thresholds that surface as bits in the response when usage crosses them:
from unimeter import (
    AggType, AlertThreshold, DimensionFilter,
    MetricSchema, PeriodType,
)
METRIC_CODE = "llm_tokens"
DEFAULT_MODELS = [
    "claude-opus-4", "claude-sonnet-4", "claude-haiku-4",
]
TOKEN_TYPES = ["input", "output", "cached"]
def build_schema() -> MetricSchema:
    return MetricSchema(
        code=METRIC_CODE,
        agg_type=AggType.SUM,
        field_name="tokens",
        period_type=PeriodType.DAY,
        filters=[
            DimensionFilter(key="model", values=DEFAULT_MODELS),
            DimensionFilter(key="token_type", values=TOKEN_TYPES),
        ],
        thresholds=[
            AlertThreshold(code="daily_soft", value=1_000_000),  # warn
            AlertThreshold(code="daily_hard", value=5_000_000),  # block
        ],
    )
period_type=PeriodType.DAY drives the per-day buckets and threshold resets; weekly and monthly checks reuse the same data with wider queries. filters declares the dimension values that become independently queryable — 3 models × 3 token types gives 9 sub-totals per customer per day. Thresholds keep the order you wrote them in, and you reference them by bit position:
ALERT_BIT_DAILY_SOFT = 1 << 0 # 0b00000001
ALERT_BIT_DAILY_HARD = 1 << 1 # 0b00000010
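Hand-written bit constants can drift from the schema if someone reorders the thresholds. One option is to derive the masks from the declaration order. This sketch assumes, as described above, that the i-th declared threshold maps to bit i; `bit_for` and `tripped` are hypothetical helpers, not SDK functions.

```python
# Threshold codes in the same order as the thresholds=[...] list in build_schema().
THRESHOLD_CODES = ["daily_soft", "daily_hard"]


def bit_for(code: str, codes: list[str] = THRESHOLD_CODES) -> int:
    """Bit mask for a threshold, derived from its position in the declaration."""
    return 1 << codes.index(code)


def tripped(alert_flags: int, codes: list[str] = THRESHOLD_CODES) -> list[str]:
    """Names of every threshold whose bit is set in alert_flags."""
    return [code for i, code in enumerate(codes) if alert_flags & (1 << i)]
```

With this, `tripped(rt.alert_flags)` gives readable names for logs, while the hot path can keep using the plain bit test.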
Recording usage
Wrap your Anthropic call so every response feeds the meter. The function below emits two or three events per response — input and output always, cached when non-zero — and ingest() batches them onto the wire:
# Assumed to be exported from the top-level package, like the schema types above.
from unimeter import AsyncClient, Event, IngestResult
class TokenMeter:
    def __init__(self, client: AsyncClient):
        self._client = client
    @staticmethod
    def events_for(account_id: int, response) -> list[Event]:
        # Input and output tokens are reported on every response.
        events = [
            Event(account_id=account_id, metric_code=METRIC_CODE,
                  value=response.usage.input_tokens,
                  properties={"model": response.model, "token_type": "input"}),
            Event(account_id=account_id, metric_code=METRIC_CODE,
                  value=response.usage.output_tokens,
                  properties={"model": response.model, "token_type": "output"}),
        ]
        # cache_read_input_tokens can be None when no cache was read, so default to 0.
        cached = response.usage.cache_read_input_tokens or 0
        if cached > 0:
            events.append(Event(
                account_id=account_id, metric_code=METRIC_CODE,
                value=cached,
                properties={"model": response.model, "token_type": "cached"},
            ))
        return events
    async def meter(self, account_id: int, response) -> IngestResult:
        return await self._client.ingest(self.events_for(account_id, response))
In the handler:
resp = await anthropic_client.messages.create(...)
await meter.meter(account_id=42, response=resp)
return resp
ingest returns as soon as Unimeter has accepted the events; the server flushes them to disk in batches a few hundred microseconds later. For events you can't afford to lose on power loss, pass delivery=DeliveryMode.SYNC and the call blocks until the data is durable.
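The event-splitting logic is easy to unit test without a server or an API key. The sketch below mirrors the "two or three events" rule using stand-in objects. `split_usage` is a hypothetical helper that returns plain tuples instead of SDK `Event` objects, and the `SimpleNamespace` values imitate the shape of Anthropic's `usage` field, where the cache counter may be `None`.

```python
from types import SimpleNamespace


def split_usage(usage) -> list[tuple[str, int]]:
    """Return (token_type, count) pairs for one response's usage object."""
    pairs = [("input", usage.input_tokens), ("output", usage.output_tokens)]
    # Treat a missing or None cache counter as zero cached tokens.
    cached = getattr(usage, "cache_read_input_tokens", None) or 0
    if cached > 0:
        pairs.append(("cached", cached))
    return pairs


# Stand-ins for response.usage; the real objects come from the Anthropic SDK.
plain = SimpleNamespace(input_tokens=1247, output_tokens=310,
                        cache_read_input_tokens=None)
with_cache = SimpleNamespace(input_tokens=200, output_tokens=310,
                             cache_read_input_tokens=1047)
```

Keeping this part pure means the only thing left to integration-test is the `ingest` call itself.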
Enforcing caps
Four tiers, top to bottom:
- Soft daily — warn the user, keep serving
- Hard daily — block until midnight UTC
- Weekly quota — block until next Monday; matches Anthropic's Pro/Max weekly window
- Monthly cap — block until the next billing cycle
The two daily tiers cost almost nothing to check, because the server has already done the work. Every AlertThreshold on the metric occupies one bit in alert_flags, and the server flips that bit the moment today's bucket crosses the threshold value. Reading those bits is one realtime query, answered from in-memory state:
rt = await client.query_realtime(account_id, METRIC_CODE)
if rt.alert_flags & ALERT_BIT_DAILY_HARD:
    return blocked("daily_block")
# Wider windows aren't a single bucket, so we sum day buckets over
# the window we care about. Two queries and a `>=` comparison each:
week = await client.query(account_id, METRIC_CODE, this_week_utc())
if week.value.sum >= weekly_cap:
    return blocked("weekly_cap")
month = await client.query(account_id, METRIC_CODE, current_month())
if month.value.sum >= monthly_cap:
    return blocked("monthly_cap")
if rt.alert_flags & ALERT_BIT_DAILY_SOFT:
    return warn("daily_warn")
Server-side bit checks handle any cap whose period matches the metric. Anything wider becomes a client-side window sum. The full implementation in src/caps.py wraps this in a CapChecker class, runs the weekly and monthly queries concurrently via asyncio.gather so wall-clock time doesn't double when both are active, and orders the results so the shorter cycle wins: a customer who tripped both the daily and weekly limits is told about the daily block first. The weekly cap still applies once the daily bucket resets at midnight.
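The ordering rule can be isolated as a pure function that takes the already-fetched numbers, which makes it testable without a running node. This is a sketch of the same first-match ordering as the snippet above, not the repo's CapChecker. `Decision` and `decide` are hypothetical names, with `allowed` and `reason` chosen to match how the handler uses the result.

```python
from dataclasses import dataclass
from typing import Optional

ALERT_BIT_DAILY_SOFT = 1 << 0
ALERT_BIT_DAILY_HARD = 1 << 1


@dataclass(frozen=True)
class Decision:  # hypothetical shape; the repo's CapChecker may differ
    allowed: bool
    reason: Optional[str] = None


def decide(alert_flags: int, week_sum: int, month_sum: int,
           weekly_cap: int, monthly_cap: int) -> Decision:
    """First match wins, so the shortest blocking cycle is the reported reason."""
    if alert_flags & ALERT_BIT_DAILY_HARD:
        return Decision(False, "daily_block")
    if week_sum >= weekly_cap:
        return Decision(False, "weekly_cap")
    if month_sum >= monthly_cap:
        return Decision(False, "monthly_cap")
    if alert_flags & ALERT_BIT_DAILY_SOFT:
        return Decision(True, "daily_warn")  # warn, but keep serving
    return Decision(True)
```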
Wiring it into the handler:
from src.caps import CapChecker, Caps
caps = Caps(weekly_cap=25_000_000, monthly_cap=100_000_000)
checker = CapChecker(client, caps)
decision = await checker.check(account_id)
if not decision.allowed:
    raise HTTPException(429, f"limit reached: {decision.reason}")
if decision.reason == "daily_warn":
    notify_async(account_id, "You're past your soft daily limit")
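The weekly and monthly checks depend on getting the window boundaries right, so it may help to see the date arithmetic spelled out. The sketch below computes half-open UTC windows with the standard library. `week_window_utc` and `month_window_utc` are hypothetical helpers returning plain `(start, end)` tuples. The article does not show what the SDK's own period helpers return, so treat this as an illustration of the boundaries rather than a drop-in replacement.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def week_window_utc(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
    """[Monday 00:00 UTC, next Monday 00:00 UTC) containing `now`."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start = midnight - timedelta(days=midnight.weekday())  # Monday is weekday 0
    return start, start + timedelta(days=7)


def month_window_utc(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
    """[first of month 00:00 UTC, first of next month 00:00 UTC) containing `now`."""
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if start.month == 12:  # roll over into January of the next year
        end = start.replace(year=start.year + 1, month=1)
    else:
        end = start.replace(month=start.month + 1)
    return start, end
```

Pass a timezone-aware `now`; a naive datetime would be interpreted as local time by `astimezone`.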
Querying for a dashboard
"How many tokens did this customer spend on Opus vs Sonnet this month?" — one call:
from unimeter import current_month
cells = await client.query_breakdown(
    account_id, METRIC_CODE, current_month(),
    group_by=["model", "token_type"],
)
# cells: dict[frozenset[(key, value), ...]] = AggValue
# {(model=claude-opus-4, token_type=input)} → 2_400_000
# {(model=claude-opus-4, token_type=output)} → 800_000
# {(model=claude-sonnet-4, token_type=input)} → 12_000_000
# ...
query_breakdown is one round trip. The server walks every combination of declared dimension values for this customer and returns only the ones with non-zero usage. The example wraps this in a dataclass-friendly helper at src/dashboard.py and rolls it up by model and by token type for charts.
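The roll-up for charts can be sketched in a few lines. This is not the helper from src/dashboard.py. It assumes the cell shape described in the comment above (a frozenset of `(key, value)` pairs per cell) and uses plain integers in place of the SDK's aggregate values, so with real results you would pass `agg.sum` instead.

```python
from collections import defaultdict


def roll_up(cells: dict[frozenset, int], key: str) -> dict[str, int]:
    """Sum cell totals by a single dimension, e.g. "model" or "token_type"."""
    totals: dict[str, int] = defaultdict(int)
    for dims, total in cells.items():
        totals[dict(dims)[key]] += total
    return dict(totals)


# Sample cells using the numbers from the breakdown comment above.
cells = {
    frozenset({("model", "claude-opus-4"), ("token_type", "input")}): 2_400_000,
    frozenset({("model", "claude-opus-4"), ("token_type", "output")}): 800_000,
    frozenset({("model", "claude-sonnet-4"), ("token_type", "input")}): 12_000_000,
}

by_model = roll_up(cells, "model")
by_type = roll_up(cells, "token_type")
```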
Pushing to Stripe
At the close of each billing period, query each account and create Stripe invoice line items. The example prints them instead of calling Stripe so it runs without credentials:
RATES = {  # USD per million tokens, as of 2026-05
    ("claude-opus-4", "input"): 15.00,
    ("claude-opus-4", "output"): 75.00,
    ("claude-opus-4", "cached"): 1.50,
    # ... 6 more rows (3 models × 3 token types = 9 in total)
}
async def push_to_stripe(client, account_id, period, customer_id):
    cells = await client.query_breakdown(
        account_id, METRIC_CODE, period,
        group_by=["model", "token_type"],
    )
    for dims, agg in cells.items():
        d = dict(dims)  # frozenset of (key, value) pairs -> {"model": ..., "token_type": ...}
        rate = RATES.get((d["model"], d["token_type"]), 0.0)
        cost = (agg.sum / 1_000_000) * rate
        # stripe.InvoiceItem.create(customer=customer_id, ...)
        print(f"{d['model']} / {d['token_type']}: {agg.sum:,} × ${rate}/M = ${cost:.4f}")
Pricing stays in your code. When Anthropic changes prices, you update one Python dict; no historical usage gets rewritten and no aggregates get recomputed.
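Printing with floats is fine for a demo, but Stripe amounts are integers in the smallest currency unit, so real invoice lines need an explicit rounding step. A minimal sketch using `Decimal` follows. `cost_in_cents` is a hypothetical helper, the rates are the three Opus rows shown above, and rounding half-up per line is an assumption you would replace with your own billing policy.

```python
from decimal import Decimal, ROUND_HALF_UP

# USD per million tokens, copied from the RATES rows shown above.
RATES_PER_MILLION = {
    ("claude-opus-4", "input"): Decimal("15.00"),
    ("claude-opus-4", "output"): Decimal("75.00"),
    ("claude-opus-4", "cached"): Decimal("1.50"),
}


def cost_in_cents(model: str, token_type: str, tokens: int) -> int:
    """Whole cents for one (model, token_type) cell, rounded half-up."""
    # A KeyError here is deliberate: an unknown pair should fail loudly
    # rather than be billed at a rate of zero.
    rate = RATES_PER_MILLION[(model, token_type)]
    dollars = rate * tokens / Decimal(1_000_000)
    return int((dollars * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))
```

For example, 2,400,000 Opus input tokens at $15 per million come to 3,600 cents.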
Performance
Measured on a bare-metal AMD Ryzen 7 7700X (8c/16t) with NVMe storage. Single-node Unimeter, no replicas. All numbers come from a Python client using the SDK shown in this article, with default settings — the same code path a real backend runs.
Async ingest (delivery=DeliveryMode.ASYNC, the default), 10M events across 100K accounts:
| Python ingesters | Throughput |
|---|---|
| 1 | 412K events/sec |
| 2 | 720K events/sec |
| 4 | 810K events/sec |
| 6 | 835K events/sec |
| 8 | 757K events/sec (server saturates) |
One Python process meters around 400K LLM events/sec with default settings. Six parallel ingesters roughly double that to 835K; at eight, the server saturates and throughput dips to 757K.
Query latency doesn't move under ingest load. Across every row above — including the 835K events/sec peak — client.query_realtime() (what powers CapChecker.check()) returns in:
| | p50 | p99 |
|---|---|---|
| Idle node | 58 µs | 68 µs |
| 835K events/sec inbound | 56 µs | 96 µs |
The preflight cap check stays under 100 µs at p99 even at the 835K events/sec peak, when roughly one event is ingested per microsecond. Against Anthropic's typical 500 ms to 5 s response time, calling it inline before every LLM request is invisible: a 96 µs check is about 0.02% of a 500 ms response. The dashboard breakdown across all nine (model, token_type) cells comes back in one round trip at p50 ~90 µs / p99 ~150 µs.
Sync ingest (delivery=DeliveryMode.SYNC, for events that must survive a crash — hard caps, payment events, audit records):
- p50 115 µs, p99 232 µs, max 381 µs over 5,000 samples
- About 20 µs above a single dd ... oflag=sync on the same NVMe; most of the budget is the disk, not Unimeter
Sync isn't the default; token counting doesn't need it. Reserve it for the small set of events where losing one is worse than adding 115 µs to the request path.
Memory is predictable. Each aggregate row in the memtable is exactly 96 bytes regardless of how many events fed into it; 1.35M rows = ~130 MB of pure aggregate state. From your customer count × dimension combinations you can do the math up front and decide whether one node fits or you need to partition.
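The sizing math is simple enough to keep in a script. The sketch below uses the 96-byte row size quoted above. How rows map to accounts is an assumption here (one row per account per declared dimension combination for the open bucket), so check it against the Unimeter docs before relying on it for capacity planning.

```python
BYTES_PER_ROW = 96  # per aggregate row, as stated above


def memtable_bytes(rows: int) -> int:
    """Aggregate state size for a given number of memtable rows."""
    return rows * BYTES_PER_ROW


def rows_for(accounts: int, models: int, token_types: int) -> int:
    # Assumption: one row per (account, model, token_type) combination.
    return accounts * models * token_types


# The figure from the text: 1.35M rows.
quoted = memtable_bytes(1_350_000)

# A hypothetical deployment: 100K accounts, 3 models, 3 token types.
example = memtable_bytes(rows_for(100_000, 3, 3))
```

The quoted case works out to 129.6 million bytes, which matches the ~130 MB figure. The hypothetical 100K-account deployment comes to 86.4 million bytes.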
Full bench setup and raw numbers for this article are at bench/results.md. The Unimeter docs also publish a broader benchmarks page covering additional workloads and hardware.
Stability
Distributed-systems bugs are nasty by nature: a leader fails mid-write, a message gets reordered, a disk reports success on an fsync that didn't actually flush. Catching these by running real clusters depends on luck — you have to hit the race in test, not in production.
The engine ships with a deterministic test harness called VOPR. At test time it spins up a virtual cluster, injects faults at every step — leader failures, network partitions, message reorderings, disk errors — and after each step checks invariants: VSR safety (no committed write lost across leader changes), aggregate correctness (agg.sum matches the source events), per-dimension filter consistency, WAL durability. Each scenario runs from a numeric seed, so any invariant violation is a reproducible failure tied to that seed.
Six seeds × 10K iterations currently pass clean. This is a build-time check, not anything running in production — but it's what lets the storage code ship knowing the hard cases have been exercised before users see them.
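To show why seeding matters, here is a toy illustration of the idea in Python. It is not VOPR and does not model VSR or a WAL. It only demonstrates the pattern: every fault decision comes from one seeded random generator, an invariant is checked after each step, and any failure can be replayed from its seed. All names are hypothetical.

```python
import random


def run_scenario(seed: int, steps: int = 1_000) -> dict:
    """Toy deterministic simulation: all randomness flows from one seeded RNG."""
    rng = random.Random(seed)
    acknowledged: dict[int, int] = {}  # event id -> value the client got an ack for
    applied: set[int] = set()          # ids the toy server folded into the aggregate
    aggregate = 0
    for event_id in range(steps):
        value = rng.randint(1, 5_000)
        if rng.random() < 0.10:
            continue  # injected fault: message lost before reaching the server
        # Injected fault: a retry delivers the same event twice.
        deliveries = 2 if rng.random() < 0.10 else 1
        for _ in range(deliveries):
            if event_id not in applied:  # idempotent apply prevents double counting
                applied.add(event_id)
                aggregate += value
        acknowledged[event_id] = value
        # Invariant: the aggregate equals the sum of acknowledged events.
        if aggregate != sum(acknowledged.values()):
            raise AssertionError(f"seed={seed} step={event_id}: aggregate drifted")
    return {"seed": seed, "events": len(acknowledged), "sum": aggregate}
```

Running the same seed twice produces the same result, which is the property that turns a rare race into a repeatable test case.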
Try it
git clone https://github.com/unimeter/llm-token-metering
cd llm-token-metering
pdm install -G dev
docker compose up -d
pdm run python smoke.py
That clones the example repo, brings up Unimeter in a fresh container, and runs the whole flow against it: creates the metric, meters mock Anthropic responses, runs the cap check, prints the breakdown, prints a mock Stripe invoice.
Code in this post is Python. If you'd rather use Go, the same protocol and patterns are in unimeter/go-unimeter.
The Python SDK is early-stage and feedback is welcome — especially on the SDK shape and the docs. What would you want from a metering layer for your AI product?
If you're at an LLM provider and the infrastructure side of usage-based billing is something you're actively building or using, drop me a line — 0x180db@gmail.com.