You can simplify the process of reading the change feed on Azure Cosmos DB thanks to the Change Feed Processor.
As part of v3 of the Azure Cosmos DB .NET SDK, the Change Feed Processor is integrated into the library itself. Previously, in v2, it had its own library that you had to download separately in order to use it.
Using C#, we can work with the Change Feed in a variety of ways. The easiest way to implement the Change Feed is through Azure Functions: you can create a Function with a Cosmos DB Trigger that listens to a container. You can also use the Azure Cosmos DB SQL API SDK, which provides low-level control of the Change Feed. Finally, you can read the Change Feed by using the Change Feed Processor.
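As a quick illustration of the Azure Functions route, a trigger-based Function can look like the sketch below. The database, container, and connection-setting names are placeholders, and the attribute parameters follow the v3 Cosmos DB extension for Azure Functions:

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ProductChangeFeedFunction
{
    // Fires whenever documents are inserted or updated in the monitored container.
    // "CosmosDBConnection" is an app setting holding the account connection string.
    [FunctionName("ProductChangeFeedFunction")]
    public static void Run(
        [CosmosDBTrigger(
            databaseName: "ProductsDb",
            collectionName: "Products",
            ConnectionStringSetting = "CosmosDBConnection",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)] IReadOnlyList<Document> changes,
        ILogger log)
    {
        if (changes != null && changes.Count > 0)
        {
            log.LogInformation($"Documents modified: {changes.Count}");
        }
    }
}
```

The Functions runtime uses the Change Feed Processor under the hood, which is why it also needs a lease collection.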
In this article, I’m going to talk about the Change Feed Processor, break down the specific parts of the code that make it work, and demonstrate it with a demo. This demo will read events from a source container in our Cosmos DB account and persist those changes in a backup container in Cosmos DB. You could alternatively use another data store (such as Blob Storage or Azure SQL), but I’ll stick with Cosmos DB to keep it simple for now.
Wait, I’m new to this. What is the Change Feed in Cosmos DB?
In Cosmos DB, we use the Change Feed to listen to a container in our Cosmos databases to detect any changes within that container. The Change Feed will then return a list of sorted documents in the order in which they were modified.
This is a fantastic feature that allows us to trigger events based off our activity within our Cosmos DB containers. This article will demonstrate how we can use the Change Feed to move data from our Cosmos DB collections into another type of data store, but you can also use the Change Feed for other use cases such as stream processing or triggering notifications whenever items are inserted or updated.
At the time of writing, you can use the Change Feed with SQL API, Cassandra API, MongoDB API and Gremlin API Cosmos DB accounts. The Change Feed currently captures inserts and updates. You can’t filter which type of operation the Change Feed will listen to (such as updates-only or inserts-only), and it does not capture delete operations.
You don’t have to do anything special to enable the Change Feed in your Cosmos DB accounts (It’s there by default!).
Ok, so what’s special about the Change Feed Processor?
The Change Feed Processor simplifies reading the Change Feed and distributes the processing of events across multiple consumers. Its fault-tolerant behavior provides ‘at-least-once’ delivery of all events in the Change Feed.
The library follows the observer pattern. If your Change Feed has a high throughput, we can simply instantiate multiple clients to read from the feed, and the library will divide the load among all our clients without us having to do anything. If you want or need to run your own load balancer, you can do that as well.
So how does the Processor work?
The Change Feed Processor works in 4 steps:
- It will read the change feed
- If there aren’t any changes, it will go to sleep (we can actually customize this time).
- If there are changes, we will process these using the delegate.
- When these changes are processed, the lease store will be updated with the latest processed point, and the processor returns to step 1.
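The sleep time in step 2 is configurable on the builder. As a sketch (the container variables and processor names here are placeholders), the v3 SDK exposes options such as WithPollInterval and WithStartTime:

```csharp
ChangeFeedProcessor processor = monitoredContainer
    .GetChangeFeedProcessorBuilder<Product>("cosmosBackupSample", HandleChangesAsync)
    .WithInstanceName("consoleHost")
    .WithLeaseContainer(leaseContainer)
    // How long to sleep when no changes are found (step 2 above).
    .WithPollInterval(TimeSpan.FromSeconds(5))
    // Optionally start reading from a point in the past instead of "now".
    .WithStartTime(DateTime.UtcNow.AddHours(-1))
    .Build();
```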
In order to implement the Change Feed processor, we will need to set up the following components:
- We need a container to monitor for events. These events generate the Change Feed.
- We need a lease container. This acts as state storage and helps coordinate processing of the change feed across multiple workers.
- We need a host that uses the processor to listen for changes. If we have multiple instances of the Change Feed Processor that share the same lease configuration, each host instance should have a different name.
- Finally, we need to have a delegate. This is where we process our changes that the Change Feed will read.
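Setting up the first two components might look like the following sketch. The database and container names are placeholders for this demo; note that the v3 processor expects the lease container to be partitioned by /id:

```csharp
using Microsoft.Azure.Cosmos;

CosmosClient cosmosClient = new CosmosClient("<your-connection-string>");

Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync("ProductsDb");

// The monitored container: events here generate the Change Feed.
Container monitoredContainer = await database.CreateContainerIfNotExistsAsync(
    id: "Products", partitionKeyPath: "/ProductCategory");

// The lease container: state storage for the processor,
// partitioned by /id as the processor expects.
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(
    id: "leases", partitionKeyPath: "/id");
```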
What if an error happens in my delegate? How are errors handled?
If there’s an unhandled exception in your delegate, the thread that is processing those changes will be stopped and a new thread will be created.
This new thread will then look up the latest point that was persisted in the lease store and start again from there. This will continue until our changes have been successfully processed. This is the ‘at-least-once’ guarantee that the Change Feed Processor provides.
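Because an unhandled exception causes the whole batch to be retried, a common pattern is to catch failures per item and route them elsewhere rather than rethrowing. A hedged sketch, where _deadLetterContainer is a hypothetical container for failed items:

```csharp
private static async Task HandleChangesWithErrorHandlingAsync(
    IReadOnlyCollection<Product> changes,
    CancellationToken cancellationToken)
{
    foreach (Product product in changes)
    {
        try
        {
            await _productBackUpContainer.UpsertItemAsync(
                product,
                partitionKey: new PartitionKey(product.ProductCategory),
                cancellationToken: cancellationToken);
        }
        catch (CosmosException ex)
        {
            // Route the failed item to a dead-letter container instead of
            // rethrowing, so one bad item doesn't force the whole batch to retry.
            Console.WriteLine($"Backup failed ({ex.StatusCode}); dead-lettering item");
            await _deadLetterContainer.CreateItemAsync(
                product,
                partitionKey: new PartitionKey(product.ProductCategory),
                cancellationToken: cancellationToken);
        }
    }
}
```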
Can you show me some code please?
Let’s write up some code to see this in action. I’ll create a simple console application and go through the most important parts:
```csharp
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient)
{
    Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);

    ChangeFeedProcessor changeFeedProcessor = cosmosClient
        .GetContainer(_productDatabase.Id, _productContainer.Id)
        .GetChangeFeedProcessorBuilder<Product>("cosmosBackupSample", HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

    Console.WriteLine("Starting Change Feed Processor for Product Backups...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Product Backup started");
    return changeFeedProcessor;
}
```
In this method, we define the point of entry which is the container that we want to monitor. We use the method GetChangeFeedProcessorBuilder to build our processor on the container that we want to monitor. Here, we define how we want to handle changes in our Change Feed (define our delegate), define the instance name and then add the lease container configuration.
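Since the method returns the processor, the host can also shut it down cleanly when the application exits. A minimal sketch:

```csharp
ChangeFeedProcessor processor = await StartChangeFeedProcessorAsync(cosmosClient);

Console.WriteLine("Press any key to stop...");
Console.ReadKey();

// StopAsync releases the leases held by this host so other instances
// (or a restart) can pick up where it left off.
await processor.StopAsync();
```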
Now let’s take a look at the delegate:
```csharp
private static async Task HandleChangesAsync(
    IReadOnlyCollection<Product> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine("Backing up Product items");

    foreach (Product product in changes)
    {
        // Upsert rather than create: the Change Feed also emits updates,
        // and creating an item that already exists in the backup container
        // would throw a Conflict exception.
        await _productBackUpContainer.UpsertItemAsync(
            product,
            partitionKey: new PartitionKey(product.ProductCategory));
        Console.WriteLine("Product added to backup container");
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes");
}
```
Here, we’re passed an IReadOnlyCollection of Product items. These are essentially the Change Feed events that have been picked up. We then iterate through the changes and, for each product in the list, persist that item in our backup container.
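For completeness, the Product type used above could be a simple POCO along these lines (a sketch; the real model lives in the linked repo and may differ):

```csharp
using Newtonsoft.Json;

public class Product
{
    // Cosmos DB requires a lowercase "id" property on every item.
    [JsonProperty("id")]
    public string Id { get; set; }

    public string ProductName { get; set; }

    // Used as the partition key when writing to the backup container.
    public string ProductCategory { get; set; }

    public decimal Price { get; set; }
}
```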
If you want to see the basic implementation of this, please check out this GitHub Repo.
Seems simple enough, but how do I scale this?
As long as all the instances of the Change Feed Processor that you deploy use the same lease container configuration and the same processor name, and each instance has its own instance name, the Change Feed Processor will distribute the leases in the lease container across all our instances. We can increase or decrease the number of instances and the processor will adjust the load accordingly.
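For example, a second host could run the same code with only the instance name changed (both instance names here are placeholders):

```csharp
// Host 2: same processor name ("cosmosBackupSample") and lease container
// as the first host, but a different instance name, so the leases are
// split between the two hosts automatically.
ChangeFeedProcessor secondProcessor = cosmosClient
    .GetContainer(_productDatabase.Id, _productContainer.Id)
    .GetChangeFeedProcessorBuilder<Product>("cosmosBackupSample", HandleChangesAsync)
    .WithInstanceName("consoleHost2")
    .WithLeaseContainer(leaseContainer)
    .Build();

await secondProcessor.StartAsync();
```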
In terms of Throughput, how will I get charged?
The Change Feed is still essentially a read operation, so you will be charged for the RUs consumed when reading from the monitored container. The lease container also consumes RUs for the reads and writes it performs to track progress.
Conclusion
After reading this article, I hope you’ve gained a basic understanding of how the Change Feed Processor works. If you need to perform multiple Change Feed actions on the same container, then I’d highly recommend you go with the Processor instead of using multiple Azure Functions with Cosmos DB Triggers.
If you have any questions, feel free to post in the comments.