
Ricardo Medeiros for VaiVoa


KAFKA + KSQLDB + .NET #1

Hi, I'm Ricardo Medeiros, .NET back-end developer @vaivoa, and today I'm going to walk you through using ksqlDB to query messages produced in Kafka by a .NET/C# producer. For this example, I deploy my environment as containers, described in a Docker Compose file, to make my results easy to reproduce.

The source code used in this example is available here.

Services

First, let's talk about the services in the Docker Compose environment. The file is available here.

.NET API Producer

The automatically generated .NET API, declared as the following Docker Compose service:

ksqldbdemo:
    container_name: ksqldbdemo
    image: ${DOCKER_REGISTRY-}ksqldbdemo
    build:
      context: .
      dockerfile: Dockerfile

This producer service is built from the .NET-generated Dockerfile shown below:

FROM mcr.microsoft.com/dotnet/aspnet:5.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

FROM mcr.microsoft.com/dotnet/sdk:5.0 AS build
WORKDIR /src
COPY ["ksqlDBDemo.csproj", "."]
RUN dotnet restore "ksqlDBDemo.csproj"
COPY . .
WORKDIR "/src/"
RUN dotnet build "ksqlDBDemo.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "ksqlDBDemo.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "ksqlDBDemo.dll"]

ZooKeeper

Although it is no longer required since Kafka 2.8, ZooKeeper coordinates Kafka tasks: electing the controller, tracking cluster membership, storing topic configuration, and more. This tutorial uses the Confluent Inc. ZooKeeper image because it is the one used in the reference material. ZooKeeper makes Kafka more reliable, but it adds complexity to the system.

zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

Kafka

Kafka is an event streaming platform capable of handling trillions of events a day. It is based on the abstraction of a distributed commit log. Initially developed at LinkedIn in 2011 as a message queue, it has evolved into a full-fledged event streaming platform. Listed as broker in the services, it is the core of this tutorial. Its configuration can be tricky, but the setup below worked well in this scenario.

 broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
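The two advertised listeners define how clients reach the broker: containers on the Compose network (the API, ksqlDB server, and Kafdrop) use the PLAINTEXT listener at broker:29092, which is exactly the address you'll see in their configuration below. As a hedged illustration, assuming the Confluent.Kafka NuGet package (the de facto .NET client, listed in the references), you can confirm the broker is reachable from inside the network with a small metadata request:

using System;
using Confluent.Kafka;

class BrokerCheck
{
    static void Main()
    {
        // Inside the Compose network the broker is reached through the
        // PLAINTEXT listener advertised as broker:29092.
        var config = new AdminClientConfig { BootstrapServers = "broker:29092" };

        using var admin = new AdminClientBuilder(config).Build();

        // Ask the broker for its metadata; if this succeeds, the listener
        // setup works for in-network clients.
        var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
        Console.WriteLine($"Connected to {metadata.Brokers.Count} broker(s).");
    }
}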

ksqlDB

ksqlDB is a database built for distributed stream processing applications. Made to work seamlessly with Kafka, it has a server that runs outside of Kafka, exposes a REST API, and comes with a CLI application that can be run separately and is the interface used in this tutorial.
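The CLI is what this tutorial uses, but since the server exposes its REST API on port 8088 (mapped to the host in the compose file shown below), it can also be reached from any HTTP client. A minimal hedged sketch in .NET, using the server's /info endpoint, which simply reports version and status details:

using System;
using System.Net.Http;
using System.Threading.Tasks;

class KsqlDbInfoCheck
{
    static async Task Main()
    {
        using var http = new HttpClient();

        // The ksqlDB server answers REST calls on port 8088; /info returns
        // basic server and version information, handy as a health check.
        var info = await http.GetStringAsync("http://localhost:8088/info");
        Console.WriteLine(info);
    }
}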

ksqlDB Server

In this example, the Confluent Inc. image of the ksqlDB server is used, once more due to its widespread usage.

ksqldb-server:
    image: confluentinc/ksqldb-server:0.22.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

ksqlDB CLI

The same goes for the ksqlDB CLI service, which also uses the Confluent Inc. image.

ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.22.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

Kafdrop

Kafdrop is a web UI for viewing Kafka topics and browsing consumer groups. It makes Kafka more accessible.

kafdrop:
    container_name: kafdrop
    image: obsidiandynamics/kafdrop:latest
    depends_on:
      - broker
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: broker:29092

Tutorial

Now comes the moment you have been waiting for: let's make it work!

Environment

For this tutorial, you'll need a working Docker installation, either on a Linux distribution or on Windows with Docker Desktop and WSL, plus Git.

Cloning the project

A Visual Studio project is available here; it has Docker support and can deploy all the services needed for this demo straight from the IDE. However, you will be fine if you don't want to, or can't, use Visual Studio. Just clone it by running the following command in the terminal and directory of your preference:

 $ git clone https://github.com/jjackbauer/ksqlDBDemo.git

Use the following command to move to the project folder:

$ cd ksqlDBDemo

Then, in the project folder that contains the docker-compose.yml, run the following command to deploy the services:

$ docker compose up -d

After this command, make sure that all services are running. Sometimes a service goes down on the first attempt, but that's okay. To see whether everything is running, you can check the services in Docker Desktop, as shown below:

Docker Desktop

Or you can execute the following command:

$ docker ps

Which should output something like this:

CONTAINER ID   IMAGE                               COMMAND                  CREATED       STATUS       PORTS                                          NAMES
b42ce9954fd9   ksqldbdemo_ksqldbdemo               "dotnet ksqlDBDemo.d…"   2 hours ago   Up 2 hours   0.0.0.0:9009->80/tcp, 0.0.0.0:52351->443/tcp   ksqldbdemo
0a0186712553   confluentinc/ksqldb-cli:0.22.0      "/bin/sh"                2 hours ago   Up 2 hours                                                  ksqldb-cli
76519de6946e   obsidiandynamics/kafdrop:latest     "/kafdrop.sh"            2 hours ago   Up 2 hours   0.0.0.0:19000->9000/tcp                        kafdrop
11c3a306ee01   confluentinc/ksqldb-server:0.22.0   "/usr/bin/docker/run"    2 hours ago   Up 2 hours   0.0.0.0:8088->8088/tcp                         ksqldb-server
07cef9d69267   confluentinc/cp-kafka:7.0.0         "/etc/confluent/dock…"   2 hours ago   Up 2 hours   9092/tcp, 0.0.0.0:29092->29092/tcp             broker
3fa1b9a60954   confluentinc/cp-zookeeper:7.0.0     "/etc/confluent/dock…"   2 hours ago   Up 2 hours   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp     zookeeper

WEB API

Now, with all services up and running, we can access the Web API's Swagger UI to populate our Kafka topics. The code is very simple and is available in the repository.

The Web API's Swagger UI is deployed at http://localhost:9009/swagger/index.html. As shown in the image below, it has two endpoints, which produce events that could just as well come from independent microservices: one creates an event that registers a userName in the system, and the other takes an Id and generates a three-digit code.

Swagger Geral
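The actual controller code lives in the repository; as a hedged sketch of what the create-user endpoint can look like with the Confluent.Kafka client (names such as UserCreated and UserController are illustrative, not necessarily the ones used in the repo), the endpoint assigns a new Id and publishes the event as JSON to the demo-user topic:

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;

// Illustrative DTO: the JSON field names are what matter, since the ksqlDB
// streams created later read Name/Id from demo-user and Id/Code from demo-code.
public record UserCreated(string Name, string Id);

[ApiController]
[Route("[controller]")]
public class UserController : ControllerBase
{
    [HttpPost]
    public async Task<IActionResult> Post(string userName)
    {
        // Inside the Compose network the broker is reachable at broker:29092.
        var config = new ProducerConfig { BootstrapServers = "broker:29092" };
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        var user = new UserCreated(userName, Guid.NewGuid().ToString());

        // Serialize the event as JSON and publish it to the demo-user topic.
        await producer.ProduceAsync("demo-user",
            new Message<Null, string> { Value = JsonSerializer.Serialize(user) });

        return Ok(user);
    }
}

In a real application the producer would normally be registered once (for example as a singleton) instead of being built on every request; the sketch keeps everything in one method for readability. The code endpoint follows the same pattern, publishing Id/Code messages to demo-code.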

Then you can create a user with the user name of your choice, as shown:
Request Create user

And it will be assigned a unique Id, as demonstrated:

Response create user

Now you can get a three-digit code for your user Id, as displayed:

Get Code Request

And a random code is generated for the selected Id, as we can observe in the image that follows:

Get Code Response

Kafdrop

We can use the Kafdrop UI to check that everything is okay. Kafdrop is deployed at http://localhost:19000/.
There you will find all the available brokers and topics. It should look like this:

Kafdrop

KSQL CLI

After all that, you'll be able to create your streams of data and query them using ksqlDB. In your preferred terminal, use the command:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Creating streams

You are now in the ksql CLI, free to create your streams and queries. First, let's create a stream for each of our topics:

CREATE STREAM stream_user (Name VARCHAR, Id VARCHAR)
  WITH (kafka_topic='demo-user', value_format='json', partitions=1);
CREATE STREAM stream_code (Id VARCHAR, code INT)
  WITH (kafka_topic='demo-code', value_format='json', partitions=1);

Create a materialized view

You can join the user data with the most recent random code. To achieve this, you must create a materialized view, a table that joins both streams, as seen in the ksqlDB script that follows:

CREATE TABLE currentCodeView AS
  SELECT user.Name,
         LATEST_BY_OFFSET(code.code) AS CurrentCode
  FROM stream_code code INNER JOIN stream_user user
    WITHIN 7 DAYS ON code.Id = user.Id
  GROUP BY user.Name
  EMIT CHANGES;

Making a push query

After that, we can query this materialized view:

SELECT * FROM currentCodeView 
  EMIT CHANGES;

This push query keeps running until you hit Ctrl+C to cancel it.
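If you'd rather not keep a CLI session open, the same push query can be sent to the ksqlDB server's REST API on port 8088. A hedged .NET sketch, assuming the documented /query endpoint and its application/vnd.ksql.v1+json media type (the response arrives as a chunked JSON stream):

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

class PushQueryOverRest
{
    static async Task Main()
    {
        using var http = new HttpClient();

        // The /query endpoint takes a ksql statement plus optional properties
        // and streams results back for as long as the push query runs.
        var body = "{\"ksql\":\"SELECT * FROM currentCodeView EMIT CHANGES;\",\"streamsProperties\":{}}";
        var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:8088/query")
        {
            Content = new StringContent(body, Encoding.UTF8, "application/vnd.ksql.v1+json")
        };

        // Read headers first, then consume the body as a stream so rows are
        // printed as they arrive instead of waiting for the query to end.
        using var response = await http.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);
        using var reader = new StreamReader(await response.Content.ReadAsStreamAsync());

        string line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            if (!string.IsNullOrWhiteSpace(line))
                Console.WriteLine(line); // first chunk is the schema header, the rest are rows
        }
    }
}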

Conclusions

This tutorial demonstrates that, in a Kafka + ksqlDB environment, you can run SQL queries and even join data coming from different events, which is one of the main complexities involved in microservice systems, and exactly what ksqlDB addresses by enabling SQL operations over Kafka topics.
My goal is to keep exploring the possibilities this ecosystem allows, and I hope to bring more knowledge on this topic in other articles here. For any suggestions, comments, or corrections, feel free to reach out to me on LinkedIn.

References

ksqlDB Quickstart
ksqlDB Overview
Kafka .NET Client
ksqlDB Documentation - Data Types Overview
KSQL and ksqlDB
Welcome to Apache ZooKeeper
What is ZooKeeper & How Does it Support Kafka?
What is Apache Kafka®?
ksqlDB - The database purpose-built for stream processing applications
An overview of ksqlDB
CREATE TABLE AS SELECT
How to join a stream and a stream
Time and Windows in ksqlDB Queries
Time operations


Disclaimer

VaiVoa encourages its developers in their process of growth and technical acceleration. The published articles do not reflect VaiVoa's opinion. They are published with the purpose of stimulating debate.

