DEV Community

Will Velida
Will Velida

Posted on

Building a simple streaming app with Azure Cosmos DB, Event Hubs and Azure Functions

Thanks to Azure Functions, we can build event-driven applications without having to worry about the infrastructure. Functions are great for integrating different components together, building simple API’s, developing event-driven micro-services and so much more.

In this article, I’m going to show you have we can build an Azure Function that generate events that we can send to Event Hubs and then persist those events into Azure Cosmos DB. For this sample, I will be showing you how you can use dependency injection within Azure Functions instead of using the bindings. For the purposes of this article, I’ll be developing my Function using C#.

While we could build the same application using the supported bindings for Event Hubs and Cosmos DB, I prefer to use dependency injection. If we wanted to create more functions underneath our Function App, we could share a singleton instance of our Cosmos DB and Event Hub clients, which I believe is a lot cleaner than developing bindings all over the place.

For this sample, Let’s imagine we have various devices situated around the world that capture temperature, how much damage they’ve received by level and how long that device has been out in the field. We aren’t going to do too much logic based on this scenario, we’ll use this concept as a basis to demonstrate how generated events can be processed by Event Hubs.

What packages will we need?

For this sample I’ve installed the following NuGet packages into my Function App:

  • Azure.Messaging.EventHubs.Processor. This is the new .NET Core library for Event Hubs. (I didn’t know there was a new library so I’d thought it would be cool to give it a try). I’ll use this library to connect to my Event Hub namespace and process events through my Event Hub.
  • Microsoft.Azure.Cosmos. I’ll be using v3 of the .NET SDK to persist my events to my Cosmos DB collection. At the time of writing this article, the Azure Function bindings for Cosmos DB still use v2 of the .NET SDK, so by using Dependency Injection in our Function app, we can leverage all the new and improved features of v3.
  • Bogus. This is a neat little library that allows us to generate random data based on our C# objects. I first saw this library in action when I was going through the labs that the Cosmos DB Engineering team have put together. I’m just using it here to generate lots of events for our Device Readings that I can send to Event Hub.

Injecting our services

In order to register my services, I’ll need to create a Startup class. Here’s the Startup class for my Function app.

using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TelementryApp.Helpers;

[assembly: WebJobsStartup(typeof(Startup))]
namespace TelementryApp.Helpers
{
    public class Startup : IWebJobsStartup
    {
        public void Configure(IWebJobsBuilder builder)
        {
            builder.Services.AddLogging(loggingBuilder =>
            {
                loggingBuilder.AddFilter(level => true);
            });

            var config = (IConfiguration)builder.Services.First(d => d.ServiceType == typeof(IConfiguration)).ImplementationInstance;

            builder.Services.AddSingleton(s => new CosmosClient(config[Constants.COSMOS_CONNECTION_STRING]));
            builder.Services.AddSingleton(
                s => new EventHubProducerClient(config[Constants.EVENT_HUB_CONNECTION_STRING], config[Constants.EVENT_HUB_NAME]));
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

In this class, I’ve created two singleton instances. First, I’ve declared a singleton instance for my CosmosClient which requires my to pass through a connection string value for my Cosmos DB account. I’ve also created a singleton instance for my EventHubProducerClient, which takes in a connection string for my Event Hub namespace along with the name of the Event Hub that I was to send messages to.

As you may have noticed, instead of passing in the explicit values for my connection strings, I’m passing through configuration values. I have a constants file below that declares my constants. These will correspond to the real connection string values within my local.settings.json file

using System;
using System.Collections.Generic;
using System.Text;

namespace TelementryApp.Helpers
{
    public class Constants
    {
        public const string COSMOS_CONNECTION_STRING = "COSMOS_CONNECTION_STRING";
        public const string TELEMENTRY_DATABASE = "TELEMENTRY_DATABASE";
        public const string TELEMENTRY_CONTAINER = "TELEMENTRY_CONTAINER";
        public const string EVENT_HUB_CONNECTION_STRING = "EVENT_HUB_CONNECTION_STRING";
        public const string EVENT_HUB_NAME = "EVENT_HUB_NAME";
    }
}
Enter fullscreen mode Exit fullscreen mode

Sending Events to Event Hub

Now that we’ve registered our CosmosClient and EventHubProducerClient services, let’s start working on sending events to Event Hub.

Let’s explore the model that we’ll use for our scenario. Here I have a DeviceReading class that we’ll use for our scenario. In v3 of the Cosmos DB .NET SDK, we need to explicitly specify our id of the document that we are trying to persist. In order to do this, I have a property in our DeviceReading class called DeviceId which I have decorated with a JsonProperty called id. To do this, you’ll need to add a using statement for Newtonsoft.Json.

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace TelementryApp.Models
{
    public class DeviceReading
    {
        [JsonProperty("id")]
        public string DeviceId { get; set; }
        public decimal DeviceTemperature { get; set; }
        public string DamageLevel { get; set; }
        public int DeviceAgeInDays { get; set; }
    }
}
Enter fullscreen mode Exit fullscreen mode

We’ve got a model to work with so now we can build our Function to send events to our Event Hub:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Bogus;
using TelementryApp.Models;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs;
using System.Text;

namespace TelementryApp.Functions
{
    public class TriggerEvents
    {
        private readonly ILogger<TriggerEvents> _logger;
        private readonly IConfiguration _config;
        private readonly EventHubProducerClient _eventHubProducerClient;

        public TriggerEvents(
            ILogger<TriggerEvents> logger,
            IConfiguration config,
            EventHubProducerClient eventHubProducerClient)
        {
            _logger = logger;
            _config = config;
            _eventHubProducerClient = eventHubProducerClient;
        }

        [FunctionName(nameof(TriggerEvents))]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "TriggerEvents")] HttpRequest req)
        {
            IActionResult result = null;

            try
            {
                var deviceIterations = new Faker<DeviceReading>()
                .RuleFor(i => i.DeviceId, (fake) => Guid.NewGuid().ToString())
                .RuleFor(i => i.DeviceTemperature, (fake) => Math.Round(fake.Random.Decimal(0.00m, 30.00m), 2))
                .RuleFor(i => i.DamageLevel, (fake) => fake.PickRandom(new List<string> { "Low", "Medium", "High" }))
                .RuleFor(i => i.DeviceAgeInDays, (fake) => fake.Random.Number(1, 60))
                .GenerateLazy(5000);

                foreach (var reading in deviceIterations)
                {
                    EventDataBatch eventDataBatch = await _eventHubProducerClient.CreateBatchAsync();
                    var eventReading = JsonConvert.SerializeObject(reading);
                    eventDataBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventReading)));
                    await _eventHubProducerClient.SendAsync(eventDataBatch);
                    _logger.LogInformation($"Sending {reading.DeviceId} to event hub");
                }

