This article was originally published at: https://www.ahmetkucukoglu.com/en/event-sourcing-with-aspnet-core-02-messaging/
1. Introduction
Since this article is the second part of a series, I recommend reading the first part below before starting. We will continue with the sample project from that article.
https://www.ahmetkucukoglu.com/en/event-sourcing-with-asp-net-core-01-store/
In the previous article, we built a Kanban board example. We created a RESTful API with create, assign, move and complete endpoints, and recorded the requests coming to these endpoints as events in the Event Store. So we focused on the store part of the Event Store. In this article, we will focus on the messaging part.
We will add the following endpoint to our RESTful API.
[GET] api/tasks/{id}
2. Preparing Couchbase
Since the events are not queryable, they must be projected into NoSQL documents or SQL rows that represent the whole state. For example, we can't get the tasks whose state is "Done" from the Event Store. For this reason, we need to keep the state built from the events in a queryable database. In this example, we will keep the tasks in Couchbase.
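To make the idea concrete, here is a minimal in-memory sketch of such a projection. The event and view classes (SketchTaskEvent, SketchTaskCreated, SketchTaskMoved, SketchTaskView) are hypothetical stand-ins, not the types from the sample project, and a dictionary plays the role of the document database.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event types, for illustration only.
public abstract class SketchTaskEvent
{
    protected SketchTaskEvent(Guid taskId) { TaskId = taskId; }
    public Guid TaskId { get; }
}

public sealed class SketchTaskCreated : SketchTaskEvent
{
    public SketchTaskCreated(Guid taskId, string title) : base(taskId) { Title = title; }
    public string Title { get; }
}

public sealed class SketchTaskMoved : SketchTaskEvent
{
    public SketchTaskMoved(Guid taskId, string section) : base(taskId) { Section = section; }
    public string Section { get; }
}

// Hypothetical read model: one document per task, holding its current state.
public sealed class SketchTaskView
{
    public Guid Id { get; set; }
    public string Title { get; set; } = "";
    public string Section { get; set; } = "Open";
}

public static class ProjectionSketch
{
    // Applies the events in order to a dictionary that stands in for the database.
    public static Dictionary<Guid, SketchTaskView> Project(IEnumerable<SketchTaskEvent> events)
    {
        var views = new Dictionary<Guid, SketchTaskView>();
        foreach (var e in events)
        {
            switch (e)
            {
                case SketchTaskCreated c:
                    views[c.TaskId] = new SketchTaskView { Id = c.TaskId, Title = c.Title };
                    break;
                case SketchTaskMoved m when views.TryGetValue(m.TaskId, out var view):
                    view.Section = m.Section;
                    break;
            }
        }
        return views;
    }

    // With the state materialized, "which tasks are Done?" is a simple filter.
    public static List<SketchTaskView> InSection(Dictionary<Guid, SketchTaskView> views, string section) =>
        views.Values.Where(v => v.Section == section).ToList();
}
```

The event stream alone can only be read in order, while the projected documents can be filtered by any field, which is the reason for keeping a second, queryable store.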
We run Couchbase in Docker with the command line below.
docker run -d --name couchbase -p 8091-8094:8091-8094 -p 11210:11210 couchbase
Once Couchbase is running, open the Couchbase web console (port 8091 is published by the command above) and create a cluster.
Let's create two buckets named "checkpoints" and "tasks" in Couchbase with the curl command lines below. Remember to replace the -u parameter with the credentials you provided when creating the cluster.
curl -X POST -u [admin]:[password] http://localhost:8091/pools/default/buckets \
  -d name=checkpoints -d ramQuotaMB=100 -d authType=none -d bucketType=couchbase
curl -X POST -u [admin]:[password] http://localhost:8091/pools/default/buckets \
  -d name=tasks -d ramQuotaMB=100 -d authType=none -d bucketType=couchbase
3. Adapting Couchbase to API
We install the packages "CouchbaseNetClient" and "Couchbase.Extensions.DependencyInjection" to the API project with the following command lines.
dotnet add package CouchbaseNetClient -v 2.7.16
dotnet add package Couchbase.Extensions.DependencyInjection -v 2.0.2
We add Couchbase connection information to appsettings.json as follows.
{
  "EventStore": {
    "ConnectionString": "ConnectTo=tcp://admin:changeit@localhost:1113; DefaultUserCredentials=admin:changeit;",
    "ConnectionName": "Meetup"
  },
  "Couchbase": {
    "ConnectionString": "couchbase://localhost",
    "Username": "admin",
    "Password": "123456"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*"
}
In the Startup.cs file, we configure Couchbase.
namespace EventSourcingTaskApp
{
    using Couchbase.Extensions.DependencyInjection;
    using EventSourcingTaskApp.HostedServices;
    using EventSourcingTaskApp.Infrastructure;
    using EventStore.ClientAPI;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            var eventStoreConnection = EventStoreConnection.Create(
                connectionString: Configuration.GetValue<string>("EventStore:ConnectionString"),
                builder: ConnectionSettings.Create().KeepReconnecting(),
                connectionName: Configuration.GetValue<string>("EventStore:ConnectionName"));

            eventStoreConnection.ConnectAsync().GetAwaiter().GetResult();

            services.AddSingleton(eventStoreConnection);
            services.AddTransient<AggregateRepository>();

            services.AddCouchbase((opt) =>
            {
                opt.ConnectionString = Configuration.GetValue<string>("Couchbase:ConnectionString");
                opt.Username = Configuration.GetValue<string>("Couchbase:Username");
                opt.Password = Configuration.GetValue<string>("Couchbase:Password");
            });

            services.AddControllers();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}
Each event has a position value in the Event Store. After recording the events read from the Event Store to Couchbase, we must keep the position value of the last event read. We will store this position value in the CheckpointDocument type.
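The sketch below illustrates the checkpoint idea in isolation, under simplifying assumptions: a list stands in for the Event Store's global stream and the list index plays the role of the position. In the real client, Position is a struct with commit and prepare positions rather than a plain index, and CheckpointSketch and its CatchUp method are hypothetical names.

```csharp
using System.Collections.Generic;

public static class CheckpointSketch
{
    // log: stand-in for the global event stream.
    // lastCheckpoint: position of the last event already projected, or null on the first run.
    // Returns the events handled in this run and the new checkpoint to persist.
    public static (List<string> Handled, long? Checkpoint) CatchUp(IReadOnlyList<string> log, long? lastCheckpoint)
    {
        var handled = new List<string>();

        // Resume right after the stored position; start from the beginning when there is none.
        long start = lastCheckpoint.HasValue ? lastCheckpoint.Value + 1 : 0;

        for (var position = start; position < log.Count; position++)
        {
            handled.Add(log[(int)position]); // project the event first
            lastCheckpoint = position;       // then remember how far we got
        }

        return (handled, lastCheckpoint);
    }
}
```

Calling CatchUp with a null checkpoint replays the whole log, while passing the previously returned checkpoint handles only the events appended since then.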
Let's add a class named "CheckpointDocument.cs" in the Infrastructure folder. Let's paste the code below into the class.
namespace EventSourcingTaskApp.Infrastructure
{
    using EventStore.ClientAPI;

    public class CheckpointDocument
    {
        public string Key { get; set; }
        public Position Position { get; set; }
    }
}
We write the repository class to read the last position value that we have written to Couchbase or to record the position value.
Let's add a class named "CheckpointRepository.cs" in the Infrastructure folder. Let's paste the code below into the class.
namespace EventSourcingTaskApp.Infrastructure
{
    using Couchbase;
    using Couchbase.Core;
    using Couchbase.Extensions.DependencyInjection;
    using EventStore.ClientAPI;
    using System.Threading.Tasks;

    public class CheckpointRepository
    {
        private readonly IBucket _bucket;

        public CheckpointRepository(IBucketProvider bucketProvider)
        {
            _bucket = bucketProvider.GetBucket("checkpoints");
        }

        public async Task<Position?> GetAsync(string key)
        {
            var result = await _bucket.GetAsync<CheckpointDocument>(key);

            if (result.Value == null)
                return null;

            return result.Value.Position;
        }

        public async Task<bool> SaveAsync(string key, Position position)
        {
            var doc = new Document<CheckpointDocument>
            {
                Id = key,
                Content = new CheckpointDocument
                {
                    Key = key,
                    Position = position
                }
            };

            var result = await _bucket.UpsertAsync(doc);

            return result.Success;
        }
    }
}
We will store task information in TaskDocument type. Let's add a class named "TaskDocument.cs" in the Infrastructure folder. Let's paste the code below into the class.
namespace EventSourcingTaskApp.Infrastructure
{
    using EventSourcingTaskApp.Core;
    using System;

    public class TaskDocument
    {
        public Guid Id { get; set; }
        public string Title { get; set; }
        public string CreatedBy { get; set; }
        public string AssignedTo { get; set; }
        public BoardSections Section { get; set; }
        public string CompletedBy { get; set; }
    }
}
We write the repository class to write the events read from the Event Store to Couchbase and to query the tasks from Couchbase.
Let's add a class named "TaskRepository.cs" in the Infrastructure folder. Let's paste the code below into the class.
namespace EventSourcingTaskApp.Infrastructure
{
    using Couchbase;
    using Couchbase.Core;
    using Couchbase.Extensions.DependencyInjection;
    using EventSourcingTaskApp.Core;
    using EventSourcingTaskApp.Core.Events;
    using System;
    using System.Threading.Tasks;

    public class TaskRepository
    {
        private readonly IBucket _bucket;

        public TaskRepository(IBucketProvider bucketProvider)
        {
            _bucket = bucketProvider.GetBucket("tasks");
        }

        public void Save(object @event)
        {
            switch (@event)
            {
                case CreatedTask x: OnCreated(x); break;
                case AssignedTask x: OnAssigned(x); break;
                case MovedTask x: OnMoved(x); break;
                case CompletedTask x: OnCompleted(x); break;
            }
        }

        public async Task<TaskDocument> Get(Guid taskId)
        {
            var documentResult = await _bucket.GetDocumentAsync<TaskDocument>(taskId.ToString());

            return documentResult.Document.Content;
        }

        private async void OnCreated(CreatedTask @event)
        {
            var document = new Document<TaskDocument>
            {
                Id = @event.TaskId.ToString(),
                Content = new TaskDocument
                {
                    Id = @event.TaskId,
                    Title = @event.Title,
                    Section = BoardSections.Open
                }
            };

            await _bucket.InsertAsync(document);
        }

        private async void OnAssigned(AssignedTask @event)
        {
            await _bucket.MutateIn<TaskDocument>(@event.TaskId.ToString())
                .Replace("assignedTo", @event.AssignedTo)
                .ExecuteAsync();
        }

        private async void OnMoved(MovedTask @event)
        {
            await _bucket.MutateIn<TaskDocument>(@event.TaskId.ToString())
                .Replace("section", @event.Section)
                .ExecuteAsync();
        }

        private async void OnCompleted(CompletedTask @event)
        {
            await _bucket.MutateIn<TaskDocument>(@event.TaskId.ToString())
                .Replace("completedBy", @event.CompletedBy)
                .ExecuteAsync();
        }
    }
}
In the Save method, we create the task or update its related fields, depending on the type of the event read from the Event Store.
In the Get method, we query a task from Couchbase by id.
Let's add CheckpointRepository and TaskRepository classes to DI Container in Startup.cs file as below.
namespace EventSourcingTaskApp
{
    using Couchbase.Extensions.DependencyInjection;
    using EventSourcingTaskApp.HostedServices;
    using EventSourcingTaskApp.Infrastructure;
    using EventStore.ClientAPI;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            var eventStoreConnection = EventStoreConnection.Create(
                connectionString: Configuration.GetValue<string>("EventStore:ConnectionString"),
                builder: ConnectionSettings.Create().KeepReconnecting(),
                connectionName: Configuration.GetValue<string>("EventStore:ConnectionName"));

            eventStoreConnection.ConnectAsync().GetAwaiter().GetResult();

            services.AddSingleton(eventStoreConnection);
            services.AddTransient<AggregateRepository>();

            services.AddCouchbase((opt) =>
            {
                opt.ConnectionString = Configuration.GetValue<string>("Couchbase:ConnectionString");
                opt.Username = Configuration.GetValue<string>("Couchbase:Username");
                opt.Password = Configuration.GetValue<string>("Couchbase:Password");
            });

            services.AddTransient<CheckpointRepository>();
            services.AddTransient<TaskRepository>();

            services.AddControllers();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }
    }
}
4. Preparing Consumer
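We need a consumer to listen to the events from the Event Store. For this, we will use a hosted service in ASP.NET Core. The Startup.cs listings in this article import the HostedServices namespace but do not show the registration itself; for the consumer to start with the application, it has to be registered in ConfigureServices, for example with services.AddHostedService<TaskHostedService>().
Let's add a folder named "HostedServices" to the project and add a class named "TaskHostedService.cs" into it. Let's paste the code below into the class.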
namespace EventSourcingTaskApp.HostedServices
{
    using EventSourcingTaskApp.Core.Events;
    using EventSourcingTaskApp.Infrastructure;
    using EventStore.ClientAPI;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Text;
    using System.Text.Json;
    using System.Threading;
    using System.Threading.Tasks;

    public class TaskHostedService : IHostedService
    {
        private readonly IEventStoreConnection _eventStore;
        private readonly CheckpointRepository _checkpointRepository;
        private readonly TaskRepository _taskRepository;
        private readonly ILogger<TaskHostedService> _logger;
        private EventStoreAllCatchUpSubscription subscription;

        public TaskHostedService(IEventStoreConnection eventStore, CheckpointRepository checkpointRepository, TaskRepository taskRepository, ILogger<TaskHostedService> logger)
        {
            _eventStore = eventStore;
            _checkpointRepository = checkpointRepository;
            _taskRepository = taskRepository;
            _logger = logger;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            // Position of the last event already written to Couchbase (null on the first run).
            var lastCheckpoint = await _checkpointRepository.GetAsync("tasks");

            var settings = new CatchUpSubscriptionSettings(
                maxLiveQueueSize: 10000,
                readBatchSize: 500,
                verboseLogging: false,
                resolveLinkTos: false,
                subscriptionName: "Tasks");

            subscription = _eventStore.SubscribeToAllFrom(
                lastCheckpoint: lastCheckpoint,
                settings: settings,
                eventAppeared: async (sub, @event) =>
                {
                    // Skip the Event Store's own system events.
                    if (@event.OriginalEvent.EventType.StartsWith("$"))
                        return;

                    try
                    {
                        // The metadata holds the type name; the data holds the JSON payload.
                        var eventType = Type.GetType(Encoding.UTF8.GetString(@event.OriginalEvent.Metadata));
                        var eventData = JsonSerializer.Deserialize(Encoding.UTF8.GetString(@event.OriginalEvent.Data), eventType);

                        if (eventType != typeof(CreatedTask) && eventType != typeof(AssignedTask) && eventType != typeof(MovedTask) && eventType != typeof(CompletedTask))
                            return;

                        _taskRepository.Save(eventData);

                        await _checkpointRepository.SaveAsync("tasks", @event.OriginalPosition.GetValueOrDefault());
                    }
                    catch (Exception exception)
                    {
                        _logger.LogError(exception, exception.Message);
                    }
                },
                liveProcessingStarted: (sub) =>
                {
                    _logger.LogInformation("{SubscriptionName} subscription started.", sub.SubscriptionName);
                },
                subscriptionDropped: (sub, subDropReason, exception) =>
                {
                    _logger.LogWarning("{SubscriptionName} dropped. Reason: {SubDropReason}.", sub.SubscriptionName, subDropReason);
                });
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            // The subscription is null if StartAsync failed before subscribing.
            subscription?.Stop();

            return Task.CompletedTask;
        }
    }
}
With the _checkpointRepository.GetAsync("tasks") call, we query the position value of the last event recorded to Couchbase.
With the SubscribeToAllFrom call, we subscribe to the Event Store and start listening to the events. If the lastCheckpoint value is not given, all events are read again from the beginning every time we run the project. With lastCheckpoint, we state that we will read the events coming after the last position value we have read.
With the StartsWith("$") check, we skip the Event Store's own events so that we take no action on them.
With the Type.GetType call, we find the type of the event that was read, using the type name stored in the event's metadata.
With the JsonSerializer.Deserialize call, we deserialize the event according to its type.
With the typeof comparisons, we skip every event other than CreatedTask, AssignedTask, MovedTask and CompletedTask.
With the _taskRepository.Save call, we record the task to Couchbase.
With the _checkpointRepository.SaveAsync call, we record the position value of the event read from the Event Store to Couchbase.
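The type lookup and deserialization steps of the consumer can be tried in isolation. The following is a minimal sketch, assuming the writer stored a type name in the metadata that Type.GetType can resolve (here the assembly-qualified name). SampleCreatedTask and EventSerializationSketch are hypothetical names used only for this illustration.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical event class, for illustration only.
public class SampleCreatedTask
{
    public Guid TaskId { get; set; }
    public string Title { get; set; }
}

public static class EventSerializationSketch
{
    // Writer side: the payload goes into the data bytes, the CLR type name into the metadata bytes.
    public static (byte[] Data, byte[] Metadata) Encode(object @event)
    {
        var type = @event.GetType();
        var data = JsonSerializer.SerializeToUtf8Bytes(@event, type);
        var metadata = Encoding.UTF8.GetBytes(type.AssemblyQualifiedName);
        return (data, metadata);
    }

    // Reader side: resolve the type from the metadata, then deserialize the payload into it.
    // Returns null when the type name cannot be resolved.
    public static object Decode(byte[] data, byte[] metadata)
    {
        var type = Type.GetType(Encoding.UTF8.GetString(metadata));
        if (type == null)
            return null;

        return JsonSerializer.Deserialize(Encoding.UTF8.GetString(data), type);
    }
}
```

Type.GetType returns null for a name it cannot resolve, so this sketch checks for null before deserializing; passing a null type to JsonSerializer.Deserialize would throw instead.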
5. Preparing API Endpoint
We add the endpoint where we will query the task to TasksController as follows.
namespace EventSourcingTaskApp.Controllers
{
    using EventSourcingTaskApp.Core;
    using EventSourcingTaskApp.Infrastructure;
    using Microsoft.AspNetCore.Mvc;
    using System;
    using System.Threading.Tasks;

    [Route("api/tasks/{id}")]
    [ApiController]
    [Consumes("application/x-www-form-urlencoded")]
    public class TasksController : ControllerBase
    {
        private readonly AggregateRepository _aggregateRepository;
        private readonly TaskRepository _taskRepository;

        public TasksController(AggregateRepository aggregateRepository, TaskRepository taskRepository)
        {
            _aggregateRepository = aggregateRepository;
            _taskRepository = taskRepository;
        }

        [HttpGet]
        public async Task<IActionResult> Get(Guid id)
        {
            var task = await _taskRepository.Get(id);

            return Ok(task);
        }

        [HttpPost, Route("create")]
        public async Task<IActionResult> Create(Guid id, [FromForm] string title)
        {
            var aggregate = await _aggregateRepository.LoadAsync<Core.Task>(id);
            aggregate.Create(id, title, "Ahmet KÜÇÜKOĞLU");

            await _aggregateRepository.SaveAsync(aggregate);

            return Ok();
        }

        [HttpPatch, Route("assign")]
        public async Task<IActionResult> Assign(Guid id, [FromForm] string assignedTo)
        {
            var aggregate = await _aggregateRepository.LoadAsync<Core.Task>(id);
            aggregate.Assign(assignedTo, "Ahmet KÜÇÜKOĞLU");

            await _aggregateRepository.SaveAsync(aggregate);

            return Ok();
        }

        [HttpPatch, Route("move")]
        public async Task<IActionResult> Move(Guid id, [FromForm] BoardSections section)
        {
            var aggregate = await _aggregateRepository.LoadAsync<Core.Task>(id);
            aggregate.Move(section, "Ahmet KÜÇÜKOĞLU");

            await _aggregateRepository.SaveAsync(aggregate);

            return Ok();
        }

        [HttpPatch, Route("complete")]
        public async Task<IActionResult> Complete(Guid id)
        {
            var aggregate = await _aggregateRepository.LoadAsync<Core.Task>(id);
            aggregate.Complete("Ahmet KÜÇÜKOĞLU");

            await _aggregateRepository.SaveAsync(aggregate);

            return Ok();
        }
    }
}
Everything is ready now. We can run the API project. We can query the task we have added at the end of the previous article with the curl command line below.
curl https://localhost:44361/api/tasks/3a7daba9-872c-4f4d-8d6f-e9700d78c4f5
You can access the final version of the project on GitHub.
Good luck.