DEV Community

VzlDev
VzlDev

Posted on

Use Kafka in your Web Api

Hello guys!
Today I challenged myself to learn a little bit about event streaming, so I created an example using Kafka.

What is event streaming?

Event streaming is the practice of capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and software applications in the form of streams of events; storing these event streams durably for later retrieval; manipulating, processing, and reacting to the event streams in real-time as well as retrospectively; and routing the event streams to different destination technologies as needed. (https://kafka.apache.org/intro)

What is Kafka?

Kafka is an event streaming platform, that allows:

  • Write/read data.
  • Store data.
  • Process data.

Key Kafka Concepts

  • Event, the events in a nutshell are the data that we'll store. It can also be called record or message. When you read or write data to Kafka, you do this in the form of events.

  • Topic, is the place where the events will be stored.

  • Producers are those client applications that publish (write) events to Kafka

  • Consumersare those that subscribe to (read and process) events

In my project, i made changes to my EventsAPI, so that now when a user is registered in an event, i will store some data in a topic, that a new LogsApi will read and store in a db.

Using Kafka step by step

Step 1

To start using Kafka I recommend using an image container and run that image in the docker container.
After you pull the image, run it.

Step 2

In your Producer and Consumer API install the following nuget package:

  • Confluent.Kafka

Image description

Step 3

Update your producer api to write data to a kafka topic:

[Route("[controller]")]
[ApiController]
public class EventEnrollmentController : ControllerBase
{
    protected IEventsRepository _repository;
    private readonly IProducer<string, string> _producer;

    public EventEnrollmentController(IEventsRepository repository)
    {
        _repository = repository;
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    [HttpPut("{eventId}/register")]
    public async Task<ActionResult<bool>> AddUserToEvent(Guid userId, Guid eventId)
    {
        Event evento = await _repository.GetEventById(eventId);

        if (evento == null)
        {
            return NotFound();
        }

        evento.Users.Add(userId);

        LogEntry log = new LogEntry()
        {
            Id = Guid.NewGuid(),
            EventTitle = evento.Title,
            UserId = userId,
            RegistrationTime = DateTime.UtcNow,
        };

        await _producer.ProduceAsync("user-event-registrations", new Message<string, string>
        {
            Key = "UserEventRegistered",
            Value = JsonConvert.SerializeObject(log)
        });

        return await _repository.UpdateEvent(evento);
    }
}
Enter fullscreen mode Exit fullscreen mode

In this case, a topic called user-event-registrations is created.

Step 4

On your consumer API, do the following changes, so it will consume data of the user-event-registrations topic

public class LogService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IConsumer<string, string> _consumer;
    private readonly ILogger<LogService> _logger;

    public LogService(IServiceProvider serviceProvider, ILogger<LogService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;

        var config = new ConsumerConfig
        {
            GroupId = "log-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("user-event-registrations");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = _consumer.Consume(stoppingToken);
                var registration = JsonConvert.DeserializeObject<LogEntry>(consumeResult.Message.Value);

                using (var scope = _serviceProvider.CreateScope())
                {
                    var context = scope.ServiceProvider.GetRequiredService<LogContext>();

                    context.LogEntries.Add(registration);
                    await context.SaveChangesAsync(stoppingToken);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing Kafka message.");
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Don't forget to add the dependencies to your Program.cs file.

And that's it guys, a simple use case of event streaming using Kafka. I hope you liked it, stay tuned for more!

Top comments (0)