DEV Community

Cristian Sifuentes

CQRS + Event Sourcing in .NET (C#) — A Production‑Minded Developer Guide (with a Smile)

CQRS and Event Sourcing are not “fancy CRUD.” They’re patterns for systems where scale, auditability, concurrency, and evolvability matter more than simplicity. Used well, they unlock clean write models, blazing‑fast reads, and a full timeline of truth. Used poorly, they become an expensive hobby.

This guide is a pragmatic, production‑minded walkthrough with code you can adapt immediately.


Table of Contents

  1. Release Overview: What You’re Building
  2. CQRS in One Sentence
  3. Event Sourcing in One Sentence
  4. Architecture Map: Commands → Events → Projections
  5. Domain: Product Catalog (Minimal but Real)
  6. Write Side
     6.1 Commands
     6.2 Events (Immutable)
     6.3 Aggregate Root + Apply
     6.4 Event Store Interface
     6.5 Optimistic Concurrency
  7. Read Side
     7.1 Projections
     7.2 Idempotency
     7.3 Rebuild / Replay
  8. API Layer: Minimal APIs
  9. Event Versioning (Without Crying)
  10. Common Pitfalls
  11. When to Use (and When Not To)
  12. Next Steps

Release Overview: What You’re Building

We’ll implement a small Product Catalog with:

  • Commands: AddProduct, UpdateProductPrice
  • Events: ProductAdded, ProductPriceUpdated
  • Event Store: interface + in‑memory example
  • Aggregate: reconstructs state by replaying events
  • Projection: builds a read model optimized for queries
  • Minimal API endpoints: write side and read side
  • Production concerns: optimistic concurrency, idempotency, event versioning

CQRS in One Sentence

CQRS separates writes (commands + business rules) from reads (query models optimized for retrieval), so each can scale and evolve independently.

Library analogy: the librarian records new books and changes; readers search efficiently in a catalog view.


Event Sourcing in One Sentence

Event Sourcing stores the history of changes as an append‑only stream of immutable events; the “current state” is derived by replaying that stream.

Bank statement analogy: your balance is computed from transactions, not stored as the single truth.
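
To make the bank statement analogy concrete, here is a minimal sketch in which the balance is never stored and is instead folded from the transaction history. The `AccountEvent`, `Deposited`, `Withdrawn`, and `Account` names are hypothetical and exist only for this illustration; they are not part of the product catalog built later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only.
public abstract record AccountEvent(decimal Amount);
public sealed record Deposited(decimal Amount) : AccountEvent(Amount);
public sealed record Withdrawn(decimal Amount) : AccountEvent(Amount);

public static class Account
{
    // Current state is a left fold over the event history, starting from zero.
    public static decimal Balance(IEnumerable<AccountEvent> history) =>
        history.Aggregate(0m, (balance, e) => e switch
        {
            Deposited d => balance + d.Amount,
            Withdrawn w => balance - w.Amount,
            _ => throw new NotSupportedException(e.GetType().Name)
        });
}
```

Replaying a deposit of 100, a withdrawal of 30, and a deposit of 5 yields a balance of 75. Replaying only the first two events yields 70, which is the "time travel" property in miniature.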


Architecture Map: Commands → Events → Projections

High‑level flow:

Client ──Command──▶ Command Handler ──append events──▶ Event Store
                      │
                      └──▶ Event Bus / Dispatcher ──▶ Projections ──▶ Read Store
Client ◀──────────── Query Handler / Read API ────────┘

Key idea: the write model is authoritative; the read model is derived from it.


Domain: Product Catalog (Minimal but Real)

We’ll treat each Product as an Aggregate Root (the transaction boundary on the write side).

Rules:

  • Product name required.
  • Price must be >= 0.
  • Updates must be concurrency‑safe.

Write Side

Commands

Commands represent intent, not state.

public sealed record AddProduct(Guid ProductId, string Name, decimal Price);
public sealed record UpdateProductPrice(Guid ProductId, decimal Price);

Events (Immutable)

Events represent facts that happened. Never mutate them.

public interface IDomainEvent
{
    Guid AggregateId { get; }
    long Version { get; init; }         // assigned by store
    DateTimeOffset AtUtc { get; init; } // assigned by store
}

public sealed record ProductAdded(Guid AggregateId, string Name, decimal Price) : IDomainEvent
{
    public long Version { get; init; }
    public DateTimeOffset AtUtc { get; init; }
}

public sealed record ProductPriceUpdated(Guid AggregateId, decimal Price) : IDomainEvent
{
    public long Version { get; init; }
    public DateTimeOffset AtUtc { get; init; }
}

Pro tip: Keep events small, stable, and meaningful. “Event explosion” is real.

Aggregate Root + Apply

The aggregate rebuilds state by applying events.

It enforces business rules before emitting new events.

public sealed class ProductAggregate
{
    private readonly List<IDomainEvent> _uncommitted = new();

    public Guid Id { get; private set; }
    public string Name { get; private set; } = "";
    public decimal Price { get; private set; }
    public long Version { get; private set; } = 0;

    public IReadOnlyList<IDomainEvent> UncommittedEvents => _uncommitted;

    public static ProductAggregate FromHistory(IEnumerable<IDomainEvent> history)
    {
        var agg = new ProductAggregate();
        foreach (var e in history) agg.Apply(e);
        agg._uncommitted.Clear();
        return agg;
    }

    public void Handle(AddProduct cmd)
    {
        if (cmd.ProductId == Guid.Empty) throw new ArgumentException("ProductId required");
        if (string.IsNullOrWhiteSpace(cmd.Name)) throw new ArgumentException("Name required");
        if (cmd.Price < 0) throw new ArgumentOutOfRangeException(nameof(cmd.Price));

        if (Version != 0) throw new InvalidOperationException("Product already exists.");

        Raise(new ProductAdded(cmd.ProductId, cmd.Name.Trim(), cmd.Price));
    }

    public void Handle(UpdateProductPrice cmd)
    {
        if (Version == 0) throw new InvalidOperationException("Product does not exist.");
        if (cmd.Price < 0) throw new ArgumentOutOfRangeException(nameof(cmd.Price));

        Raise(new ProductPriceUpdated(cmd.ProductId, cmd.Price));
    }

    private void Raise(IDomainEvent e)
    {
        Apply(e);
        _uncommitted.Add(e);
    }

    private void Apply(IDomainEvent e)
    {
        switch (e)
        {
            case ProductAdded x:
                Id = x.AggregateId;
                Name = x.Name;
                Price = x.Price;
                break;

            case ProductPriceUpdated x:
                Price = x.Price;
                break;

            default:
                throw new NotSupportedException($"Unknown event: {e.GetType().Name}");
        }

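        // Replayed events carry the store-assigned version; freshly raised events are still
        // unstamped (Version == 0), so advance the aggregate's version by one for each of them.
        Version = e.Version > 0 ? e.Version : Version + 1;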
    }
}

Event Store Interface

A production system typically uses a dedicated event store, but the interface should stay small:

public interface IEventStore
{
    Task<IReadOnlyList<IDomainEvent>> LoadAsync(Guid aggregateId, CancellationToken ct);
    Task AppendAsync(Guid aggregateId, long expectedVersion, IReadOnlyList<IDomainEvent> newEvents, CancellationToken ct);
}

Optimistic Concurrency

Optimistic concurrency prevents lost updates:

  • Client reads events for aggregate (version N)
  • Client writes expecting version N
  • Store rejects the write if the stream's current version is no longer N

A clean exception:

public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string msg) : base(msg) {}
}
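
When the store rejects a write, a common response is to run the whole load, handle, append cycle again so the business rules are re-checked against the newer history. The helper below is a minimal sketch of that idea. `ConcurrencyRetry`, `RunAsync`, and the `maxAttempts` default are hypothetical names and values, and the exception class is redeclared only so the block compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in so the sketch compiles alone; in the guide's code, reuse its ConcurrencyException.
public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string msg) : base(msg) { }
}

public static class ConcurrencyRetry
{
    // Runs `attempt` (load -> handle -> append) and re-runs it from scratch on a version conflict.
    // `maxAttempts` is an illustrative default, not a recommendation.
    public static async Task RunAsync(
        Func<CancellationToken, Task> attempt,
        int maxAttempts = 3,
        CancellationToken ct = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var i = 1; ; i++)
        {
            try
            {
                await attempt(ct);
                return;
            }
            catch (ConcurrencyException) when (i < maxAttempts)
            {
                // Another writer won; loop so the next attempt reloads the stream and re-validates.
            }
        }
    }
}
```

On the final attempt the exception filter no longer matches, so the conflict propagates to the caller, who can translate it into an HTTP 409 or similar. Retrying is only appropriate when the command still makes sense against the newer state; otherwise surfacing the conflict is the safer choice.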

In‑memory store example (illustrative):

public sealed class InMemoryEventStore : IEventStore
{
    private readonly object _lock = new();
    private readonly Dictionary<Guid, List<IDomainEvent>> _streams = new();

    public Task<IReadOnlyList<IDomainEvent>> LoadAsync(Guid aggregateId, CancellationToken ct)
    {
        lock (_lock)
        {
            _streams.TryGetValue(aggregateId, out var list);
            return Task.FromResult<IReadOnlyList<IDomainEvent>>(list?.ToList() ?? new List<IDomainEvent>());
        }
    }

    public Task AppendAsync(Guid aggregateId, long expectedVersion, IReadOnlyList<IDomainEvent> newEvents, CancellationToken ct)
    {
        lock (_lock)
        {
            if (!_streams.TryGetValue(aggregateId, out var list))
            {
                list = new List<IDomainEvent>();
                _streams[aggregateId] = list;
            }

            var current = list.Count == 0 ? 0 : list.Max(e => e.Version);
            if (current != expectedVersion)
                throw new ConcurrencyException($"Expected v{expectedVersion}, but stream is v{current}.");

            var v = current;
            foreach (var e in newEvents)
            {
                v++;
                var stamped = Stamp(e, v, DateTimeOffset.UtcNow);
                list.Add(stamped);
            }

            return Task.CompletedTask;
        }
    }

    private static IDomainEvent Stamp(IDomainEvent e, long version, DateTimeOffset atUtc)
    {
        return e switch
        {
            ProductAdded x => x with { Version = version, AtUtc = atUtc },
            ProductPriceUpdated x => x with { Version = version, AtUtc = atUtc },
            _ => throw new NotSupportedException(e.GetType().Name)
        };
    }
}

Read Side

Reads should be fast and simple.

Your read model can be a different database, a view, Redis, Elastic, etc.

Projections

Projection = “apply events to build query‑optimized state”.

public sealed record ProductDetails(Guid ProductId, string Name, decimal Price, long Version);

public interface IReadModel
{
    void Apply(IDomainEvent e);
    ProductDetails? Get(Guid id);
}

In‑memory read model:

using System.Collections.Concurrent;

public sealed class InMemoryProductReadModel : IReadModel
{
    private readonly ConcurrentDictionary<Guid, ProductDetails> _products = new();

    public void Apply(IDomainEvent e)
    {
        switch (e)
        {
            case ProductAdded x:
                _products[x.AggregateId] = new ProductDetails(x.AggregateId, x.Name, x.Price, x.Version);
                break;

            case ProductPriceUpdated x:
                if (_products.TryGetValue(x.AggregateId, out var p))
                    _products[x.AggregateId] = p with { Price = x.Price, Version = x.Version };
                break;
        }
    }

    public ProductDetails? Get(Guid id) => _products.TryGetValue(id, out var p) ? p : null;
}

Idempotency

In real systems, events might be delivered more than once.

Make projections idempotent:

  • Track last processed version per aggregate.
  • Ignore events with version <= last processed.
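
The two rules above can be sketched as a variant of the in-memory read model that treats the stored row's `Version` as the last processed version. `IdempotentProductReadModel` is a hypothetical name, and the event types are trimmed copies (with `Version` as a positional parameter) so the block compiles on its own. The sketch assumes events for a given aggregate are applied by one consumer at a time, and it does not detect gaps such as version 3 arriving before version 2.

```csharp
using System;
using System.Collections.Concurrent;

// Trimmed copies of the guide's types so the sketch compiles on its own.
public interface IDomainEvent { Guid AggregateId { get; } long Version { get; } }
public sealed record ProductAdded(Guid AggregateId, string Name, decimal Price, long Version) : IDomainEvent;
public sealed record ProductPriceUpdated(Guid AggregateId, decimal Price, long Version) : IDomainEvent;
public sealed record ProductDetails(Guid ProductId, string Name, decimal Price, long Version);

public sealed class IdempotentProductReadModel
{
    private readonly ConcurrentDictionary<Guid, ProductDetails> _products = new();

    // Returns false when the event was a duplicate (or stale) and was skipped.
    public bool Apply(IDomainEvent e)
    {
        // The stored row's Version doubles as "last processed version" for that aggregate.
        var last = _products.TryGetValue(e.AggregateId, out var current) ? current.Version : 0;
        if (e.Version <= last) return false;

        switch (e)
        {
            case ProductAdded x:
                _products[x.AggregateId] = new ProductDetails(x.AggregateId, x.Name, x.Price, x.Version);
                return true;

            case ProductPriceUpdated x when current is not null:
                _products[x.AggregateId] = current with { Price = x.Price, Version = x.Version };
                return true;

            default:
                return false; // unknown event, or an update for a product not seen yet
        }
    }

    public ProductDetails? Get(Guid id) => _products.TryGetValue(id, out var p) ? p : null;
}
```

Delivering the same `ProductPriceUpdated` twice leaves the row unchanged the second time, and `Apply` reports the skip through its return value, which is handy for metrics.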

Rebuild / Replay

Because reads are derived, you can rebuild projections:

public static async Task RebuildAsync(IEventStore store, IReadModel read, IEnumerable<Guid> aggregateIds, CancellationToken ct)
{
    foreach (var id in aggregateIds)
    {
        var events = await store.LoadAsync(id, ct);
        foreach (var e in events) read.Apply(e);
    }
}

API Layer: Minimal APIs

Split routes mentally:

  • Command API (writes)
  • Query API (reads)

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton<IEventStore, InMemoryEventStore>();
builder.Services.AddSingleton<IReadModel, InMemoryProductReadModel>();

var app = builder.Build();

app.MapPost("/products", async (AddProduct cmd, IEventStore store, IReadModel read, CancellationToken ct) =>
{
    var history = await store.LoadAsync(cmd.ProductId, ct);
    var agg = ProductAggregate.FromHistory(history);

    agg.Handle(cmd);

    var expected = agg.Version - agg.UncommittedEvents.Count;
    await store.AppendAsync(cmd.ProductId, expected, agg.UncommittedEvents, ct);

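    // Demo: project inline. Production: async dispatcher/outbox.
    // Reload so the projection sees the store-stamped copies (Version, AtUtc set), not the unstamped originals.
    var stored = await store.LoadAsync(cmd.ProductId, ct);
    foreach (var e in stored.Skip(history.Count)) read.Apply(e);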

    return Results.Accepted($"/products/{cmd.ProductId}");
});

app.MapPut("/products/{id:guid}/price", async (Guid id, UpdateProductPrice body, IEventStore store, IReadModel read, CancellationToken ct) =>
{
    var history = await store.LoadAsync(id, ct);
    var agg = ProductAggregate.FromHistory(history);

    agg.Handle(body with { ProductId = id });

    var expected = agg.Version - agg.UncommittedEvents.Count;
    await store.AppendAsync(id, expected, agg.UncommittedEvents, ct);

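    // Project the store-stamped copies (Version, AtUtc set), not the unstamped originals.
    var stored = await store.LoadAsync(id, ct);
    foreach (var e in stored.Skip(history.Count)) read.Apply(e);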

    return Results.NoContent();
});

app.MapGet("/products/{id:guid}", (Guid id, IReadModel read) =>
{
    var p = read.Get(id);
    return p is null ? Results.NotFound() : Results.Ok(p);
});

app.Run();

Production note: projection should typically be async (event bus / outbox). Here we project inline to keep the demo focused.
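
As a rough picture of what "async" means here, the sketch below moves projection off the request path using `System.Threading.Channels`: the write handler publishes and returns, and a single background loop applies events in order. `InProcessProjectionDispatcher` is a hypothetical name, and this in-process queue is not durable. Events still queued when the process stops are lost, which is exactly the gap an outbox or a real event bus is meant to close.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-process dispatcher: write handlers enqueue events, one background loop projects them.
public sealed class InProcessProjectionDispatcher<TEvent>
{
    private readonly Channel<TEvent> _queue = Channel.CreateUnbounded<TEvent>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Action<TEvent> _project;

    public InProcessProjectionDispatcher(Action<TEvent> project) => _project = project;

    // Called on the write path after a successful append; returns without waiting for the projection.
    public void Publish(TEvent e)
    {
        if (!_queue.Writer.TryWrite(e))
            throw new InvalidOperationException("Dispatcher has been completed.");
    }

    // Signals that no more events will be published so RunAsync can drain the queue and finish.
    public void Complete() => _queue.Writer.Complete();

    // Single consumer loop: events are projected in the order they were published.
    public async Task RunAsync(CancellationToken ct = default)
    {
        await foreach (var e in _queue.Reader.ReadAllAsync(ct))
            _project(e);
    }
}
```

In an ASP.NET Core app, `RunAsync` would typically be started from a hosted background service, with `read.Apply` passed as the `project` delegate. Because reads now lag writes, a GET issued immediately after a POST may not see the change yet.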


Event Versioning (Without Crying)

Events live forever. Your schema will evolve. Plan it.

Practical strategy:

  • Keep old events readable.
  • Add new events rather than mutate meaning.
  • Use EventType + SchemaVersion in an envelope and upcast during load.
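
The envelope-and-upcast strategy can be sketched as follows. Everything here is an assumption for illustration: the `EventEnvelope` layout, the idea that a second schema version added a `Currency` field, and the choice of "USD" as the default for old events. The guide names `EventType` and `SchemaVersion` but does not prescribe a layout.

```csharp
using System;
using System.Text.Json;

// Hypothetical envelope: what is persisted alongside the serialized payload.
public sealed record EventEnvelope(string EventType, int SchemaVersion, string Json);

// Assumed history: v1 stored only Price; v2 added Currency.
public sealed record ProductPriceUpdatedV1(Guid AggregateId, decimal Price);
public sealed record ProductPriceUpdatedV2(Guid AggregateId, decimal Price, string Currency);

public static class ProductPriceUpdatedUpcaster
{
    // Assumption for this sketch: every v1 price was implicitly in USD.
    private const string DefaultCurrency = "USD";

    // Always returns the latest shape, so the aggregate and projections handle only one version.
    public static ProductPriceUpdatedV2 Read(EventEnvelope envelope)
    {
        if (envelope.EventType != "ProductPriceUpdated")
            throw new NotSupportedException($"Unexpected event type: {envelope.EventType}");

        switch (envelope.SchemaVersion)
        {
            case 1:
                var v1 = JsonSerializer.Deserialize<ProductPriceUpdatedV1>(envelope.Json)
                         ?? throw new JsonException("Empty v1 payload.");
                return new ProductPriceUpdatedV2(v1.AggregateId, v1.Price, DefaultCurrency);

            case 2:
                return JsonSerializer.Deserialize<ProductPriceUpdatedV2>(envelope.Json)
                       ?? throw new JsonException("Empty v2 payload.");

            default:
                throw new NotSupportedException($"Unknown schema version: {envelope.SchemaVersion}");
        }
    }
}
```

The stored v1 JSON is never rewritten. The conversion happens on load, which keeps old events readable while the rest of the code only ever sees the newest shape.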

Common Pitfalls

  • Over‑engineering: CQRS/ES for basic CRUD = pain.
  • Event explosion: events should be business‑meaningful, not “PropertyXChanged”.
  • Skipping idempotency: projections must tolerate duplicates.
  • No concurrency control: you’ll lose updates under load.
  • Leaking domain into read model: keep reads flat, fast, denormalized.
  • Synchronous projection everywhere: prefer async with an outbox.

When to Use (and When Not To)

Use when:

  • You need auditability and “time travel”.
  • Writes are complex, with rich business rules.
  • Reads need to scale independently.
  • You expect integration via events (EDA).

Avoid when:

  • Your app is mostly CRUD with low complexity.
  • Team maturity or ops maturity isn’t ready.
  • You can’t tolerate eventual consistency on reads.

Next Steps

  • Add Outbox pattern to publish events reliably.
  • Swap in a real store (EventStoreDB, Kafka log + persistence, Cosmos DB, SQL stream tables).
  • Add Sagas / Process Managers for long‑running workflows.
  • Add snapshots for large aggregates.
  • Add observability: correlation IDs, event tracing, replay metrics.
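
For the snapshot item, the core idea fits in a few lines: persist the state at some version, then replay only the events recorded after it. The types below (`PriceChanged`, `ProductSnapshot`, `SnapshotLoader`) are hypothetical and deliberately simplified to a single field. The guide does not define a snapshot API, so treat this as one possible shape.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified shapes for illustration only.
public sealed record PriceChanged(long Version, decimal Price);
public sealed record ProductSnapshot(long Version, decimal Price);

public static class SnapshotLoader
{
    // Start from the snapshot (if any) and replay only the events recorded after it.
    public static ProductSnapshot Load(ProductSnapshot? snapshot, IEnumerable<PriceChanged> stream)
    {
        var state = snapshot ?? new ProductSnapshot(0, 0m);
        var from = state.Version; // fixed cut-off, captured before the loop mutates `state`

        foreach (var e in stream.Where(e => e.Version > from).OrderBy(e => e.Version))
            state = new ProductSnapshot(e.Version, e.Price);

        return state;
    }
}
```

A real store would filter by version on the server instead of loading the full stream, and snapshots remain an optimization: deleting them must never change the result of a replay.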

TL;DR

CQRS gives you a clean separation: commands enforce rules, queries stay fast.

Event Sourcing gives you a durable timeline of truth: events are the source of truth, and state is derived from them.

Together, they enable scalable, auditable systems—when you actually need them.

Happy shipping. 🚀

Top comments (1)

shemith mohanan

This is one of the clearest “no-nonsense” CQRS + ES writeups I’ve read 👍
Really liked how you framed it as not fancy CRUD and kept the focus on production concerns (concurrency, idempotency, versioning).
The smile + realism balance is spot on — helpful for devs who are curious and cautious. Bookmarked 🚀