Breno Silva

Event Sourcing beyond the theory

Event Sourcing has always been an interesting subject to me. I used to work at a manufacturing company, specifically on a product that handled inventory management, so event sourcing felt like a natural fit for that domain. However, the concept was never easy to grasp as a junior developer. To make things worse, we were constantly under pressure from deadlines and new feature requests, which meant we never really had the time to properly learn and understand how to apply it.

Fast-forward to today: now that I work at a healthcare company, I decided it would be worth revisiting Event Sourcing, if only to have it in my toolbox in case the right use case comes along. But while learning, I realized something: although I could understand the concepts, I still had no idea how to actually structure an application around them.

Even though I like to think I have a decent understanding of clean architecture and separation of concerns (or at least I want to believe that I do 😄), I was having a really hard time. The lack of concrete, real-world examples, in addition to the mental effort of adapting to the differences between Event Sourcing and traditional CRUD-based systems, made my head spin!

So this is my journey trying to figure out not only Event Sourcing itself, but also how to actually build an application around it.

First things first: I won't delve too deeply into Event Sourcing theory, because there are already a lot of great resources out there that explain it far better than I ever could. Still, let's go through the basic concepts. If you want to learn more about Event Sourcing in depth, here are some good resources

Event Sourcing is a pattern that asks: what if, instead of storing the current state of an entity, you stored all the events that have occurred over time and used those events to reconstruct the state at any given point in time? This allows a system to be replayable, auditable, and hopefully scalable. These characteristics make Event Sourcing a great candidate for domains like financial systems, logistics, and healthcare.
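To make "reconstruct the state at any given point in time" a bit more concrete, here is a minimal sketch. The `StockEvent` type, its fields, and the `quantityAt` function are made up for this illustration (loosely based on the inventory domain I mentioned) and do not come from any library:

```typescript
// Hypothetical event shape, for illustration only.
type StockEvent =
  | { type: "StockReceived"; quantity: number; occurredAt: string }
  | { type: "StockShipped"; quantity: number; occurredAt: string };

// Rebuild the quantity on hand as of a given instant by replaying
// only the events that had already occurred by then.
function quantityAt(events: StockEvent[], asOf: string): number {
  return events
    // ISO-8601 UTC timestamps in the same format sort chronologically as strings
    .filter((e) => e.occurredAt <= asOf)
    .reduce(
      (qty, e) => (e.type === "StockReceived" ? qty + e.quantity : qty - e.quantity),
      0,
    );
}

const stockHistory: StockEvent[] = [
  { type: "StockReceived", quantity: 100, occurredAt: "2024-01-01T09:00:00Z" },
  { type: "StockShipped", quantity: 30, occurredAt: "2024-01-02T09:00:00Z" },
  { type: "StockReceived", quantity: 50, occurredAt: "2024-01-03T09:00:00Z" },
];

const onJan2 = quantityAt(stockHistory, "2024-01-02T12:00:00Z"); // 100 - 30
const latest = quantityAt(stockHistory, "2024-01-03T12:00:00Z"); // 100 - 30 + 50
```

Nothing is ever overwritten here, which is why the same list of events can answer both "what do we have now?" and "what did we have on January 2nd?".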

So, the first building block is the event, which represents a fact that has already occurred in the system. Events are immutable and serializable, and are stored in an event store, an infrastructure component responsible for persisting them. This can be as simple as a PostgreSQL database or a specialized solution like KurrentDB (formerly known as EventStoreDB).
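The storage contract itself is small. As a minimal sketch, assuming a hypothetical in-memory `AppendOnlyStore` (this is not PostgreSQL, not KurrentDB, and not any library's API), an event store only needs to append to a stream and read it back in order:

```typescript
// Hypothetical minimal event store, for illustration only.
type StoredEvent = { type: string; data: unknown };

class AppendOnlyStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // Events are only ever added to the end of a stream, never updated or deleted.
  append(streamId: string, event: StoredEvent): void {
    const stream = this.streams.get(streamId) ?? [];
    // Round-trip through JSON to mimic serialization, then shallow-freeze the record.
    const stored = Object.freeze(JSON.parse(JSON.stringify(event)) as StoredEvent);
    this.streams.set(streamId, [...stream, stored]);
  }

  // Returns a copy of the stream so callers cannot reorder or drop stored events.
  load(streamId: string): readonly StoredEvent[] {
    return [...(this.streams.get(streamId) ?? [])];
  }
}

const demoStore = new AppendOnlyStore();
demoStore.append("item-1", { type: "StockReceived", data: { quantity: 100 } });
demoStore.append("item-1", { type: "StockShipped", data: { quantity: 30 } });

const itemHistory = demoStore.load("item-1");
```

A production store would also need ordering guarantees and concurrency control, which this sketch leaves out on purpose.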

Next, we have command handlers (sometimes called deciders), which live in the domain layer. Their responsibility is to evaluate incoming commands against the current state and business rules, deciding whether the operation is valid. If it is, they produce new events.

At the center of this sits the aggregate, which encapsulates the business rules and keeps the domain consistent. It rebuilds its state by replaying past events.

Finally, we have projections, which form the read layer. Projections consume events and transform them into queryable views, reconstructing the current state in a way that is optimized for reading. I won't go into details about projections in this article, but they can also be tricky to handle.
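Just to give a flavor of what a projection does, here is a rough sketch. The `InventoryEvent` type and the `applyToView` function are hypothetical names for this example, and a real projection would usually write to a table or a cache rather than an in-memory Map:

```typescript
// Hypothetical events, each carrying the SKU it belongs to.
type InventoryEvent =
  | { type: "StockReceived"; sku: string; quantity: number }
  | { type: "StockShipped"; sku: string; quantity: number };

// Read model: current quantity per SKU, cheap to query by key.
type StockLevels = Map<string, number>;

// Applies a single event to the view (updating the Map in place) and returns it.
function applyToView(view: StockLevels, e: InventoryEvent): StockLevels {
  const current = view.get(e.sku) ?? 0;
  const delta = e.type === "StockReceived" ? e.quantity : -e.quantity;
  view.set(e.sku, current + delta);
  return view;
}

const inventoryLog: InventoryEvent[] = [
  { type: "StockReceived", sku: "bolt", quantity: 100 },
  { type: "StockReceived", sku: "nut", quantity: 40 },
  { type: "StockShipped", sku: "bolt", quantity: 25 },
];

// Building the view is a fold over events from every stream.
const stockLevels = inventoryLog.reduce(applyToView, new Map<string, number>());
```

The difference from an aggregate is the goal: the view is shaped for whatever the reader needs, not for enforcing business rules.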

Although these layers were not clear to me in the beginning, after fiddling around a little bit, and mixing up a lot of aggregates and business rules in all the wrong places, making tests miserable, I came up with this proposed architecture:

┌─────────────────────────────────────────────────────────────┐
│                     Application Layer                       │
│  (Express, Fastify, CLI, etc.) – triggers commands          │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│                  Domain Layer (Pure)                        │
│  • Aggregates (state + reducer)                             │
│  • Command handlers                                         │
│  • Domain errors                                            │
│  • Projection definitions (if read-side)                    │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│               Orchestration Layer (Core)                    │
│  • executeCommand, loadAggregate, createAggregate           │
│  • projection                                               │
└───────────────────┬─────────────────────────────────────────┘
                    │
                    ▼
┌─────────────────────────────────────────────────────────────┐
│              Infrastructure Layer (Adapters)                │
│  • EventStore implementation (PostgreSQL, DynamoDB, etc.)   │
│  • Snapshot store (Redis, S3)                               │
└─────────────────────────────────────────────────────────────┘

So let's go through this piece by piece and look at some actual code. Let's use a bank account example. An account can be created, funds can be deposited or withdrawn, and the account balance can be queried. Let's keep it simple 😄

A naive implementation of withdraw functionality, without understanding how those pieces should be separated, might look something like this:

class BankAccountService {
  constructor(
    private readonly db: Database,
    private readonly eventBus: EventBus,
  ) {}

  async withdraw(command: WithdrawMoneyCommand) {
    // Load all events for the account
    const events = await this.db.loadEvents(command.accountId);

    let balance = 0;

    for (const event of events) {
      // Rebuild balance from past events
      if (event.type === "MoneyDeposited") {
        balance += event.amount;
      }

      if (event.type === "MoneyWithdrawn") {
        balance -= event.amount;
      }
    }

    if (balance < command.amount) {
      throw new Error("Insufficient funds");
    }

    const newEvent = {
      type: "MoneyWithdrawn",
      amount: command.amount,
      occurredAt: new Date(),
    };

    await this.db.appendEvent(command.accountId, newEvent);

    await this.eventBus.publish(newEvent);

    return {
      success: true,
      balance: balance - command.amount,
    };
  }
}

To untrained eyes, this doesn't look too bad. The logic is straightforward and clear and it will actually work if you wire it up correctly. But the cracks start to show once you begin testing it (as they usually do). Even a simple business rule test now requires mocking the database and the event bus, which is usually a pretty bad sign. Then we realize that this class is doing way too much. It's responsible for rebuilding state, enforcing business rules, persisting events and orchestrating side effects all at once.

Once we start expanding the functionality, these flaws become more and more apparent. Adding new business rules can accidentally break event publishing, and adding new events can lead to inconsistent state. There is also a race condition hiding in there: two concurrent withdrawals can load the same events, both pass the balance check, and both append a MoneyWithdrawn event. This becomes a nightmare to maintain.
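The race condition is easy to reproduce. The sketch below follows the same load, check, append flow as the naive service, but against a hypothetical in-memory `db` object (the `AccountEvt` type, `db`, `naiveWithdraw`, and `raceDemo` are all names invented for this example):

```typescript
type AccountEvt = { type: "MoneyDeposited" | "MoneyWithdrawn"; amount: number };

// Hypothetical in-memory stand-in for the database used by the naive service.
const eventLog: AccountEvt[] = [{ type: "MoneyDeposited", amount: 100 }];
const db = {
  loadEvents: async (): Promise<AccountEvt[]> => [...eventLog],
  appendEvent: async (e: AccountEvt): Promise<void> => {
    eventLog.push(e);
  },
};

const balanceOf = (events: AccountEvt[]): number =>
  events.reduce((b, e) => (e.type === "MoneyDeposited" ? b + e.amount : b - e.amount), 0);

// Same load -> check -> append flow as the naive withdraw method.
async function naiveWithdraw(amount: number): Promise<void> {
  const events = await db.loadEvents();
  if (balanceOf(events) < amount) {
    throw new Error("Insufficient funds");
  }
  await db.appendEvent({ type: "MoneyWithdrawn", amount });
}

// Both calls read the log before either one appends, so both pass the check.
async function raceDemo(): Promise<number> {
  await Promise.all([naiveWithdraw(80), naiveWithdraw(80)]);
  return balanceOf(eventLog); // overdrawn: 100 - 80 - 80
}
```

Neither withdrawal throws, and the account ends up overdrawn. Nothing in the naive flow lets the store notice that the history changed between the read and the write.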

Let's try again, this time moving the state reconstruction logic into an aggregate:

type BankAccountEvent =
  | { type: "AccountOpened" }
  | { type: "MoneyDeposited"; amount: number }
  | { type: "MoneyWithdrawn"; amount: number };

// Information needed to withdraw money
type WithdrawMoneyCommand = {
  accountId: string;
  amount: number;
};

// Understands how state evolves given each event
class BankAccountAggregate {
  constructor(public balance = 0) {}

  apply(event: BankAccountEvent) {
    switch (event.type) {
      case "MoneyDeposited":
        this.balance += event.amount;
        break;

      case "MoneyWithdrawn":
        this.balance -= event.amount;
        break;
    }
  }

  static rehydrate(events: BankAccountEvent[]) {
    const aggregate = new BankAccountAggregate();

    for (const event of events) {
      aggregate.apply(event);
    }

    return aggregate;
  }
}

class WithdrawMoneyHandler {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventBus: EventBus,
  ) {}

  async execute(command: WithdrawMoneyCommand) {
    const events = await this.eventStore.load(command.accountId);

    // rebuilding the current state of the account
    const aggregate = BankAccountAggregate.rehydrate(events);

    if (aggregate.balance < command.amount) {
      throw new Error("Insufficient funds");
    }

    // Annotate the event so "MoneyWithdrawn" keeps its literal type
    const newEvent: BankAccountEvent = {
      type: "MoneyWithdrawn",
      amount: command.amount,
    };

    await this.eventStore.append(
      command.accountId,
      newEvent,
    );

    await this.eventBus.publish(newEvent);

    aggregate.apply(newEvent);

    return aggregate;
  }
}

This version looks cleaner and more maintainable. Every time we want to change how the state is rebuilt, we only need to update the apply method. The aggregate is much easier to test now, since we don't have to mock the database or event bus. But this code still has some important issues. The command handler is still doing a lot of work: loading events, rebuilding aggregates, and appending events. The aggregate class is also still mutable. It modifies its own internal state directly instead of returning a new state, which means tests can start depending on execution order and shared state.
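Both points can be seen in a few lines. This is a sketch using a condensed copy of the aggregate, with made-up variable names standing in for two hypothetical tests that share a fixture:

```typescript
type BankAccountEvent =
  | { type: "AccountOpened" }
  | { type: "MoneyDeposited"; amount: number }
  | { type: "MoneyWithdrawn"; amount: number };

// Condensed copy of the aggregate from the previous example.
class BankAccountAggregate {
  constructor(public balance = 0) {}

  apply(event: BankAccountEvent): void {
    if (event.type === "MoneyDeposited") this.balance += event.amount;
    if (event.type === "MoneyWithdrawn") this.balance -= event.amount;
  }

  static rehydrate(events: BankAccountEvent[]): BankAccountAggregate {
    const aggregate = new BankAccountAggregate();
    events.forEach((e) => aggregate.apply(e));
    return aggregate;
  }
}

// The good part: no database or event bus, just plain event objects.
const rebuilt = BankAccountAggregate.rehydrate([
  { type: "AccountOpened" },
  { type: "MoneyDeposited", amount: 100 },
  { type: "MoneyWithdrawn", amount: 30 },
]);
const rebuiltBalance = rebuilt.balance; // 100 - 30

// The hazard: a fixture shared between two tests.
const sharedFixture = BankAccountAggregate.rehydrate([
  { type: "MoneyDeposited", amount: 100 },
]);
sharedFixture.apply({ type: "MoneyWithdrawn", amount: 40 }); // "test A" withdraws
const seenByTestB = sharedFixture.balance; // "test B" expected 100, but A's change leaked in
```

Whether "test B" passes now depends on whether "test A" ran first, which is exactly the kind of coupling I wanted to get rid of.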

This led me to realize that there's an important separation that needs to be done: deciding what should happen and orchestrating how it should happen.

Let's try improving that one more time:

type BankAccountEvent =
  | { type: "AccountOpened" }
  | { type: "MoneyDeposited"; amount: number }
  | { type: "MoneyWithdrawn"; amount: number };

type BankAccountState = {
  balance: number;
};

const initialState: BankAccountState = {
  balance: 0,
};

function bankAccountReducer(
  state: BankAccountState,
  event: BankAccountEvent,
): BankAccountState {
  switch (event.type) {
    case "MoneyDeposited":
      return {
        ...state,
        balance: state.balance + event.amount,
      };

    case "MoneyWithdrawn":
      return {
        ...state,
        balance: state.balance - event.amount,
      };

    default:
      return state;
  }
}

So this time we introduced a reducer, which is a pure function: no mutations and no persistence. It simply returns a new state given the current state and an event.

(state, event) => newState

Then we can have a command handler, which actually decides what should happen, according to the business rules (don't mind the Ok and Err, we will get into them later):

type WithdrawMoneyCommand = {
  accountId: string;
  amount: number;
};

// command handler
function withdrawMoney(
  state: BankAccountState,
  command: WithdrawMoneyCommand,
) {
  if (command.amount <= 0) {
    return Err({
      type: "InvalidAmount",
    });
  }

  if (state.balance < command.amount) {
    return Err({
      type: "InsufficientFunds",
    });
  }

  return Ok([
    {
      type: "MoneyWithdrawn",
      amount: command.amount,
    },
  ]);
}

It's important to realize that the command handler doesn't know anything about how we rebuilt the state. It doesn't know anything about the database or event bus. It only knows how to decide whether withdrawing money is valid, given the passed state and command. This function becomes very easy to test because it has no infrastructure concerns.
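To show how little is needed to test it, here is a self-contained sketch. The `Result` type and the `Ok` and `Err` helpers below are minimal stand-ins I am assuming for this example (an `{ ok, value }` or `{ ok, error }` shape), and `WithdrawError` is a name added only to type the handler's return value:

```typescript
// Minimal stand-ins for Ok and Err; the shape is an assumption for this sketch.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

function Ok<T>(value: T): Result<T, never> {
  return { ok: true, value };
}

function Err<E>(error: E): Result<never, E> {
  return { ok: false, error };
}

type BankAccountState = { balance: number };
type WithdrawMoneyCommand = { accountId: string; amount: number };
type MoneyWithdrawn = { type: "MoneyWithdrawn"; amount: number };
type WithdrawError = { type: "InvalidAmount" } | { type: "InsufficientFunds" };

// Same decision logic as the handler above, with an explicit return type.
function withdrawMoney(
  state: BankAccountState,
  command: WithdrawMoneyCommand,
): Result<MoneyWithdrawn[], WithdrawError> {
  if (command.amount <= 0) {
    return Err({ type: "InvalidAmount" });
  }
  if (state.balance < command.amount) {
    return Err({ type: "InsufficientFunds" });
  }
  return Ok([{ type: "MoneyWithdrawn", amount: command.amount }]);
}

// Each "test" is just a function call with plain data: no mocks, no setup.
const accepted = withdrawMoney({ balance: 100 }, { accountId: "acc-1", amount: 40 });
const overdrawn = withdrawMoney({ balance: 100 }, { accountId: "acc-1", amount: 150 });
const zeroAmount = withdrawMoney({ balance: 100 }, { accountId: "acc-1", amount: 0 });
```

The state passed in is just an object literal, so every business rule can be checked without touching a store.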

And lastly, we have the orchestration of everything:

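// initialState is passed in too, so the executor stays independent of any one aggregate
async function executeCommand({
  store,
  streamId,
  reducer,
  initialState,
  handler,
  command,
}) {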
  const stream = await store.load(streamId);

  if (!stream.ok) {
    return stream;
  }

  const currentState = stream.events.reduce(
    reducer,
    initialState,
  );

  const decision = handler(
    currentState,
    command,
  );

  if (!decision.ok) {
    return decision;
  }

  const appendResult = await store.append({
    streamId,
    expectedVersion: stream.version,
    events: decision.value,
  });

  if (!appendResult.ok) {
    return appendResult;
  }

  const newState = decision.value.reduce(
    reducer,
    currentState,
  );

  return Ok({
    state: newState,
    events: decision.value,
  });
}

The command executor just puts all the pieces together: it loads the stream, rebuilds the state, executes the command, and appends the events if successful. Finally, we rebuild the new state and return it. See, this function doesn't even know if we're talking about a bank account or a shopping cart. This function is generic and can be used with any stream and reducer and that's the whole point.
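One detail worth calling out is the expectedVersion passed to append. A store can use it for optimistic concurrency: if the stream has moved on since it was loaded, the append is rejected instead of silently writing on top of stale state. Here is a minimal synchronous sketch, assuming a hypothetical `VersionedStore` and a `VersionConflict` error shape that I made up for the example (they are not any library's API):

```typescript
// Assumed Result shape, mirroring the `.ok` / `.value` checks in executeCommand.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

type Stream<Ev> = { events: Ev[]; version: number };
type VersionConflict = { type: "VersionConflict"; expected: number; actual: number };

// Hypothetical in-memory store that rejects appends made against a stale version.
class VersionedStore<Ev> {
  private readonly streams = new Map<string, Ev[]>();

  load(streamId: string): Stream<Ev> {
    const events = this.streams.get(streamId) ?? [];
    return { events: [...events], version: events.length };
  }

  append(
    streamId: string,
    expectedVersion: number,
    events: Ev[],
  ): Result<number, VersionConflict> {
    const current = this.streams.get(streamId) ?? [];
    if (current.length !== expectedVersion) {
      return {
        ok: false,
        error: { type: "VersionConflict", expected: expectedVersion, actual: current.length },
      };
    }
    this.streams.set(streamId, [...current, ...events]);
    return { ok: true, value: current.length + events.length };
  }
}

type Evt = { type: "MoneyDeposited" | "MoneyWithdrawn"; amount: number };

const vStore = new VersionedStore<Evt>();
vStore.append("acc-1", 0, [{ type: "MoneyDeposited", amount: 100 }]);

// Two writers load the same version, then both try to append.
const seenByA = vStore.load("acc-1");
const seenByB = vStore.load("acc-1");
const firstAppend = vStore.append("acc-1", seenByA.version, [{ type: "MoneyWithdrawn", amount: 80 }]);
const secondAppend = vStore.append("acc-1", seenByB.version, [{ type: "MoneyWithdrawn", amount: 80 }]);
// firstAppend succeeds; secondAppend is rejected, so that caller can reload and decide again
```

With a check like this in the store, the second writer gets a conflict back, reloads the stream, and its handler then sees the reduced balance.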

Introducing ts-event-sourcing

So, to make things easier for me (and you), I decided to create a library to encapsulate the concepts that we just went through. The idea of the library is to give you a simple, purely functional and type-safe way to implement event sourcing in your applications, helping you to put the right logic in the right place. There are some parts of the API that are intentionally opinionated that you might like (or not).

Let's go over the example once again, but with the library functions:

import { AggregateDefinition, matchEvent } from "ts-event-sourcing/core"

type AccountOpened  = { type: "AccountOpened";  ownerId: string }
type MoneyDeposited = { type: "MoneyDeposited"; amount: number }
type MoneyWithdrawn = { type: "MoneyWithdrawn"; amount: number }

type AccountEvent = AccountOpened | MoneyDeposited | MoneyWithdrawn

type AccountState = {
  ownerId: string
  balance: number
}

const accountAggregate: AggregateDefinition<AccountState, AccountEvent> = {
  initialState: { ownerId: "", balance: 0 },
  reduce: (state, event) =>
    matchEvent(event, {
      AccountOpened:  (e) => ({ ownerId: e.ownerId, balance: 0 }),
      MoneyDeposited: (e) => ({ ...state, balance: state.balance + e.amount }),
      MoneyWithdrawn: (e) => ({ ...state, balance: state.balance - e.amount }),
    }),
}

So, this is very similar to the example we went through earlier, but with some sugar on top. The first piece is the AggregateDefinition type, which describes an object with an initialState and a reduce function. This type guarantees that the initial state and the reducer agree on their types. It also enforces the reducer signature: (state, event) => newState

One other cool thing is the "matchEvent" function. This one is an exhaustive event matcher and unlike a switch statement, it is an object where every key in your event union must appear. If you add a new event type and forget to update a reducer, the project won't compile. This is one of the library's central guarantees.
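If you are curious how this kind of exhaustiveness can be expressed in TypeScript, here is a rough sketch. To be clear, `matchByType` and `Handlers` are hypothetical names written for this article and are not the library's implementation of matchEvent. The trick is a mapped type that requires one handler per event type:

```typescript
// Hypothetical helper, not the library's code.
// One handler is required for every `type` in the event union E.
type Handlers<E extends { type: string }, R> = {
  [K in E["type"]]: (event: Extract<E, { type: K }>) => R;
};

function matchByType<E extends { type: string }, R>(
  event: E,
  handlers: Handlers<E, R>,
): R {
  // The lookup is safe because Handlers<E, R> has a key for every event type.
  const handler = handlers[event.type as E["type"]] as unknown as (e: E) => R;
  return handler(event);
}

type AccountEvent =
  | { type: "AccountOpened"; ownerId: string }
  | { type: "MoneyDeposited"; amount: number }
  | { type: "MoneyWithdrawn"; amount: number };

function describeEvent(e: AccountEvent): string {
  return matchByType<AccountEvent, string>(e, {
    AccountOpened: (x) => `opened by ${x.ownerId}`,
    MoneyDeposited: (x) => `+${x.amount}`,
    MoneyWithdrawn: (x) => `-${x.amount}`,
    // Removing any of the keys above turns into a compile-time error.
  });
}

const depositLabel = describeEvent({ type: "MoneyDeposited", amount: 5 });
const openedLabel = describeEvent({ type: "AccountOpened", ownerId: "u-1" });
```

Each handler also receives the already-narrowed event, so `x.ownerId` is only available inside the AccountOpened branch.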

Then, we have the command handler:

// Domain errors
type AccountError =
  | { type: "INVALID_AMOUNT" }
  | { type: "INSUFFICIENT_FUNDS" }

type WithdrawMoney = { amount: number }

const withdrawMoneyHandler: CommandHandler<AccountState, WithdrawMoney, AccountEvent, AccountError> =
  ({ state, command }) => {
    if (command.amount <= 0) {
      return Err({ type: "INVALID_AMOUNT" })
    }

    if (state.balance < command.amount) {
      return Err({ type: "INSUFFICIENT_FUNDS" })
    }

    return Ok([{ type: "MoneyWithdrawn", amount: command.amount }])
  }

const withdrawMoney = defineCommand({ aggregate: accountAggregate, handler: withdrawMoneyHandler })

Here we have some other cool things to do with commands. First we define domain errors, meaning possible issues that we can have, according to our business rules. This is especially useful for error handling, making sure that we're sending the correct message to downstream services. Then we have the CommandHandler type, which is a generic type that defines the shape of a command handler function. This function can return either domain events or domain errors.

One other interesting thing that I hinted at earlier is the Ok and Err types. The library uses the Result type pattern to indicate the success or failure of a command handler function. Ok wraps the successful result of the function, while Err wraps the domain error. This means that the library never throws an error. Instead, you have to handle failures in your application code, according to what each function returns.
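As an illustration of what handling it in your application code can look like, here is a small sketch that turns a result into an HTTP-style reply. The `Result` shape (an `error` field on failures), the `HttpReply` type, and the status codes are assumptions made for this example, so check the library's README for the real field names:

```typescript
// Assumed Result shape for this sketch; the library's actual field names may differ.
type Result<T, E> = { ok: true; value: T } | { ok: false; error: E };

type AccountError = { type: "INVALID_AMOUNT" } | { type: "INSUFFICIENT_FUNDS" };

// Illustrative mapping; which status fits each error is a design choice.
// Record<...> makes the compiler complain if a new error type is left unmapped.
const statusByError: Record<AccountError["type"], number> = {
  INVALID_AMOUNT: 400,
  INSUFFICIENT_FUNDS: 409,
};

type HttpReply = { status: number; body: unknown };

function toHttpReply<T>(result: Result<T, AccountError>): HttpReply {
  if (result.ok) {
    return { status: 200, body: result.value };
  }
  return {
    status: statusByError[result.error.type],
    body: { error: result.error.type },
  };
}

const okReply = toHttpReply<string[]>({ ok: true, value: ["MoneyWithdrawn"] });
const failedReply = toHttpReply<string[]>({
  ok: false,
  error: { type: "INSUFFICIENT_FUNDS" },
});
```

Because the errors are plain data, the translation to whatever your transport needs lives in one place at the edge of the application.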

Finally, we have the defineCommand function, which is used to bind a command handler function to an aggregate. This is a helper to avoid having a lot of boilerplate code for each command handler.

So, the usage is simply:

const store = new InMemoryEventStore<AccountEvent>()

await withdrawMoney.execute({
  store,
  streamId: "acc-1",
  command: { amount: 40 },
  idempotencyKey: "wd-1", // idempotency key used in the library to avoid duplicate events
})

Store, as we explained before, is the infrastructure layer that stores the events. The library provides an in-memory store that you can use for development and testing purposes. You can implement your own store by implementing the EventStore interface and it should work exactly the same.

And then you can call execute on the defined command, which under the hood calls a function very similar to the executeCommand we defined in the last example. This keeps things simple and clear.

So, in summary, you have each part with its own responsibility, never stepping into each other's code. You define the events, which are the facts that happen to your entity. You define the aggregate, which describes how the state evolves according to the events, and you define command handlers, which decide how to handle commands. The store lives in the infrastructure layer and doesn't know anything about the aggregate or the commands. And we have the command executor, which orchestrates everything.

This architecture gave me what I was looking for: easier tests, fewer mocks, clearer responsibilities, deterministic business logic and infrastructure independence! It's a very solid way to implement event sourcing in your applications. In the README.md file you can find more details about the implementation and the design philosophy. It also has a section on how to use the library in a real application, with a suggested Postgres event store implementation, Express wiring, and testing strategies for your own code.

There are a lot of topics that I purposely didn't cover in this article, because I know some of you have the attention span of a TikTok feed 😄. But we can cover them in a future article, like why aggregate creation should be treated differently from mutations, how to do projections (reads), how idempotency works, how to evolve event schemas, and how to handle snapshots. Let me know if you're interested in a second part!

I hope you learned something, and feel free to ask questions or open GitHub issues criticizing the library!

Have a great day!
