Event-driven integration #3 - Storing events in the outbox table [ASPF02O|E042]

João Antunes ・ Originally published at blog.codingmilitia.com ・ 8 min read

ASP.NET Core: From 0 to overkill (45 Part Series)

1) ASP.NET Core: From 0 to overkill - Intro 2) Episode 001 - The Reference Project - ASP.NET Core: From 0 to overkill 3 ... 43 3) Episode 002 - Project structure plus first application - ASP.NET Core: From 0 to overkill 4) Episode 003 - First steps with MVC - ASP.NET Core: From 0 to overkill 5) Episode 004 - The Program and Startup classes - ASP.NET Core: From 0 to overkill 6) Episode 005 - Dependency Injection - ASP.NET Core: From 0 to overkill 7) Episode 006 - Configuration - ASP.NET Core: From 0 to overkill 8) Episode 007 - Logging - ASP.NET Core: From 0 to overkill 9) Episode 008 - Middlewares - ASP.NET Core: From 0 to overkill 10) Episode 009 - MVC filters - ASP.NET Core: From 0 to overkill 11) Episode 010 - Async all the things - ASP.NET Core: From 0 to overkill 12) Episode 011 - Data access with Entity Framework Core - ASP.NET Core: From 0 to overkill 13) Episode 012 - Move to a Web API - ASP.NET Core: From 0 to overkill 14) Episode 013 - Starting the frontend with Vue.js - ASP.NET Core: From 0 to overkill 15) Episode 014 - Centralizing frontend state with Vuex - ASP.NET Core: From 0 to overkill 16) Episode 015 - Calling the Web API from the frontend - ASP.NET Core: From 0 to overkill 17) Episode 016 - Authentication with Identity and Razor Pages - ASP.NET Core: From 0 to overkill 18) Episode 017 - More Identity, more Razor Pages - ASP.NET Core: From 0 to overkill 19) Episode 018 - Internationalization - ASP.NET Core: From 0 to overkill 20) Episode 019 - Roles, claims and policies - ASP.NET Core: From 0 to overkill 21) Episode 020 - The backend for frontend and the HttpClient - ASP.NET Core: From 0 to overkill 22) Episode 021 - Integrating IdentityServer4 - Part 1 - Overview - ASP.NET Core: From 0 to overkill 23) Episode 022 - Integrating IdentityServer4 - Part 2 - Auth Service - ASP.NET Core: From 0 to overkill 24) Episode 023 - Integrating IdentityServer4 - Part 3 - API - ASP.NET Core: From 0 to overkill 25) Episode 024 - Integrating IdentityServer4 - Part 
4 - Back for Front - ASP.NET Core: From 0 to overkill 26) Episode 025 - Integrating IdentityServer4 - Part 5 - Frontend - ASP.NET Core: From 0 to overkill 27) Episode 026 - Getting started with Docker - ASP.NET Core: From 0 to overkill 28) Episode 027 - Up and running with Docker Compose - ASP.NET Core: From 0 to overkill 29) Episode 028 - Multiple service instances tweaks - ASP.NET Core: From 0 to overkill 30) Episode 029 - Simplifying the BFF with ProxyKit - ASP.NET Core: From 0 to overkill 31) Episode 030 - Analyzing performance with BenchmarkDotNet - ASP.NET Core: From 0 to overkill 32) Episode 031 - Some simple unit tests with xUnit - ASP.NET Core: From 0 to overkill 33) Episode 032 - Upgrading to ASP.NET Core 3.0 - ASP.NET Core: From 0 to overkill 34) E033 - Redesigning the API: Improving the internal architecture - ASPF02O 35) E034 - Segregating use cases with MediatR - ASPF02O 36) E035 - Experimenting with (yet) another approach to data access organization - ASPF02O 37) E036 - Making things more object oriented with rich domain entities - ASPF02O 38) Better use of types - avoiding nulls with an Optional type - ASPF02O|E037 39) More explicit domain error handling and fewer exceptions with Either and Error types [ASPF02O|E038] 40) Event-driven integration - Overview [ASPF02O|E039] 41) Event-driven integration #1 - Intro to the transactional outbox pattern [ASPF02O|E040] 42) Event-driven integration #2 - Inferring events from EF Core changes [ASPF02O|E041] 43) Event-driven integration #3 - Storing events in the outbox table [ASPF02O|E042] 44) Event-driven integration #4 - Outbox publisher (feat. IHostedService & Channels) [ASPF02O|E043] 45) Event-driven integration #5 - Quick intro to Apache Kafka [ASPF02O|E044]

Following on from the last episode, in this one we store the inferred events in the outbox table, doing so transactionally, so we have the guarantee that any persisted change will eventually result in a published event.

Note: depending on your preference, you can check out the following video, otherwise, skip to the written version below.

The playlist for the whole series is here.

Intro

In the previous episode, we implemented the detection of events based on EF Core changes. In this episode, we'll take a look at the event storage part of the transactional outbox pattern introduced in episode 40.

To situate ourselves, we can take a look at the diagram introduced in episode 40:

[Diagram: situating ourselves]

The outbox table

The first thing we need is the outbox table, where we store the events to publish later. We'll keep it simple with three columns, even though one of them is "special": an identifier, a DateTime indicating when the event was created, and a JSON column where we store the complete information about the event.

We'll use a JSON column for the event data because it doesn't make much sense to model every possible event property as its own column: the set of properties can be rather dynamic, and dedicated columns wouldn't provide any value here. PostgreSQL has great support for JSON columns, to the point of being used as a document database, with some abstractions on top of it.

Starting with the class that'll map to the table, we have OutboxMessage:

Data\OutboxMessage.cs

public class OutboxMessage
{
    public OutboxMessage(DateTime createdAt, BaseAuthEvent @event)
    {
        CreatedAt = createdAt;
        Event = @event;
    }

    public long Id { get; private set; }

    public DateTime CreatedAt { get; private set; }

    public BaseAuthEvent Event { get; private set; }
}

The Event property will be the one mapped as JSON. To keep it simple, we have a class named BaseAuthEvent (we'll look at it in a minute) that'll be the base for all types of events published by the auth service.

As for configuring the mapping between the class and the database, we implement IEntityTypeConfiguration<OutboxMessage> as usual:

Infrastructure\Data\Configurations\OutboxMessageConfiguration.cs

public class OutboxMessageConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
    public void Configure(EntityTypeBuilder<OutboxMessage> builder)
    {
        var settings = new JsonSerializerSettings
        {
            TypeNameHandling = TypeNameHandling.Objects
        };

        builder
            .HasKey(e => e.Id);

        builder
            .Property(e => e.Id)
            .UseIdentityAlwaysColumn();

        builder
            .Property(e => e.Event)
            .HasColumnType("json")
            .HasConversion(
                e => JsonConvert.SerializeObject(e, settings),
                e => JsonConvert.DeserializeObject<BaseAuthEvent>(e, settings));
    }
}

The Id property configuration is the same as the one we saw in past episodes. The interesting part is the Event property.

We start by setting the column type to json. If we didn't have to deal with inheritance, that would be all we needed to do, as the PostgreSQL provider for EF Core (Npgsql) supports serializing things to JSON out of the box, using System.Text.Json. We do have to deal with it though, because all the events inherit from BaseAuthEvent.

Unfortunately, System.Text.Json doesn't handle inheritance without custom code (at the time of writing), so we can either implement that custom code or use something else. To keep it simple, we'll use Json.NET, which handles inheritance fine, provided we configure it accordingly, which we do in the code by setting the TypeNameHandling property. With this setting, we're saying that when serializing objects, Json.NET should include the type name, so that it can figure out the actual type to deserialize the data into. With the call to HasConversion, we set up the Event property serialization to be handled by Json.NET with the desired settings.

An example stored event:

{
    "$type": "CodingMilitia.PlayBall.Auth.Web.Data.Events.UserUpdatedEvent, CodingMilitia.PlayBall.Auth.Web",
    "UserId": "4138e44c-da10-4108-b3ab-4901eb27da5f",
    "UserName": "test2@test.com",
    "Id": "43ae30cf-105b-4354-aa61-8f7f765e81fc",
    "OccurredAt": "2020-04-25T15:25:37.5250567Z"
}
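
To see the type name handling in isolation, here's a minimal sketch of the round trip that the HasConversion call relies on. It assumes only the Newtonsoft.Json package; the OutboxJsonDemo namespace and the trimmed-down event classes are stand-ins for illustration, not the ones from the repository.

```csharp
using System;
using Newtonsoft.Json;

namespace OutboxJsonDemo
{
    // Stand-ins for the post's event classes (assumption: just enough to show inheritance).
    public abstract class BaseAuthEvent
    {
        public Guid Id { get; set; }

        public DateTime OccurredAt { get; set; } = DateTime.UtcNow;
    }

    public class UserUpdatedEvent : BaseAuthEvent
    {
        public string UserId { get; set; }

        public string UserName { get; set; }
    }

    public static class Program
    {
        public static void Main()
        {
            // Same setting as in OutboxMessageConfiguration: emit "$type" for objects.
            var settings = new JsonSerializerSettings
            {
                TypeNameHandling = TypeNameHandling.Objects
            };

            BaseAuthEvent original = new UserUpdatedEvent
            {
                Id = Guid.NewGuid(),
                UserId = "4138e44c-da10-4108-b3ab-4901eb27da5f",
                UserName = "test2@test.com"
            };

            // Serialize through the base type, then deserialize asking only for the base type.
            var json = JsonConvert.SerializeObject(original, settings);
            var roundTripped = JsonConvert.DeserializeObject<BaseAuthEvent>(json, settings);

            Console.WriteLine(json);
            Console.WriteLine(roundTripped.GetType().Name); // prints "UserUpdatedEvent"
        }
    }
}
```

The "$type" value in the JSON is what lets Json.NET pick UserUpdatedEvent even though we only asked for BaseAuthEvent, which is abstract and couldn't be instantiated on its own.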

As a final note about this configuration, I initially used jsonb as the column type, not json, but ended up changing it. The main difference is that json stores things in text format, while jsonb stores things in an optimized binary format. From the docs:

The json and jsonb data types accept almost identical sets of values as input. The major practical difference is one of efficiency. The json data type stores an exact copy of the input text, which processing functions must reparse on each execution; while jsonb data is stored in a decomposed binary format that makes it slightly slower to input due to added conversion overhead, but significantly faster to process, since no reparsing is needed. jsonb also supports indexing, which can be a significant advantage.

As mentioned, the main advantage of jsonb is its usage in queries, which we won't really take advantage of, but that's not the main reason I ended up changing types (even though it's a good one). The main reason is that jsonb doesn't preserve the order of the object properties when storing the column data. Normally this is no problem, but by default Json.NET needs to find the $type property (seen in the example above) as the first property to know the type to deserialize things into, and PostgreSQL was moving the Id property to the first spot, so things didn't work 🙂.
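
As an aside, and not what this post ends up doing (we just switched to json), Json.NET has a MetadataPropertyHandling setting that lets it look for $type anywhere in the object. The sketch below is an assumption-laden illustration of that alternative: the namespace, the trimmed-down classes and the hand-built JSON string are hypothetical, with Id deliberately placed before $type to mimic the reordering described above.

```csharp
using System;
using Newtonsoft.Json;

namespace OutboxJsonOrderDemo
{
    // Stand-ins for the post's event classes (assumption: reduced to the bare minimum).
    public abstract class BaseAuthEvent
    {
        public Guid Id { get; set; }
    }

    public class UserDeletedEvent : BaseAuthEvent
    {
        public string UserId { get; set; }
    }

    public static class Program
    {
        public static void Main()
        {
            // Build the "Namespace.Type, Assembly" name the same way Json.NET writes it by default.
            var type = typeof(UserDeletedEvent);
            var typeName = type.FullName + ", " + type.Assembly.GetName().Name;

            // "Id" comes before "$type", as it did after the jsonb reordering.
            var reordered =
                "{\"Id\":\"43ae30cf-105b-4354-aa61-8f7f765e81fc\"," +
                "\"$type\":\"" + typeName + "\"," +
                "\"UserId\":\"4138e44c-da10-4108-b3ab-4901eb27da5f\"}";

            var settings = new JsonSerializerSettings
            {
                TypeNameHandling = TypeNameHandling.Objects,
                // Read ahead in the object to find "$type" instead of requiring it first.
                MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
            };

            var deserialized = JsonConvert.DeserializeObject<BaseAuthEvent>(reordered, settings);

            Console.WriteLine(deserialized.GetType().Name);
        }
    }
}
```

Reading ahead means buffering each object before deciding its type, so it comes with some extra cost. Given we don't query the event data anyway, sticking with json remains the simpler choice here.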

The final thing we need is to add the OutboxMessages property (a DbSet<OutboxMessage>) to the AuthDbContext class and add the migration.

dotnet ef migrations add CreateOutboxMessagesTable -c AuthDbContext

Modeling the events

Now to model the events. In previous episodes we defined that, for now, we'll have three types: user registered, user updated and user deleted. As mentioned in the previous section, we'll have a base class for the events, named BaseAuthEvent.

Data\Events\BaseAuthEvent.cs

public abstract class BaseAuthEvent
{
    public Guid Id { get; set; }

    public DateTime OccurredAt { get; set; } = DateTime.UtcNow;
}

Keeping things simple, we have an identifier and the DateTime at which the event occurred. In the future we might need to add something more, but for now it'll do.

As for the actual events, there's not much to them either, as we don't have a ton of data to include.

Data\Events\UserRegisteredEvent.cs

public class UserRegisteredEvent : BaseAuthEvent
{
    public string UserId { get; set; }

    public string UserName { get; set; }
}

Data\Events\UserUpdatedEvent.cs

public class UserUpdatedEvent : BaseAuthEvent
{
    public string UserId { get; set; }

    public string UserName { get; set; }
}

Data\Events\UserDeletedEvent.cs

public class UserDeletedEvent : BaseAuthEvent
{
    public string UserId { get; set; }
}

Both UserRegisteredEvent and UserUpdatedEvent include the id of the user affected, as well as its username, as it's the only property that we care to inform the listening services about. UserDeletedEvent only needs the user id.

Mapping changes to events

With things in place at the database level, we need to implement the logic bits, starting with the mapping of the events.

In the last episode, we created the concept of event detectors, which receive the DbContext and check the change tracker for event-worthy changes. We'll build upon that concept to map the events that are detected.

The IEventDetector interface becomes IEventMapper, with slight tweaks to the exposed method.

Data\IEventMapper.cs

public interface IEventMapper
{
    IEnumerable<OutboxMessage> Map(AuthDbContext db, DateTime occurredAt);
}

Instead of just detecting events, it now returns the collection of detected events, each already mapped to an OutboxMessage instance. Just for optimization's sake, it also receives the occurrence date/time as a parameter, so it's obtained once and shared by all the mappers.

In the AuthDbContext class, we can replace the event detection with the mapping.

Data\AuthDbContext.cs

public class AuthDbContext : IdentityDbContext<PlayBallUser>
{
    private readonly IEnumerable<IEventMapper> _eventMappers;

    public AuthDbContext(DbContextOptions<AuthDbContext> options, IEnumerable<IEventMapper> eventMappers)
        : base(options)
    {
        _eventMappers = eventMappers;
    }

    // ...

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        var eventsDetected = GetEvents();

        // ...
    }

    private IReadOnlyCollection<OutboxMessage> GetEvents()
    {
        var now = DateTime.UtcNow;

        return _eventMappers
            .SelectMany(mapper => mapper.Map(this, now))
            .ToList();
    }
}

As for the event mapper implementations, they're similar to the detectors we saw in the last episode, with the extra mapping logic:

Infrastructure\Data\UserRegisteredEventMapper.cs

public class UserRegisteredEventMapper : IEventMapper
{
    public IEnumerable<OutboxMessage> Map(AuthDbContext db, DateTime occurredAt)
        => db
            .ChangeTracker
            .Entries<PlayBallUser>()
            .Where(entry => entry.State == EntityState.Added)
            .Select(entry =>
                new OutboxMessage(occurredAt,
                    new UserRegisteredEvent
                    {
                        Id = Guid.NewGuid(),
                        OccurredAt = occurredAt,
                        UserId = entry.Entity.Id,
                        UserName = entry.Entity.UserName
                    }));
}

Infrastructure\Data\UserUpdatedEventMapper.cs

public class UserUpdatedEventMapper : IEventMapper
{
    public IEnumerable<OutboxMessage> Map(AuthDbContext db, DateTime occurredAt)
    {
        const string UserNameProperty = nameof(PlayBallUser.UserName);

        return db
            .ChangeTracker
            .Entries<PlayBallUser>()
            .Where(entry => entry.State == EntityState.Modified
                            &&
                            entry.OriginalValues.GetValue<string>(UserNameProperty) !=
                            entry.CurrentValues.GetValue<string>(UserNameProperty))
            .Select(entry =>
                new OutboxMessage(occurredAt,
                    new UserUpdatedEvent
                    {
                        Id = Guid.NewGuid(),
                        OccurredAt = occurredAt,
                        UserId = entry.Entity.Id,
                        UserName = entry.Entity.UserName
                    }));
    }
}

Infrastructure\Data\UserDeletedEventMapper.cs

public class UserDeletedEventMapper : IEventMapper
{
    public IEnumerable<OutboxMessage> Map(AuthDbContext db, DateTime occurredAt)
        => db
            .ChangeTracker
            .Entries<PlayBallUser>()
            .Where(entry => entry.State == EntityState.Deleted)
            .Select(entry =>
                new OutboxMessage(occurredAt,
                    new UserDeletedEvent
                    {
                        Id = Guid.NewGuid(),
                        OccurredAt = occurredAt,
                        UserId = entry.Entity.Id
                    }));
}
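
The AuthDbContext constructor shown earlier receives an IEnumerable<IEventMapper>, so the mappers need to be registered in the dependency injection container. The post doesn't show that part, so the following is only a sketch of how it could look with Microsoft.Extensions.DependencyInjection. The IEventMapper here is a simplified stand-in returning strings (the real one takes the AuthDbContext and returns OutboxMessage instances), and the singleton lifetime is an assumption based on the mappers being stateless.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

namespace MapperRegistrationSketch
{
    // Simplified stand-in for the post's IEventMapper (assumption: no DbContext involved).
    public interface IEventMapper
    {
        IEnumerable<string> Map(DateTime occurredAt);
    }

    public class UserRegisteredEventMapper : IEventMapper
    {
        public IEnumerable<string> Map(DateTime occurredAt) => new[] { "UserRegistered" };
    }

    public class UserDeletedEventMapper : IEventMapper
    {
        public IEnumerable<string> Map(DateTime occurredAt) => new[] { "UserDeleted" };
    }

    public static class Program
    {
        public static void Main()
        {
            var services = new ServiceCollection();

            // Registering several implementations of the same interface makes them
            // all available when an IEnumerable<IEventMapper> is requested.
            services.AddSingleton<IEventMapper, UserRegisteredEventMapper>();
            services.AddSingleton<IEventMapper, UserDeletedEventMapper>();

            using var provider = services.BuildServiceProvider();
            var mappers = provider.GetServices<IEventMapper>();

            // Same shape as GetEvents in AuthDbContext: one timestamp, all mappers.
            var now = DateTime.UtcNow;
            var events = mappers.SelectMany(mapper => mapper.Map(now)).ToList();

            Console.WriteLine(string.Join(", ", events)); // prints "UserRegistered, UserDeleted"
        }
    }
}
```

Adding a new type of event then comes down to implementing another mapper and registering it, without touching the DbContext.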

Transactionally storing the events

This final section has the most pretentious title, but it is probably the most straightforward.

As we've discussed a few times already, to ensure reliability in event publishing, we need the events to be persisted in the same transaction as the actual changes. However, we don't need to do anything very special.

When calling SaveChanges, by default all the changes are persisted in a single transaction, so all we need to do in our SaveChangesAsync override is add the outbox messages to the context before invoking the base class implementation. We can see the complete (for now) implementation of SaveChangesAsync below.

Data\AuthDbContext.cs

public class AuthDbContext : IdentityDbContext<PlayBallUser>
{
    // ...

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        var eventsDetected = GetEvents();
        AddEventsIfAny(eventsDetected);

        var result = await base.SaveChangesAsync(cancellationToken);

        // TODO: publish the events persisted in the outbox

        return result;
    }

    // ...

    private void AddEventsIfAny(IReadOnlyCollection<OutboxMessage> eventsDetected)
    {
        if (eventsDetected.Count > 0)
        {
            Set<OutboxMessage>().AddRange(eventsDetected);
        }
    }
}

And with this we have both the user account related changes and the outbox messages persisted transactionally. If something goes wrong when persisting things, an exception is thrown and nothing is committed.
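
To try out the all-or-nothing behavior in isolation, here's a rough sketch under a few loudly stated assumptions: it swaps PostgreSQL for an in-memory SQLite database (via the Microsoft.EntityFrameworkCore.Sqlite package), uses a hypothetical SketchDbContext with simplified User and OutboxMessage classes, and forces a failure with a unique index on the username. It's not the code from the repository, just the same idea in a smaller form.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
using Microsoft.EntityFrameworkCore;

namespace OutboxTransactionSketch
{
    // Simplified stand-ins (assumption: a string payload instead of a JSON mapped event).
    public class User
    {
        public int Id { get; set; }
        public string UserName { get; set; }
    }

    public class OutboxMessage
    {
        public long Id { get; set; }
        public DateTime CreatedAt { get; set; }
        public string Payload { get; set; }
    }

    public class SketchDbContext : DbContext
    {
        public SketchDbContext(DbContextOptions<SketchDbContext> options) : base(options) { }

        public DbSet<User> Users { get; set; }
        public DbSet<OutboxMessage> OutboxMessages { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            // The unique index lets us provoke a failure with a duplicate username.
            modelBuilder.Entity<User>().HasIndex(user => user.UserName).IsUnique();
        }

        public override Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
        {
            var now = DateTime.UtcNow;

            // Materialize the messages before adding them to the context.
            var messages = ChangeTracker
                .Entries<User>()
                .Where(entry => entry.State == EntityState.Added)
                .Select(entry => new OutboxMessage
                {
                    CreatedAt = now,
                    Payload = "registered:" + entry.Entity.UserName
                })
                .ToList();

            OutboxMessages.AddRange(messages);

            // Users and outbox messages are saved by the same call, in one transaction.
            return base.SaveChangesAsync(cancellationToken);
        }
    }

    public static class Program
    {
        public static async Task Main()
        {
            // The in-memory database lives as long as this connection stays open.
            using var connection = new SqliteConnection("DataSource=:memory:");
            connection.Open();
            var options = new DbContextOptionsBuilder<SketchDbContext>().UseSqlite(connection).Options;

            using (var db = new SketchDbContext(options))
            {
                db.Database.EnsureCreated();
                db.Users.Add(new User { UserName = "alice" });
                await db.SaveChangesAsync();
            }

            using (var db = new SketchDbContext(options))
            {
                db.Users.Add(new User { UserName = "alice" }); // violates the unique index
                try
                {
                    await db.SaveChangesAsync();
                }
                catch (DbUpdateException)
                {
                    Console.WriteLine("Save failed");
                }
            }

            using (var db = new SketchDbContext(options))
            {
                Console.WriteLine($"{db.Users.Count()} user(s), {db.OutboxMessages.Count()} outbox message(s)");
            }
        }
    }
}
```

The intent is that the second save leaves no trace: neither the duplicate user nor its outbox message should be there afterwards, so the counts reflect only the first, successful save.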

Outro

That does it for this episode. We took care of the persistence part of the transactional outbox pattern, creating a new table to store the events, as well as storing things transactionally.

The main topics we looked at:

  • PostgreSQL and EF Core/Npgsql support for JSON columns (we barely scratched the surface though)
  • Transactionally persisting changes and outbox messages with EF Core
  • Continuing to take advantage of overriding SaveChanges to centralize some logic

In the next episode, we'll implement the outbox publisher, which will read the messages stored in the outbox table to publish to the event bus.

The source code for this post is in the Auth repository, tagged as episode042.

Sharing and feedback always appreciated!

Thanks for stopping by, cyaz!

Posted on Dec 6 '18 by: