DEV Community

Cover image for How to Process Azure Cosmos DB Change Streams in Parallel with Java (and Stop Leaving Throughput on the Table)
Ankit Sood
Ankit Sood

Posted on

How to Process Azure Cosmos DB Change Streams in Parallel with Java (and Stop Leaving Throughput on the Table)

You have a Cosmos DB collection with dozens of physical partitions and millions of documents. You need to migrate them or stream changes in real time to another system. You open a single change stream cursor and watch it crawl through one partition at a time, burning hours that should take minutes.

The bottleneck isn't Cosmos DB. It's the single-threaded cursor you're reading with. Cosmos DB shards data across physical partitions, but a vanilla change stream reads them sequentially. Every partition waits its turn. Your provisioned RUs sit idle while one thread does all the work.

This post walks through a Java implementation that opens one change stream per physical partition, processes them concurrently, and handles the operational details that tutorials usually skip: batching, RU consumption tracking, resume token checkpointing, and retry with exponential backoff.

TL;DR

  • Cosmos DB's GetChangeStreamTokens custom action returns one resume token per physical partition. Use it to fan out change streams in parallel.
  • A thread-per-partition model with ExecutorService lets you saturate your provisioned throughput instead of processing on a single cursor.
  • Batch documents before writing to your target system to reduce round-trips and amortise overhead.
  • Persist resume tokens per partition so you can restart from exactly where you left off and not from the beginning.
  • Track RU consumption per partition in real time to reduce cost and avoid surprise throttling.

