0x180db

Meter LLM usage like Anthropic — tokens, models, weekly and monthly caps

Anthropic and OpenAI count what every customer uses down to the token, broken out by model and token type, and turn those counts into daily, weekly, and monthly caps without adding inference latency. If you're building on top of their APIs, you end up solving the same problem on your side. If you're building something like them, you solve it for everyone at once. Here's one way to do it in Python, using Unimeter (an open-source metering engine) and a working example.

Every snippet below comes from runnable code in unimeter/llm-token-metering; the file paths in the snippets match the paths in the repo.

The data model

The data is small. Each call to Anthropic produces a single usage report, and there are five things you need from it:

| Field | Example | Why |
| --- | --- | --- |
| account | customer_42 | Who to charge |
| model | claude-opus-4 | Different rates |
| token_type | input / output / cached | Different rates again |
| count | 1247 | How many tokens |
| timestamp | 2026-05-12T14:30:00Z | For period boundaries and caps |

Pricing, plan tiers, invoice line items, dunning: none of that belongs in the metering layer. The metering layer counts. Turning those counts into invoices is downstream work, done in your pricing config and in Stripe Billing.
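As a minimal sketch, the five fields can be written down as a plain dataclass. The `UsageRecord` name and its validation rules are illustrative assumptions, not part of the Unimeter SDK or the example repo:

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class UsageRecord:
    """One metered row (hypothetical type); fields mirror the data model above."""
    account: str         # who to charge, e.g. "customer_42"
    model: str           # e.g. "claude-opus-4"
    token_type: str      # "input", "output", or "cached"
    count: int           # number of tokens
    timestamp: datetime  # timezone-aware instant, used for period boundaries

    def __post_init__(self) -> None:
        if self.count < 0:
            raise ValueError("count must be non-negative")
        if self.timestamp.tzinfo is None:
            raise ValueError("timestamp must be timezone-aware")


record = UsageRecord(
    account="customer_42",
    model="claude-opus-4",
    token_type="input",
    count=1247,
    timestamp=datetime(2026, 5, 12, 14, 30, tzinfo=timezone.utc),
)
```

Nothing about price appears in the record, which is the point: rates can change without touching stored usage.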

Setup

One container, one Python package:

docker run -d \
  --security-opt seccomp:unconfined \
  -p 7001:7001 -p 9090:9090 \
  -e MY_ADDR=localhost:7001 \
  -v unimeter-data:/data \
  ghcr.io/unimeter/unimeter:latest \
  billing --data-dir=/data

pip install unimeter-python

Port 7001 carries the binary protocol the SDK uses; 9090 exposes Prometheus metrics. MY_ADDR is how the node advertises itself in the partition map the SDK reads on connect. The seccomp flag is required because Unimeter uses io_uring for I/O, which Docker's default profile blocks. The unimeter-data named volume keeps WAL and aggregate state across container restarts.

Defining the metric

Declare one metric with the dimensions you'll group by and the thresholds you want the server to watch. Daily buckets, plus two thresholds that surface as bits in the response when usage crosses them:

from unimeter import (
    AggType, AlertThreshold, DimensionFilter,
    MetricSchema, PeriodType,
)

METRIC_CODE = "llm_tokens"

DEFAULT_MODELS = [
    "claude-opus-4", "claude-sonnet-4", "claude-haiku-4",
]
TOKEN_TYPES = ["input", "output", "cached"]


def build_schema() -> MetricSchema:
    return MetricSchema(
        code=METRIC_CODE,
        agg_type=AggType.SUM,
        field_name="tokens",
        period_type=PeriodType.DAY,
        filters=[
            DimensionFilter(key="model",      values=DEFAULT_MODELS),
            DimensionFilter(key="token_type", values=TOKEN_TYPES),
        ],
        thresholds=[
            AlertThreshold(code="daily_soft", value=1_000_000),   # warn
            AlertThreshold(code="daily_hard", value=5_000_000),   # block
        ],
    )

period_type=PeriodType.DAY drives the per-day buckets and threshold resets; weekly and monthly checks reuse the same data with wider queries. filters declares the dimension values that become independently queryable — 3 models × 3 token types gives 9 sub-totals per customer per day. Thresholds keep the order you wrote them in, and you reference them by bit position:

ALERT_BIT_DAILY_SOFT = 1 << 0  # 0b00000001
ALERT_BIT_DAILY_HARD = 1 << 1  # 0b00000010
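For logging or debugging, it can help to turn the bitmask back into threshold codes. The helper below is a sketch that assumes only what is stated above, namely that bit i corresponds to the i-th declared threshold. `tripped_thresholds` is a hypothetical name, not an SDK function:

```python
# Threshold codes in declaration order; index i maps to bit (1 << i).
THRESHOLD_CODES = ("daily_soft", "daily_hard")


def tripped_thresholds(alert_flags: int, codes=THRESHOLD_CODES) -> list:
    """Return the codes whose bit is set in alert_flags, in declaration order."""
    return [code for bit, code in enumerate(codes) if alert_flags & (1 << bit)]
```

If thresholds are reordered in the schema, the tuple has to be reordered with them, which is one reason to keep both in the same module.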

Recording usage

Wrap your Anthropic call so every response feeds the meter. The function below emits two or three events per response — input and output always, cached when non-zero — and ingest() batches them onto the wire:

class TokenMeter:
    def __init__(self, client: AsyncClient):
        self._client = client

    @staticmethod
    def events_for(account_id: int, response) -> list[Event]:
        events = [
            Event(account_id=account_id, metric_code=METRIC_CODE,
                  value=response.usage.input_tokens,
                  properties={"model": response.model, "token_type": "input"}),
            Event(account_id=account_id, metric_code=METRIC_CODE,
                  value=response.usage.output_tokens,
                  properties={"model": response.model, "token_type": "output"}),
        ]
        # The Anthropic SDK can report cache_read_input_tokens as None; treat that as 0.
        cached = response.usage.cache_read_input_tokens or 0
        if cached > 0:
            events.append(Event(
                account_id=account_id, metric_code=METRIC_CODE,
                value=cached,
                properties={"model": response.model, "token_type": "cached"},
            ))
        return events

    async def meter(self, account_id: int, response) -> IngestResult:
        return await self._client.ingest(self.events_for(account_id, response))

In the handler:

resp = await anthropic_client.messages.create(...)
await meter.meter(account_id=42, response=resp)
return resp

ingest returns as soon as Unimeter has accepted the events; the server flushes them to disk in batches a few hundred microseconds later. For events you can't afford to lose on power loss, pass delivery=DeliveryMode.SYNC and the call blocks until the data is durable.

Enforcing caps

Four tiers, top to bottom:

  1. Soft daily — warn the user, keep serving
  2. Hard daily — block until midnight UTC
  3. Weekly quota — block until next Monday; matches Anthropic's Pro/Max weekly window
  4. Monthly cap — block until the next billing cycle
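The weekly and monthly tiers need window boundaries. Here is a sketch of how those could be computed in UTC, assuming weeks start on Monday 00:00 UTC as the list above implies. `week_bounds_utc` and `month_bounds_utc` are hypothetical helpers and are not claimed to match the SDK's own period helpers:

```python
from datetime import datetime, timedelta, timezone


def week_bounds_utc(now: datetime) -> tuple:
    """Half-open window [Monday 00:00 UTC, next Monday 00:00 UTC) containing now.

    `now` must be timezone-aware.
    """
    now = now.astimezone(timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start = midnight - timedelta(days=midnight.weekday())  # Monday is weekday 0
    return start, start + timedelta(days=7)


def month_bounds_utc(now: datetime) -> tuple:
    """Half-open window [first of month, first of next month) in UTC."""
    now = now.astimezone(timezone.utc)
    start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Jump past the end of the month, then snap back to day 1.
    end = (start + timedelta(days=32)).replace(day=1)
    return start, end
```

The end of each window is also the moment the corresponding block lifts, so the same values can feed a "resets at" field in a 429 response.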

The two daily tiers cost almost nothing to check, because the server has already done the work. Every AlertThreshold on the metric occupies one bit in alert_flags, and the server flips that bit the moment today's bucket crosses the threshold value. Reading those bits is one realtime query, answered from in-memory state:

rt = await client.query_realtime(account_id, METRIC_CODE)

if rt.alert_flags & ALERT_BIT_DAILY_HARD:
    return blocked("daily_block")

# Wider windows aren't a single bucket, so we sum day buckets over
# the window we care about. Two queries and a `>=` comparison each:
week = await client.query(account_id, METRIC_CODE, this_week_utc())
if week.value.sum >= weekly_cap:
    return blocked("weekly_cap")

month = await client.query(account_id, METRIC_CODE, current_month())
if month.value.sum >= monthly_cap:
    return blocked("monthly_cap")

if rt.alert_flags & ALERT_BIT_DAILY_SOFT:
    return warn("daily_warn")

Server-side bit checks handle any cap whose period matches the metric. Anything wider becomes a client-side window sum. The full implementation in src/caps.py wraps this in a CapChecker class, runs the weekly and monthly queries concurrently via asyncio.gather so wall-clock time doesn't double when both are active, and orders the results so the shorter cycle is reported first. A customer who tripped both the daily and the weekly limit sees the daily reason; once the daily bucket resets at midnight UTC, the weekly check is what keeps them blocked until the following Monday.
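The ordering logic can be separated from the I/O, which makes it easy to unit-test. The sketch below is not the repo's CapChecker: `Decision`, `decide`, and `check` are hypothetical names, and the two fetch callables stand in for the weekly and monthly window queries.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

ALERT_BIT_DAILY_SOFT = 1 << 0
ALERT_BIT_DAILY_HARD = 1 << 1


@dataclass(frozen=True)
class Decision:
    allowed: bool
    reason: Optional[str] = None


def decide(alert_flags: int, week_sum: int, month_sum: int,
           weekly_cap: int, monthly_cap: int) -> Decision:
    # Blocking tiers first, shortest cycle first; the soft warning comes last.
    if alert_flags & ALERT_BIT_DAILY_HARD:
        return Decision(False, "daily_block")
    if week_sum >= weekly_cap:
        return Decision(False, "weekly_cap")
    if month_sum >= monthly_cap:
        return Decision(False, "monthly_cap")
    if alert_flags & ALERT_BIT_DAILY_SOFT:
        return Decision(True, "daily_warn")
    return Decision(True)


async def check(alert_flags: int,
                fetch_week: Callable[[], Awaitable[int]],
                fetch_month: Callable[[], Awaitable[int]],
                weekly_cap: int, monthly_cap: int) -> Decision:
    # The two window sums are independent, so they can run concurrently.
    week_sum, month_sum = await asyncio.gather(fetch_week(), fetch_month())
    return decide(alert_flags, week_sum, month_sum, weekly_cap, monthly_cap)
```

Because `decide` is a pure function, every tier combination can be covered in tests without a running server.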

Wiring it into the handler:

from src.caps import CapChecker, Caps

caps = Caps(weekly_cap=25_000_000, monthly_cap=100_000_000)
checker = CapChecker(client, caps)

decision = await checker.check(account_id)
if not decision.allowed:
    raise HTTPException(429, f"limit reached: {decision.reason}")
if decision.reason == "daily_warn":
    notify_async(account_id, "You're past your soft daily limit")

Querying for a dashboard

"How many tokens did this customer spend on Opus vs Sonnet this month?" — one call:

from unimeter import current_month

cells = await client.query_breakdown(
    account_id, METRIC_CODE, current_month(),
    group_by=["model", "token_type"],
)
# cells: dict[frozenset[tuple[str, str]], AggValue], keyed by (dimension, value) pairs
# {(model=claude-opus-4, token_type=input)}    → 2_400_000
# {(model=claude-opus-4, token_type=output)}   → 800_000
# {(model=claude-sonnet-4, token_type=input)}  → 12_000_000
# ...

query_breakdown is one round trip. The server walks every combination of declared dimension values for this customer and returns only the ones with non-zero usage. The example wraps this in a dataclass-friendly helper at src/dashboard.py and rolls it up by model and by token type for charts.
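A roll-up over that result shape might look like the sketch below. It assumes the keys are frozensets of (dimension, value) pairs, as in the snippet above, and uses plain integer totals in place of the SDK's aggregate values. `roll_up` is a hypothetical helper, not the one in src/dashboard.py:

```python
from collections import defaultdict


def roll_up(cells: dict, dimension: str) -> dict:
    """Sum token totals across all cells that share a value for `dimension`."""
    totals = defaultdict(int)
    for dims, tokens in cells.items():
        totals[dict(dims)[dimension]] += tokens
    return dict(totals)


# Sample data shaped like the breakdown above, with the sums already extracted.
cells = {
    frozenset({("model", "claude-opus-4"), ("token_type", "input")}): 2_400_000,
    frozenset({("model", "claude-opus-4"), ("token_type", "output")}): 800_000,
    frozenset({("model", "claude-sonnet-4"), ("token_type", "input")}): 12_000_000,
}

by_model = roll_up(cells, "model")
by_token_type = roll_up(cells, "token_type")
```

With real query results, the values would be read from each aggregate's sum before being passed in.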

Pushing to Stripe

At the close of each billing period, query each account and create Stripe invoice line items. The example prints them instead of calling Stripe so it runs without credentials:

RATES = {  # USD per million tokens, as of 2026-05
    ("claude-opus-4",   "input"):  15.00,
    ("claude-opus-4",   "output"): 75.00,
    ("claude-opus-4",   "cached"):  1.50,
    # ... 6 more rows: the other two models × three token types
}

async def push_to_stripe(client, account_id, period, customer_id):
    cells = await client.query_breakdown(
        account_id, METRIC_CODE, period,
        group_by=["model", "token_type"],
    )
    for dims, agg in cells.items():
        d = dict(dims)
        rate = RATES.get((d["model"], d["token_type"]), 0.0)
        cost = (agg.sum / 1_000_000) * rate
        # stripe.InvoiceItem.create(customer=customer_id, ...)
        print(f"{d['model']} / {d['token_type']}: {agg.sum:,} × ${rate}/M = ${cost:.4f}")

Pricing stays in your code. When Anthropic changes prices, you update one Python dict; no historical usage gets rewritten and no aggregates get recomputed.
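When the printed line items become real invoice amounts, float arithmetic is worth replacing. Stripe expresses amounts as integers in the smallest currency unit, so one option is to compute whole cents with `decimal`. This is a sketch: `line_item_cents` is a hypothetical helper, and rounding half-up is an assumed policy, not something the example repo prescribes:

```python
from decimal import Decimal, ROUND_HALF_UP

MILLION = Decimal(1_000_000)


def line_item_cents(tokens: int, usd_per_million: str) -> int:
    """Cost of `tokens` at a per-million-token USD rate, rounded half-up to cents."""
    usd = Decimal(tokens) / MILLION * Decimal(usd_per_million)
    return int((usd * 100).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


opus_input_cents = line_item_cents(2_400_000, "15.00")
opus_output_cents = line_item_cents(800_000, "75.00")
```

Rates are passed as strings so that values like 1.50 are represented exactly rather than inheriting binary float error.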

Performance

Measured on a bare-metal AMD Ryzen 7 7700X (8c/16t) with NVMe storage. Single-node Unimeter, no replicas. All numbers come from a Python client using the SDK shown in this article, with default settings — the same code path a real backend runs.

Async ingest (delivery=DeliveryMode.ASYNC, the default), 10M events across 100K accounts:

| Python ingesters | Throughput |
| --- | --- |
| 1 | 412K events/sec |
| 2 | 720K events/sec |
| 4 | 810K events/sec |
| 6 | 835K events/sec |
| 8 | 757K events/sec (server saturates) |

One Python process meters around 400K LLM events/sec with default settings. Six parallel ingesters roughly double that to 835K events/sec; at eight, the server saturates and throughput falls back to 757K.

Query latency doesn't move under ingest load. Across every row above — including the 835K events/sec peak — client.query_realtime() (what powers CapChecker.check()) returns in:

| | p50 | p99 |
| --- | --- | --- |
| Idle node | 58 µs | 68 µs |
| 835K events/sec inbound | 56 µs | 96 µs |

The preflight cap check stays under 100 µs even at the 835K events/sec peak — roughly one event ingested per microsecond. Against Anthropic's typical 500ms–5s response time, calling it inline before every LLM request is invisible. The dashboard breakdown across all nine (model, token_type) cells comes back in one round-trip at p50 ~90 µs / p99 ~150 µs.

Sync ingest (delivery=DeliveryMode.SYNC, for events that must survive a crash — hard caps, payment events, audit records):

  • p50 115 µs, p99 232 µs, max 381 µs over 5,000 samples
  • About 20 µs above a single dd ... oflag=sync on the same NVMe — most of the budget is the disk, not Unimeter

Sync isn't the default; token counting doesn't need it. Reserve it for the small set of events where losing one is worse than adding 115 µs to the request path.

Memory is predictable. Each aggregate row in the memtable is exactly 96 bytes regardless of how many events fed into it; 1.35M rows = ~130 MB of pure aggregate state. From your customer count × dimension combinations you can do the math up front and decide whether one node fits or you need to partition.
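The sizing arithmetic is short enough to write down. This sketch assumes one 96-byte row per account per dimension combination per live period bucket; the `live_periods` multiplier is an assumption about how many buckets stay in memory, not a documented Unimeter setting:

```python
ROW_BYTES = 96  # per-row size quoted above


def aggregate_state_bytes(accounts: int, cells_per_account: int,
                          live_periods: int = 1) -> int:
    """Estimated bytes of aggregate state held in memory."""
    return accounts * cells_per_account * live_periods * ROW_BYTES


# 150,000 accounts × 9 (model, token_type) cells = 1.35M rows
estimate = aggregate_state_bytes(accounts=150_000, cells_per_account=9)
estimate_mb = estimate / 1_000_000  # 129.6, matching the ~130 MB figure above
```

Accounts that never touch some model or token type would presumably hold fewer rows, so this reads as an upper bound per period.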

Full bench setup and raw numbers for this article are at bench/results.md. The Unimeter docs also publish a broader benchmarks page covering additional workloads and hardware.

Stability

Distributed-systems bugs are nasty by nature: a leader fails mid-write, a message gets reordered, a disk reports success on an fsync that didn't actually flush. Catching these by running real clusters depends on luck — you have to hit the race in test, not in production.

The engine ships with a deterministic test harness called VOPR. At test time it spins up a virtual cluster, injects faults at every step — leader failures, network partitions, message reorderings, disk errors — and after each step checks invariants: VSR safety (no committed write lost across leader changes), aggregate correctness (agg.sum matches the source events), per-dimension filter consistency, WAL durability. Each scenario runs from a numeric seed, so any invariant violation is a reproducible failure tied to that seed.
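To illustrate the idea of seed-driven fault injection, here is a toy sketch. It is not VOPR and shares no code with it; it only shows the general technique of drawing every fault from a seeded generator and checking an invariant after each step, so that a failure can be replayed from its seed:

```python
import random


def run_scenario(seed: int, steps: int = 1_000) -> list:
    """Replay one seeded scenario against a toy aggregator and return its trace."""
    rng = random.Random(seed)  # all randomness flows from the seed
    seen = set()               # event ids already applied (deduplication)
    total = 0                  # aggregator state under test
    expected = 0               # ground truth computed from the source events
    trace = []

    for event_id in range(steps):
        value = rng.randint(1, 1_000)
        expected += value
        # Injected fault: the network may deliver the same event twice.
        deliveries = 2 if rng.random() < 0.2 else 1
        for _ in range(deliveries):
            if event_id not in seen:
                seen.add(event_id)
                total += value
        trace.append(("dup" if deliveries == 2 else "ok", value))
        # Invariant checked after every step, not only at the end.
        assert total == expected, f"seed={seed} step={event_id}"
    return trace
```

Running the same seed twice produces the same trace, which is what turns a rare race into a repeatable test case.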

Six seeds × 10K iterations currently pass clean. This is a build-time check, not anything running in production — but it's what lets the storage code ship knowing the hard cases have been exercised before users see them.

Try it

git clone https://github.com/unimeter/llm-token-metering
cd llm-token-metering
pdm install -G dev
docker compose up -d
pdm run python smoke.py

That clones the example repo, brings up Unimeter in a fresh container, and runs the whole flow against it: creates the metric, meters mock Anthropic responses, runs the cap check, prints the breakdown, prints a mock Stripe invoice.

Code in this post is Python. If you'd rather use Go, the same protocol and patterns are in unimeter/go-unimeter.

The Python SDK is early-stage and feedback is welcome — especially on the SDK shape and the docs. What would you want from a metering layer for your AI product?

If you're at an LLM provider and the infrastructure side of usage-based billing is something you're actively building or using, drop me a line — 0x180db@gmail.com.
