DEV Community

Mahesh Nikam
Mahesh Nikam

Posted on • Updated on

Apache Kafka with .NET Core: A Comprehensive Guide

.net core and Kafka

What is Kafka?
Apache Kafka is a distributed event streaming platform designed to handle high-throughput, fault-tolerant, and scalable data pipelines. It excels in real-time data streaming, enabling developers to build robust data pipelines and streaming applications.

Core Concepts

  1. Topics: Categories for streaming data, where each topic is a logical channel to which data is sent and from which data is received.

  2. Partitions: Distributed, ordered logs within topics that allow Kafka to scale horizontally and provide fault tolerance.

  3. Producers: Applications that send data to Kafka topics.

  4. Consumers: Applications that read data from Kafka topics.

  5. Brokers: Servers that host topics and partitions, managing the storage and retrieval of data.

  6. Consumer Groups: Clusters of consumers that work together to read data from Kafka topics, providing scalability and fault tolerance.

Key APIs

  • Producer API: Used to publish streams of records to Kafka topics.
  • Consumer API: Used to subscribe to topics and process streams of records.
  • Streams API: Used for building stream processing applications that transform input streams into output streams.
  • Connect API: Used to build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems.

Advanced Features

  • Exactly-once semantics: Ensures that data is processed exactly once, preventing data duplication.
  • Transactional writes: Allows for atomic writes to multiple Kafka partitions.
  • Idempotent producers: Prevents duplicate records during network retries.
  • KRaft (Kafka Raft): A consensus protocol replacing ZooKeeper for metadata management, enhancing Kafka's reliability and scalability.

When to Use Kafka

  • Real-time data pipelines
  • Activity tracking
  • Metrics collection
  • Log aggregation
  • Stream processing
  • Event-sourcing
  • Commit log service

Alternatives

  • Apache Pulsar: Offers multi-tenancy and geo-replication. Ideal when tiered storage and multiple namespaces are needed.
  • RabbitMQ: Supports complex routing and low latency. Best for priority queues and request-reply patterns.
  • Amazon Kinesis: Fully managed service, perfect for integration with the AWS ecosystem.
  • Google Pub/Sub: A global message bus, suitable for multi-region deployments with exactly-once semantics.

Kafka Shines In

  • Large-scale data pipelines
  • Microservices event backbone
  • Real-time analytics and ML feature stores

When to Reconsider Kafka

  • Small-scale applications (the overhead may outweigh the benefits)
  • Strict ordering requirements across all messages
  • Need for complex message routing

Setting Up Kafka with ZooKeeper

Step 1: Download and Extract Kafka
Download Kafka from the official website. Extract the downloaded archive to your desired directory.

Step 2: Start ZooKeeper
Kafka uses ZooKeeper to manage distributed brokers. Start ZooKeeper using the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

Enter fullscreen mode Exit fullscreen mode

Step 3: Start Kafka Server
Once ZooKeeper is running, start the Kafka server:

bin/kafka-server-start.sh config/server.properties
Enter fullscreen mode Exit fullscreen mode

Step 4: Create a Topic
Create a Kafka topic named "test-topic":

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Enter fullscreen mode Exit fullscreen mode

Building a .NET Core Application with Kafka

Setting Up .NET Core
Create a .NET Core Application: Use the .NET CLI to create a new console application.

dotnet new console -n KafkaDemo
cd KafkaDemo
Enter fullscreen mode Exit fullscreen mode

Add Kafka NuGet Package: Add the Confluent.Kafka package to your project.

dotnet add package Confluent.Kafka
Enter fullscreen mode Exit fullscreen mode

Producer Example

using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class Producer
{
    public static async Task Main(string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using (var producer = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var deliveryResult = await producer.ProduceAsync("test-topic", new Message<Null, string> { Value = "Hello Kafka" });
                Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Consumer Example

using Confluent.Kafka;
using System;

class Consumer
{
    public static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("test-topic");

            try
            {
                while (true)
                {
                    var cr = consumer.Consume();
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
            }
            catch (OperationCanceledException)
            {
                consumer.Close();
            }
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Schema Registry with .NET

Setting Up Schema Registry
Schema Registry is a tool that provides a serving layer for your metadata. It provides a RESTful interface for managing Avro schemas. You need to download and run the Schema Registry provided by Confluent.

Step 1: Download and Extract Schema Registry
Download the Schema Registry from the Confluent website. Extract the downloaded archive to your desired directory.

Step 2: Start Schema Registry
Start the Schema Registry using the following command:

bin/schema-registry-start config/schema-registry.properties
Enter fullscreen mode Exit fullscreen mode

Using Schema Registry in .NET Core

Schema Registry in Kafka is a component that provides a centralized repository for managing and validating schemas for data produced and consumed via Kafka. Here's a brief overview of its uses:

Schema Management: Stores Avro, JSON, and Protobuf schemas for Kafka topics, ensuring consistent data structure across producers and consumers.

Version Control: Maintains a version history of schemas, allowing for schema evolution and backward/forward compatibility checks.

Validation: Ensures that data written to Kafka topics adheres to predefined schemas, preventing schema mismatches and data corruption.

Interoperability: Facilitates smooth integration between different applications and services by standardizing data formats.

Decoupling: Separates schema management from application logic, simplifying the development and maintenance process.

By using Schema Registry, developers can ensure data consistency, streamline data processing, and improve overall data quality within their Kafka-based data pipelines.

Set up Schema Registry:
Ensure you have Confluent's Schema Registry running. You can use Docker for this:

docker run -d --name schema-registry -p 8081:8081 -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092 confluentinc/cp-schema-registry:latest

Enter fullscreen mode Exit fullscreen mode

Create an Avro Schema:
Define your Avro schema and register it in the Schema Registry.

{
  "type": "record",
  "name": "User",
  "namespace": "com.example",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}

Enter fullscreen mode Exit fullscreen mode

Define Avro Schema: Define your Avro schema and compile it using tools like avrogen.

You can register this schema using the Schema Registry API:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"}' \
http://localhost:8081/subjects/user-value/versions

Enter fullscreen mode Exit fullscreen mode

Add Confluent.SchemaRegistry NuGet Package: Add the Confluent.SchemaRegistry package to your project.

dotnet add package Confluent.Kafka
dotnet add package Confluent.SchemaRegistry
dotnet add package Confluent.SchemaRegistry.Serdes
Enter fullscreen mode Exit fullscreen mode

Producer Example with Avro

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Avro.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

        using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
        using var producer = new ProducerBuilder<string, GenericRecord>(producerConfig)
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
            .Build();

        var schema = (await schemaRegistry.GetLatestSchemaAsync("user-value")).SchemaString;
        var parser = new Avro.IO.Parser();
        var avroSchema = parser.Parse(schema);

        var user = new GenericRecord(avroSchema);
        user.Add("name", "John Doe");
        user.Add("age", 30);

        var message = new Message<string, GenericRecord> { Key = "user1", Value = user };

        var deliveryResult = await producer.ProduceAsync("users", message);

        Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
    }
}


Enter fullscreen mode Exit fullscreen mode

Consumer Example with Avro

using System;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Avro.Generic;

class Program
{
    static void Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "test-consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
        using var consumer = new ConsumerBuilder<string, GenericRecord>(consumerConfig)
            .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
            .Build();

        consumer.Subscribe("users");

        while (true)
        {
            var consumeResult = consumer.Consume();
            var user = consumeResult.Value;

            Console.WriteLine($"Consumed record with name: {user["name"]}, age: {user["age"]}");
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Happy coding!!!

Top comments (0)