Event streaming in .Net with Kafka

For the past few years, Apache Kafka® has been the de facto standard for event streaming architectures.

Every day of our lives, we generate events:

  • Buy a new car
  • Change your address on an e-commerce website
  • Add an item to your shopping cart
  • Book a flight to New York
  • Refuel your car
  • ...

An event is composed of a state and a timestamp. So if you could stream all of your events into an immutable log, you could replay your whole life, with every change, from the beginning. That is the idea behind Kafka's creation a few years ago.

What is stream processing?

Stream processing is an approach where events are ingested as a continuous pipeline and transformed, filtered, and aggregated on the fly.

With this definition in mind, many use cases emerge:

  • Detect fraudulent transactions
  • Real-time stock trades
  • Marketing, sales, and business analytics
  • Tracking user activity
  • Monitoring and Reporting
  • Cybersecurity
  • Database migration

In this blog post, we will build an event streaming application using Streamiz, the .NET Stream Processing Library for Apache Kafka®.

Before starting, a short explanation of each component is in order.

Apache Kafka®

Apache Kafka is a distributed log and stream-processing platform. It is an open-source system developed by the Apache Software Foundation. The project aims to provide a highly scalable, elastic, and fault-tolerant platform. You can easily interact with external systems (sources and sinks) through Kafka Connect. Kafka Streams is a Java library for building stream processing applications.

Streamiz

Streamiz is an open-source project (released under the MIT license) that lets you create .NET event streaming applications with Apache Kafka®.

Let's go into more detail.

Internally, Streamiz uses the .NET client for Apache Kafka released by Confluent and aims to provide the same features as Kafka Streams. There is still a feature gap between the two libraries, but it shrinks with each release.

Streamiz Architecture

Streamiz wraps a consumer and a producer, and executes the topology for each record consumed from the source topics. You can easily create stateless and stateful applications. By default, each state store is a RocksDB store persisted on disk.
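
Because stateful operators are materialized automatically, stateless and stateful code look almost identical. Below is a minimal sketch mixing the two; the topic names and the page-view counting scenario are placeholder assumptions, not part of this post's example:

using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

var builder = new StreamBuilder();

builder.Stream<string, string, StringSerDes, StringSerDes>("page-views")
       .Filter((user, page) => !string.IsNullOrEmpty(page)) // stateless operator
       .GroupByKey()
       .Count()     // stateful operator: materialized in a RocksDB store by default
       .ToStream()
       .To<StringSerDes, Int64SerDes>("page-view-counts");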

For more information on Streamiz, consult the documentation.

.Net event streaming application

Let's present the use case: we work for an online shoe shop, and all orders are published to a Kafka topic. Some reference data, such as the customer name and email, and the product name and price, is not present in the order payload.

As a developer, you want to publish an enriched event containing all these fields to a Kafka topic for downstream applications.

Streamiz Flow

As a first step, you have to import all your reference data into Kafka. How you do it depends on your source of truth; in my case, and for simplicity, I will use the DatagenConnector to mock customer and product messages.

Create a datagen connector for mocking customer events

curl -s -X PUT \
      -H "Content-Type: application/json" \
      --data '{
                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                "kafka.topic": "customers",
                "max.interval": 10,
                "quickstart": "shoe_customers",
                "iterations": 1000,
                "tasks.max": "1",
                "transforms": "ValueToKey,ExtractField",
                "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.ValueToKey.fields": "id",
                "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.ExtractField.field": "id"
            }' \
      http://localhost:8083/connectors/datagen-customers/config

This connector will generate 1000 mock customers.

Create a datagen connector for mocking product events

curl -s -X PUT \
      -H "Content-Type: application/json" \
      --data '{
                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                "kafka.topic": "products",
                "max.interval": 10,
                "iterations": 1000,
                "quickstart": "shoes",
                "tasks.max": "1",
                "transforms": "ValueToKey,ExtractField",
                "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.ValueToKey.fields": "id",
                "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.ExtractField.field": "id"
            }' \
      http://localhost:8083/connectors/datagen-products/config

This connector will generate 1000 mock products.
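
If you want to sanity-check the generated data before writing any streaming code, you can consume a few records with the Confluent .NET client that Streamiz builds on. A quick sketch, assuming a broker on localhost:9092:

using System;
using Confluent.Kafka;

var conf = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "sanity-check",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Read the first few mock customers and print them
using var consumer = new ConsumerBuilder<string, string>(conf).Build();
consumer.Subscribe("customers");
for (int i = 0; i < 5; i++)
{
    var record = consumer.Consume(TimeSpan.FromSeconds(10));
    if (record == null) break; // timeout, no more data
    Console.WriteLine($"{record.Message.Key} => {record.Message.Value}");
}
consumer.Close();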

Now all I need is an order event stream. Let's create a new DatagenConnector instance; this one will emit a new order roughly every 50 ms.

curl -s -X PUT \
      -H "Content-Type: application/json" \
      --data '{
                "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                "kafka.topic": "orders",
                "max.interval": 50,
                "quickstart": "shoe_orders",
                "tasks.max": "1",
                "transforms": "ValueToKey,ExtractField",
                "transforms.ValueToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.ValueToKey.fields": "customer_id",
                "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.ExtractField.field": "customer_id"
            }' \
      http://localhost:8083/connectors/datagen-orders/config

Let's create the .NET event streaming application. Create a new .NET console project and add the Streamiz dependency:

dotnet add package Streamiz.Kafka.Net

Your Streamiz application will be split into three parts:

  • Configuration (application.id, bootstrap.servers, default serializers/deserializers, credentials, etc.)
  • Design the topology via a StreamBuilder
  • Run the Streamiz application

Configure your application

Only two parameters are mandatory:

  • ApplicationId : an identifier for the stream processing application. It must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.
  • BootstrapServers : the initial list of brokers, as a CSV list of host or host:port entries.

var config = new StreamConfig();
config.ApplicationId = "shoes-shop-app";
config.ClientId = "shoes-shop-app-client";
// Where to find the Kafka broker(s); adjust for your environment
config.BootstrapServers = "localhost:9092";
// Set to earliest so we don't miss any data that arrived in the topics before the process started
config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
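
Note that StreamConfig also comes in a generic flavor, StreamConfig<KS, VS>, which declares the default key and value SerDes once so you don't have to repeat them on every operator. A sketch of the same configuration in that style (the broker address is an assumption):

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "shoes-shop-app",
    ClientId = "shoes-shop-app-client",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest
};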

Design your topology

A topology is an acyclic graph of sources, processors, and sinks. A source is a node in the graph that consumes records from one or more Kafka topics and forwards them to its successor nodes. A processor is a node in the graph that receives input records from upstream nodes, processes them, and optionally forwards new records to one or all of its downstream nodes. Finally, a sink is a node in the graph that receives records from upstream nodes and writes them to a Kafka topic.

A Topology lets you construct an acyclic graph of these nodes; it is then passed to a Streamiz application, which begins consuming, processing, and producing records.

Topology
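
To make the mapping between these node types and the DSL concrete, here is a tiny sketch (the topic names are placeholders):

var builder = new StreamBuilder();

builder.Stream<string, string, StringSerDes, StringSerDes>("input-topic") // source: consumes a topic
       .MapValues(value => value.ToUpperInvariant())                      // processor: transforms records
       .To<StringSerDes, StringSerDes>("output-topic");                   // sink: writes to a topic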

The StreamBuilder helps the developer design the topology. You can easily stream a topic and transform, filter, enrich, or aggregate records using the DSL API.

For example, our use case needs two reference tables, one for customers and another for products. You can create a GlobalTable from each of these topics; it behaves like a globally replicated lookup database:

// Create a global state store (materialized with a RocksDB store) based on the "customers" topic
var customers = builder.GlobalTable<string, Customer, StringSerDes, JsonSerDes<Customer>>(
                CUSTOMER_TOPIC, RocksDb.As<string, Customer>(CUSTOMER_STORE));

// Create a global state store (materialized with a RocksDB store) based on the "products" topic
var products = builder.GlobalTable<string, Product, StringSerDes, JsonSerDes<Product>>(
                PRODUCT_TOPIC, RocksDb.As<string, Product>(PRODUCT_STORE));

Now, you have to stream the orders topic into the topology and join each order with its customer and product.

Keep in mind, the ultimate goal of our use case is to enrich the original order payload with all the reference data about the customer and the product. Because these lookups hit global tables, which are fully replicated on each instance, the joins do not require the topics to be co-partitioned.

// Create a stream based on the "orders" topic
var orderStream = builder.Stream<string, Order, StringSerDes, JsonSerDes<Order>>(ORDER_TOPIC);

// Join the order and the customer based on the customer id
var customerOrderStream = orderStream.Join(customers,
    (orderId, order) => order.customer_id,
    (order, customer) => new CustomerOrder(customer, order));

// Join the order+customer and the product based on the product id
var enrichedOrderStream = customerOrderStream.Join(products,
    (orderId, customerOrder) => customerOrder.Order.product_id,
    (customerOrder, product) => EnrichedOrderBuilder.Build(customerOrder, product));

// Publish the enriched payload to an output topic
enrichedOrderStream.To<StringSerDes, JsonSerDes<EnrichedOrder>>(ENRICHED_ORDER_TOPIC);

To illustrate the result, here is an example of the input payloads and the desired output:

Customer payload

{
  "id": "6143a99d-cd87-473b-bdb3-00ef2230a57b",
  "first_name": "Ursulina",
  "last_name": "Bowstead",
  "email": "ogibbard89@meetup.com",
  "phone": "992-999-4009",
  "street_address": "2470 Old Shore Road",
  "state": "New York",
  "zip_code": "11247",
  "country": "United States",
  "country_code": "US"
}

Product payload

{
  "id": "a4baf0d7-2712-4b07-98a9-8287a320bc47",
  "brand": "Torphy Group",
  "name": "Perf Max 686",
  "sale_price": 5247,
  "rating": 0
}

Order payload

{
  "order_id": 1087,
  "product_id": "a4baf0d7-2712-4b07-98a9-8287a320bc47",
  "customer_id": "6143a99d-cd87-473b-bdb3-00ef2230a57b",
  "ts": 1671184033000
}

OrderEnriched payload

{
  "OrderId": 1087,
  "ProductId": "a4baf0d7-2712-4b07-98a9-8287a320bc47",
  "ProductName": "Perf Max 686",
  "ProductBrand": "Torphy Group",
  "ProductPrice": 5247,
  "CustomerId": "6143a99d-cd87-473b-bdb3-00ef2230a57b",
  "OrderTime": "2022-12-16T09:47:13Z",
  "CustomerFirstName": "Ursulina",
  "CustomerLastName": "Bowstead",
  "CustomerEmail": "ogibbard89@meetup.com",
  "CustomerPhone": "992-999-4009",
  "CustomerAddress": "2470 Old Shore Road 11247 New York - United States"
}
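
The topology references a few model classes (Customer, Product, Order, CustomerOrder, EnrichedOrder) that are not shown above. Here is a minimal sketch consistent with the payloads, including the Unix-milliseconds to UTC conversion implied by the OrderTime field; the exact classes in the original project may differ:

using System;

// Property names mirror the JSON fields so that JsonSerDes<T> can bind them
public class Customer
{
    public string id { get; set; }
    public string first_name { get; set; }
    public string last_name { get; set; }
    public string email { get; set; }
    public string phone { get; set; }
    public string street_address { get; set; }
    public string state { get; set; }
    public string zip_code { get; set; }
    public string country { get; set; }
    public string country_code { get; set; }
}

public class Product
{
    public string id { get; set; }
    public string brand { get; set; }
    public string name { get; set; }
    public double sale_price { get; set; }
    public double rating { get; set; }
}

public class Order
{
    public long order_id { get; set; }
    public string product_id { get; set; }
    public string customer_id { get; set; }
    public long ts { get; set; }
}

// Intermediate pair produced by the first join
public class CustomerOrder
{
    public Customer Customer { get; }
    public Order Order { get; }

    public CustomerOrder(Customer customer, Order order)
    {
        Customer = customer;
        Order = order;
    }
}

public class EnrichedOrder
{
    public long OrderId { get; set; }
    public string ProductId { get; set; }
    public string ProductName { get; set; }
    public string ProductBrand { get; set; }
    public double ProductPrice { get; set; }
    public string CustomerId { get; set; }
    public DateTime OrderTime { get; set; }
    public string CustomerFirstName { get; set; }
    public string CustomerLastName { get; set; }
    public string CustomerEmail { get; set; }
    public string CustomerPhone { get; set; }
    public string CustomerAddress { get; set; }
}

public static class EnrichedOrderBuilder
{
    public static EnrichedOrder Build(CustomerOrder co, Product product)
        => new EnrichedOrder
        {
            OrderId = co.Order.order_id,
            ProductId = product.id,
            ProductName = product.name,
            ProductBrand = product.brand,
            ProductPrice = product.sale_price,
            CustomerId = co.Customer.id,
            // "ts" is Unix epoch milliseconds in the order payload
            OrderTime = DateTimeOffset.FromUnixTimeMilliseconds(co.Order.ts).UtcDateTime,
            CustomerFirstName = co.Customer.first_name,
            CustomerLastName = co.Customer.last_name,
            CustomerEmail = co.Customer.email,
            CustomerPhone = co.Customer.phone,
            CustomerAddress = $"{co.Customer.street_address} {co.Customer.zip_code} {co.Customer.state} - {co.Customer.country}"
        };
}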

Below is the complete topology:

StreamBuilder builder = new();

var orderStream = builder.Stream<string, Order, StringSerDes, JsonSerDes<Order>>(ORDER_TOPIC);

var customers = builder.GlobalTable<string, Customer, StringSerDes, JsonSerDes<Customer>>(
                CUSTOMER_TOPIC, RocksDb.As<string, Customer>(CUSTOMER_STORE));

var products = builder.GlobalTable<string, Product, StringSerDes, JsonSerDes<Product>>(
                PRODUCT_TOPIC, RocksDb.As<string, Product>(PRODUCT_STORE));

var customerOrderStream = orderStream.Join(customers,
    (orderId, order) => order.customer_id,
    (order, customer) => new CustomerOrder(customer, order));

var enrichedOrderStream = customerOrderStream.Join(products,
    (orderId, customerOrder) => customerOrder.Order.product_id,
    (customerOrder, product) => EnrichedOrderBuilder.Build(customerOrder, product));

enrichedOrderStream.To<StringSerDes, JsonSerDes<EnrichedOrder>>(ENRICHED_ORDER_TOPIC);

Start your application

The last step is to create a KafkaStream instance, which will run the topology at scale.

// Build the `Topology` from the `StreamBuilder`
var topology = builder.Build();

// Create a new KafkaStream instance with your topology and the configuration
KafkaStream stream = new(topology, config);

// Start the processing
await stream.StartAsync(source.Token);
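
The source.Token passed to StartAsync comes from a CancellationTokenSource, which is not shown in the snippet above. A common pattern, sketched here as an assumption, is to create it before starting the stream and wire it to Ctrl+C for a clean shutdown:

using System;
using System.Threading;

// Create before calling stream.StartAsync(source.Token)
var source = new CancellationTokenSource();

// Wire Ctrl+C to the token so the stream shuts down cleanly
Console.CancelKeyPress += (sender, eventArgs) =>
{
    eventArgs.Cancel = true; // keep the process alive while the stream stops
    source.Cancel();         // cancels StartAsync, stopping the application
};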

Summary

In this post, we introduced Streamiz, the .NET Stream Processing Library for Apache Kafka®, and used it to build an event streaming application at scale.

Give it a try: here's the GitHub repository; just follow the README instructions.

Feel free to support the project by giving it a ⭐️ and testing the library. Contributors are more than welcome 💥🚀