                result = new OkResult();
            }
            catch (Exception ex)
            {
                _logger.LogWarning($"Something went wrong. Exception thrown: {ex.Message}");
                result = new StatusCodeResult(StatusCodes.Status500InternalServerError);
            }

            return result;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Let’s go through this block by block.

First up, I’m using constructor injection to make our dependencies available within our function. Here I’m injecting my ILogger dependency (So I can send application logs to Application Insights), IConfiguration dependency (So I can access my configuration values in this Function) and EventHubProducerClient (So I can connect to my Singleton instance of my Event Hub Client) dependencies.

I’ve set up an HttpTrigger Function just so I can trigger the sending of events through an HTTP call. I’ll be using Postman to generate events.

In order to mock up events, I’m using Bogus.Faker to create a IEnumerable collection of Device Reading and defining the rules that I want to set for each property. I want to generate 5000 events, but you can go nuts and try and generate more if you want (You might be waiting a long time!).

I’m then iterating through my IEnumerable collection and converting each reading within that collection into an EventData type and sending that to my Event Hub.

Persisting events to Cosmos DB

Now let’s take a look at the function that will persist events to Cosmos DB:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using TelementryApp.Helpers;
using TelementryApp.Models;

namespace TelementryApp.Functions
{
    public class PersistEvents
    {
        private readonly ILogger<PersistEvents> _logger;
        private readonly IConfiguration _config;
        private CosmosClient _cosmosClient;
        private Container _telementryContainer;

        public PersistEvents(
            ILogger<PersistEvents> logger,
            IConfiguration config,
            CosmosClient cosmosClient)
        {
            _logger = logger;
            _config = config;
            _cosmosClient = cosmosClient;
            _telementryContainer = _cosmosClient.GetContainer(_config[Constants.TELEMENTRY_DATABASE],_config[Constants.TELEMENTRY_CONTAINER]);
        }

        [FunctionName(nameof(PersistEvents))]
        public async Task Run([EventHubTrigger("telementryreadings",
            Connection = "EVENT_HUB_CONNECTION_STRING")] EventData[] events)
        {

            foreach (EventData eventData in events)
            {
                try
                {
                    string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                    var telementryEvent = JsonConvert.DeserializeObject<DeviceReading>(messageBody);

                    // Persist to cosmos db
                    await _telementryContainer.CreateItemAsync(telementryEvent);
                    _logger.LogInformation($"{telementryEvent.DeviceId} has been persisted");                  
                }
                catch (Exception ex)
                {
                    _logger.LogError($"Something went wrong. Exception thrown: {ex.Message}");
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Again, let’s go through this block by block.

I’m also using constructor injection to make my dependencies available in this function, but this time I’m using my CosmosClient, then setting the Container that I want to use by retrieving the Container I want to persist my events to by using the .GetContainer() method. I need to pass through the database and the container name in this method, which I’m getting from my Constants.

For this Function, I’m using an EventHubTrigger which will trigger every time an event gets sent to our Event Hub. In order to connect to my Event Hub, I’ve specified the Event Hub name that processes our events and defining the connection to our Event Hub Namespace. This will send events into an array of EventData.

We will iterate through this array and for each event, we’ll try to convert out incoming event into a DeviceReading object, then persist that to our container in Cosmos DB.

Seeing this in action

Now that’s we’ve written up the code for this Function, let’s see it in action.

I’ll fire up my Function in Visual Studio and open up Postman in order to generate my Events. This will give us a URL that we can use to send a request to generate our events:

Let’s paste that URL into Postman, hit Send and kick off some events:

We should see our sent events being logged:

We’ll also see events being persisted to Cosmos DB in our logs. After a while, we can view the persisted events in our container:

Conclusion

I hope by reading this article you can understand how straightforward it is to build event-driven applications with Azure Functions that use dependency injection to connect different services together.

If you would like to take a look at the whole sample, please check out my GitHub repository.

As always, if you have any questions let me know in the comments and I’ll do my best to help.

Top comments (0)