Prerequisites

  • Java 21 (the implementation uses virtual-thread-friendly patterns, though it works on platform threads too)
  • MongoDB Java driver 4.11+ (for Cosmos DB's MongoDB API compatibility)
  • An Azure Cosmos DB account with the MongoDB API enabled
  • Familiarity with change streams, Cosmos DB's physical partition model, and Java concurrency (ExecutorService, ConcurrentHashMap, atomics)

Table of Contents

The Problem with Single Cursor Change Streams

A standard MongoDB change stream opens one cursor against the collection. On a vanilla MongoDB replica set, that's fine. There is one oplog and the cursor tails it. But Cosmos DB distributes data across physical partitions, and a single change stream cursor reads them one after another. If you have 20 physical partitions, your throughput is 1/20th of what it could be.

The real cost isn't just wall-clock time. It's the RUs you've provisioned sitting unused while one partition drains. If you're paying for 10,000 RU/s and your single cursor uses 500 RU/s, you're wasting the rest.

The fix is straightforward: open one cursor per physical partition and read them all simultaneously. Cosmos DB gives you the mechanism to do this but it's a custom action, not part of the standard MongoDB API, and the documentation doesn't make the implementation obvious.

Getting Partition Level Resume Tokens

Cosmos DB exposes a custom command called GetChangeStreamTokens that returns one resume token per physical partition. Each token is a cursor starting point that tells Cosmos DB: "give me changes from this partition, starting at this timestamp."

You can test this directly from NoSQL Booster, mongosh, or any MongoDB shell before writing a line of Java. Connect to your Cosmos DB account and run:

db.runCommand({
  customAction: "GetChangeStreamTokens",
  collection: "yourCollection",
  startAtOperationTime: Timestamp(1621900800, 0)
})
Enter fullscreen mode Exit fullscreen mode

Replace "yourCollection" with your collection name, and the timestamp with the Unix epoch seconds for your desired start date. To start from 30 days ago, you can compute it inline:

var twoYearsAgo = Math.floor((Date.now() - (144 * 30 * 24 * 60 * 60 * 1000)) / 1000);

db.runCommand({
  customAction: "GetChangeStreamTokens",
  collection: "yourCollection",
  startAtOperationTime: Timestamp(twoYearsAgo, 0)
})
Enter fullscreen mode Exit fullscreen mode

Cosmos DB responds with an array of resume tokens — one per physical partition:

{
  "resumeAfterTokens": [
    { "_data": { "$binary": "...", "$type": "00" }, "_kind": { "$numberInt": "1" } },
    { "_data": { "$binary": "...", "$type": "00" }, "_kind": { "$numberInt": "1" } },
    { "_data": { "$binary": "...", "$type": "00" }, "_kind": { "$numberInt": "1" } }
  ],
  "ok": 1
}
Enter fullscreen mode Exit fullscreen mode

Each entry in resumeAfterTokens maps to one physical partition. Three entries means three partitions and three change stream cursors you can open in parallel.

Here's the Java code that builds that command and parses the response:

public List<BsonDocument> getChangeStreamTokens(LocalDateTime startDate) {
    long epochSeconds = startDate.toEpochSecond(ZoneOffset.UTC);
    BSONTimestamp timestamp = new BSONTimestamp((int) epochSeconds, 0);

    Document command = new Document()
            .append("customAction", "GetChangeStreamTokens")
            .append("collection", collection.getNamespace().getCollectionName())
            .append("startAtOperationTime", timestamp);

    Document result = database.runCommand(command);
    List<BsonDocument> tokens = new ArrayList<>();

    if (result.containsKey("resumeAfterTokens")) {
        List<Document> cursors = (List<Document>) result.get("resumeAfterTokens");
        for (Document cursor : cursors) {
            // Convert the entire cursor Document to BsonDocument
            // This preserves the complete token structure Cosmos DB expects
            BsonDocument token = cursor.toBsonDocument(
                BsonDocument.class, collection.getCodecRegistry());
            tokens.add(token);
        }
    }

    log.info("Retrieved {} partition tokens", tokens.size());
    return tokens;
}
Enter fullscreen mode Exit fullscreen mode

A few things to notice here. The startAtOperationTime parameter controls how far back you read. You can set it to five, six or seven years ago for a full migration, or to thirty seconds ago for near real-time streaming. The number of tokens returned tells you how many physical partitions Cosmos DB has allocated for your collection. That number changes as your data grows, so don't hardcode it.

The token conversion line deserves attention. You might be tempted to extract just the _data field from each token. Don't. The full token structure includes metadata that Cosmos DB needs to route the cursor to the correct partition. Converting the entire Document to BsonDocument via the codec registry preserves that structure.

The Thread Per Partition Architecture

Once you have one token per partition, the architecture is simple: spin up one thread per token, and let each thread process its own change stream independently.

public void startParallelChangeStreams(List<BsonDocument> tokens) {
    List<Future<?>> futures = new ArrayList<>();

    for (int i = 0; i < tokens.size(); i++) {
        final BsonDocument resumeToken = tokens.get(i);
        final int partitionIndex = i;
        partitionStats.put(partitionIndex, new PartitionStats(partitionIndex));

        Future<?> future = executorService.submit(() -> {
            changeStreamProcessor.processChangeStream(
                resumeToken, partitionIndex, partitionStats);
        });

        futures.add(future);
    }

    log.info("Started {} change stream threads", tokens.size());
    startMetricsReporter();
}
Enter fullscreen mode Exit fullscreen mode

The ExecutorService is sized to match the token count — one thread per partition. This is deliberate. Each thread blocks on its cursor's hasNext() call waiting for the next change event. You're not doing CPU-bound work here; you're waiting on network I/O. A fixed thread pool sized to the partition count is the right model.

The PartitionStats map gives you per-partition visibility: how many documents each partition has processed, how many RUs it's consumed, whether it's completed or failed. Without this, debugging a stalled migration is guesswork.

The PartitionStats Model

Each partition tracks its own metrics independently using atomics for thread safety:

public class PartitionStats {
    private final int partitionIndex;
    private final AtomicLong processed = new AtomicLong(0);
    private final AtomicLong errors = new AtomicLong(0);
    private final AtomicLong ruConsumed = new AtomicLong(0);
    private volatile boolean completed = false;
    private volatile boolean failed = false;

    // ... increment methods, getters
}
Enter fullscreen mode Exit fullscreen mode

The ruConsumed field stores values multiplied by 100 as longs (to avoid AtomicDouble, which doesn't exist in the standard library) and divides on read. A small trick that avoids pulling in Guava just for one atomic type.

Processing Changes in Batches

Reading one document at a time from the change stream and writing it one at a time to your target system is the second throughput killer after single-cursor reads. Batching amortises the overhead of each write operation.

The processor accumulates documents until it hits a batch size threshold, then flushes them through a MigrationCallback interface:

ChangeStreamIterable<Document> changeStream = collection.watch(pipeline)
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .resumeAfter(resumeToken)
        .batchSize(batchSize);

List<ChangeStreamDocument<Document>> batch = new ArrayList<>();

try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor()) {
    while (!shutdownRequested && cursor.hasNext()) {
        ChangeStreamDocument<Document> change = cursor.next();
        batch.add(change);

        BsonDocument currentResumeToken = change.getResumeToken();

        if (batch.size() >= batchSize) {
            processBatch(batch, partitionIndex, stats);
            batch.clear();
        }
    }

    // Flush remaining documents
    if (!batch.isEmpty()) {
        processBatch(batch, partitionIndex, stats);
    }

    stats.setCompleted(true);
}
Enter fullscreen mode Exit fullscreen mode

The MigrationCallback is an interface with a single method — onBatch(List<Document> documents, int partitionIndex). That abstraction is the entire pluggability story. Swap in a Google Cloud Storage writer, a Kafka producer, a Spanner batch insert, or a test double. The change stream processing logic doesn't change.

public interface MigrationCallback {
    void onBatch(List<Document> documents, int partitionIndex) throws Exception;
}
Enter fullscreen mode Exit fullscreen mode

The pipeline filters for insert, update, and replace operations and projects only the fields you need (_id, fullDocument, ns, documentKey). This matters for RU cost — every field Cosmos DB returns costs RUs, and if you're processing millions of documents, projecting only what you need adds up.

Fault Tolerance: Resume Tokens and Retry

Migrations fail. Networks drop. Cosmos DB throttles you when you exceed provisioned throughput. The question isn't whether something will go wrong — it's whether you can pick up where you left off without reprocessing millions of documents.

Resume Token Checkpointing

Every change stream event carries a resume token. Persist it after each batch, and you can restart from exactly that point:

public void persistResumeToken(int partitionIndex, BsonDocument resumeToken,
        ConcurrentHashMap<Integer, PartitionStats> partitionStats) {
    MongoCollection<Document> checkpointCollection =
            database.getCollection("changeStreamCheckpoints");

    Document checkpoint = new Document()
            .append("partitionIndex", partitionIndex)
            .append("resumeToken", resumeToken)
            .append("timestamp", Instant.now())
            .append("documentsProcessed",
                    partitionStats.get(partitionIndex).getProcessed());

    checkpointCollection.updateOne(
            Filters.eq("partitionIndex", partitionIndex),
            new Document("$set", checkpoint),
            new UpdateOptions().upsert(true));
}
Enter fullscreen mode Exit fullscreen mode

The checkpoint includes the document count alongside the token — so when you resume, you know both where you are and how far you've already come. The upsert ensures you're always updating the same checkpoint document per partition rather than accumulating stale rows.

Exponential Backoff with Jitter

When a partition fails, the retry logic uses exponential backoff with 30% jitter to avoid the thundering herd problem — multiple partitions failing simultaneously and retrying at the exact same time:

private void handleError(BsonDocument resumeToken, int partitionIndex,
        int retryCount,
        ConcurrentHashMap<Integer, PartitionStats> partitionStats) {
    long backoffMs = generateRandomBackoffTime(retryCount);
    log.warn("Retrying partition {} after {}ms (attempt {})",
             partitionIndex, backoffMs, (retryCount + 1));

    try {
        Thread.sleep(backoffMs);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return;
    }
    processChangeStream(resumeToken, partitionIndex, partitionStats);
}

private long generateRandomBackoffTime(int retryCount) {
    long baseBackoff = 100 * (long) Math.pow(2, retryCount);
    long jitter = (long) (Math.random() * baseBackoff * 0.3);
    return baseBackoff + jitter;
}
Enter fullscreen mode Exit fullscreen mode

After three failed retries, the partition is marked as failed and stops retrying. The metrics reporter surfaces this immediately so you can investigate rather than burning retries silently.

Tracking RU Consumption

This is the one that catches people off guard. You can provision 10,000 RU/s, open 20 parallel change streams, and still get throttled — because change streams consume RUs, and the consumption isn't evenly distributed across partitions.

The processor tracks RU usage per partition by calling getLastRequestStatistics after each batch:

public Document getRUsUsed(int partitionIndex) {
    Document stats = database.runCommand(
        new Document("getLastRequestStatistics", 1));
    return stats;
}
Enter fullscreen mode Exit fullscreen mode

The metrics reporter aggregates these into a live dashboard printed every 10 seconds:

=== Migration Statistics ===
Total Processed: 142,857
Total Errors: 0
Total RUs Consumed: 8,432.50
Active Partitions: 18
Completed Partitions: 2
Failed Partitions: 0
===========================
Enter fullscreen mode Exit fullscreen mode

Without this, you're flying blind. You might complete a migration in two hours, look at your Azure bill, and discover you burned through RUs at 3x your expected rate because a few hot partitions were doing disproportionate work.

High Level Diagram

High Level Diagram


The Part Nobody Tells You

Token format is fragile. The first version of this code extracted only the _data field from each resume token. It worked in local testing but failed against a production Cosmos DB account with multiple physical partitions. The full token structure includes routing metadata that Cosmos DB needs to direct the cursor to the correct partition. Always convert the entire Document — not just one field.

Batch size is a tradeoff you'll tune more than once. Too small, and you're making too many round-trips to your target system. Too large, and you hold too many documents in memory across 20+ concurrent threads. A batch size of 20 was the sweet spot for a collection averaging 4KB per document — your mileage will vary based on document size and target system latency.

Graceful shutdown is harder than it looks. The shutdownRequested flag checked in each cursor loop works for clean shutdowns, but the MongoCursor might be blocking on hasNext() when the flag flips. Closing the MongoClient from the shutdown hook forces the cursors to throw, which the catch block handles but you need both mechanisms working together.

FAQ

Q: How many physical partitions will my Cosmos DB collection have?
A: It depends on your data size and provisioned throughput. Cosmos DB allocates physical partitions automatically — roughly one per 50 GB of data or 10,000 RU/s of throughput. Call GetChangeStreamTokens to find out your current count; don't guess.

Q: Can I use this with Cosmos DB's serverless tier?
A: Change streams work on serverless, but GetChangeStreamTokens is a custom action tied to the provisioned throughput model. On serverless, you'll typically have fewer physical partitions and the parallelism benefit is smaller. Test it but the single-cursor approach might be sufficient for serverless workloads.

Q: What happens if Cosmos DB adds or removes physical partitions during a migration?
A: The token set you retrieved at startup remains valid for those partitions. New partitions created after you called GetChangeStreamTokens won't be covered. For long-running migrations, periodically re-fetch tokens and diff against what you already have to pick up new partitions.

Q: Why not use Cosmos DB's built-in change feed processor library instead?
A: The change feed processor in the Azure Cosmos DB SDK (not the MongoDB API) handles partition management automatically. But if you're on the MongoDB API — which many teams choose for driver compatibility — that library isn't available. This approach gives you the same parallelism on the MongoDB wire protocol.

Q: How do I size the thread pool if I have 100+ partitions?
A: One thread per partition works up to roughly 50–60 partitions on a standard JVM. Beyond that, consider virtual threads (Project Loom, Java 21+) or a work-stealing pool. Each thread is I/O-bound, not CPU-bound, so the constraint is memory for thread stacks, not core count.

Wrapping Up

The single most important insight is this: Cosmos DB's physical partition model is your friend, not your enemy but only if you read from each partition independently. A single change stream cursor leaves most of your provisioned throughput unused. The GetChangeStreamTokens command unlocks that parallelism, and the rest is standard Java concurrency with careful operational hygiene around batching, checkpointing, and RU tracking.

Have you built something similar: parallel change stream readers, or a different approach to high-throughput Cosmos DB migrations? I'd be curious to hear what worked for your team, especially around partition rebalancing during long-running jobs.

Top comments (0)