Gabriel Anhaia


Event Sourcing in Go: An Append-Only Store and a 200-Line Replay


You've seen the support ticket. A customer claims their balance was $312 yesterday and $187 this morning, and nobody can explain the gap. Someone opens the database. The accounts.balance column shows $187. There is one row. There is no history. Whatever happened between yesterday and this morning is gone, because the system overwrote it.

That gap is the problem event sourcing solves. The database stores what is. Event sourcing stores what happened. The current balance is no longer a column — it is what you get when you fold all the deposits and withdrawals from the beginning of the account.

The pattern sounds heavy. In Go, the core fits in about 200 lines. Here it is, end to end.

The Append-Only Event Store

The store is an interface with two methods. Append events for an aggregate. Load events for an aggregate. That is the whole port.

package eventstore

import (
    "context"
    "encoding/json"
    "time"
)

type Event struct {
    AggregateID string
    Version     int
    Type        string
    Payload     json.RawMessage
    OccurredAt  time.Time
}

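// Store is the outbound port for an append-only event log.
type Store interface {
    // Append writes events as versions expectedVersion+1, +2, ...
    // and fails if another writer already holds one of them.
    Append(
        ctx context.Context,
        aggregateID string,
        expectedVersion int,
        events []Event,
    ) error

    // Load returns every event for the aggregate, ordered by version.
    Load(
        ctx context.Context,
        aggregateID string,
    ) ([]Event, error)
}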

expectedVersion is the optimistic-concurrency lever. The caller says "I read up to version 7, here are events 8 and 9." If another writer committed a version 8 in the meantime, the append fails. The caller then reloads the stream, re-runs the command against the fresh state, and tries again.

The Postgres adapter is one table and two queries.

CREATE TABLE events (
    aggregate_id TEXT NOT NULL,
    version      INT  NOT NULL,
    type         TEXT NOT NULL,
    payload      JSONB NOT NULL,
    occurred_at  TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (aggregate_id, version)
);

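The primary key is what makes the store correct. Two writers racing on the same aggregate_id cannot both insert version 8: Postgres rejects the second one with a unique-violation error (SQLSTATE 23505). That is your concurrency control, free of charge, no advisory locks. One caveat: the key only catches a stale expectedVersion. A caller that passes a version ahead of the stream would insert successfully and leave a gap, so always take expectedVersion from the events you just loaded.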

package postgres

import (
    "context"
    "database/sql"
    "errors"

    "github.com/lib/pq"
    "example.com/banking/eventstore"
)

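// ErrConcurrency is returned by Append when another writer has already
// stored an event at one of the versions being inserted.
var ErrConcurrency = errors.New("concurrency conflict")

// EventStore is the Postgres adapter for eventstore.Store.
type EventStore struct {
    db *sql.DB
}

// New wraps an open database handle. Without it, code outside this
// package has no way to set the unexported db field.
func New(db *sql.DB) *EventStore {
    return &EventStore{db: db}
}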

func (s *EventStore) Append(
    ctx context.Context,
    aggregateID string,
    expectedVersion int,
    events []eventstore.Event,
) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    for i, e := range events {
        _, err := tx.ExecContext(
            ctx,
            `INSERT INTO events
             (aggregate_id, version, type,
              payload, occurred_at)
             VALUES ($1, $2, $3, $4, $5)`,
            aggregateID,
            expectedVersion+i+1,
            e.Type,
            e.Payload,
            e.OccurredAt,
        )
        if err != nil {
            // 23505 is unique_violation: another writer already
            // holds this (aggregate_id, version) pair.
            var pqErr *pq.Error
            if errors.As(err, &pqErr) &&
                pqErr.Code == "23505" {
                return ErrConcurrency
            }
            return err
        }
    }
    return tx.Commit()
}

Load is one ordered scan.

func (s *EventStore) Load(
    ctx context.Context,
    aggregateID string,
) ([]eventstore.Event, error) {
    rows, err := s.db.QueryContext(
        ctx,
        `SELECT version, type, payload, occurred_at
         FROM events
         WHERE aggregate_id = $1
         ORDER BY version ASC`,
        aggregateID,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var out []eventstore.Event
    for rows.Next() {
        e := eventstore.Event{AggregateID: aggregateID}
        if err := rows.Scan(
            &e.Version, &e.Type,
            &e.Payload, &e.OccurredAt,
        ); err != nil {
            return nil, err
        }
        out = append(out, e)
    }
    return out, rows.Err()
}

That is the entire infrastructure side: about 90 lines including the SQL. Everything else is the aggregate.

The Aggregate That Rebuilds Itself

An event-sourced aggregate has two halves. Commands take input and produce events. Apply takes an event and mutates state. State is never written directly — it always comes from applying events.

package account

import (
    "encoding/json"
    "errors"
    "time"

    "example.com/banking/eventstore"
)

type Account struct {
    id      string
    balance int64
    closed  bool
    version int
}

type Deposited struct {
    Amount int64     `json:"amount"`
    At     time.Time `json:"at"`
}

type Withdrawn struct {
    Amount int64     `json:"amount"`
    At     time.Time `json:"at"`
}

type Closed struct {
    At time.Time `json:"at"`
}

var (
    ErrClosed       = errors.New("account closed")
    ErrInsufficient = errors.New("insufficient funds")
    ErrNonPositive  = errors.New("amount must be > 0")
)

Commands return events. They never mutate state directly — that happens in Apply. Splitting the two halves is what makes replay safe.

func (a *Account) Deposit(amount int64) (any, error) {
    if a.closed {
        return nil, ErrClosed
    }
    if amount <= 0 {
        return nil, ErrNonPositive
    }
    return Deposited{
        Amount: amount,
        At:     time.Now(),
    }, nil
}

func (a *Account) Withdraw(amount int64) (any, error) {
    if a.closed {
        return nil, ErrClosed
    }
    if amount <= 0 {
        return nil, ErrNonPositive
    }
    if a.balance < amount {
        return nil, ErrInsufficient
    }
    return Withdrawn{
        Amount: amount,
        At:     time.Now(),
    }, nil
}

func (a *Account) Apply(e any) {
    switch ev := e.(type) {
    case Deposited:
        a.balance += ev.Amount
    case Withdrawn:
        a.balance -= ev.Amount
    case Closed:
        a.closed = true
    }
    a.version++
}

Apply is the fold. Hand it the events from the store in order and the in-memory Account rebuilds itself, no SQL involved.

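// Load rebuilds an Account by folding its events in version order.
func Load(events []eventstore.Event) (*Account, error) {
    a := &Account{}
    for _, raw := range events {
        ev, err := decode(raw)
        if err != nil {
            return nil, err
        }
        a.id = raw.AggregateID // the id field is not set anywhere else
        a.Apply(ev)
    }
    return a, nil
}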

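// decode turns a stored event back into its typed payload. Unmarshal runs
// before the return on purpose: in `return d, json.Unmarshal(..., &d)` Go
// does not specify whether d is read before or after the call.
func decode(e eventstore.Event) (any, error) {
    switch e.Type {
    case "Deposited":
        var d Deposited
        if err := json.Unmarshal(e.Payload, &d); err != nil {
            return nil, err
        }
        return d, nil
    case "Withdrawn":
        var w Withdrawn
        if err := json.Unmarshal(e.Payload, &w); err != nil {
            return nil, err
        }
        return w, nil
    case "Closed":
        var c Closed
        if err := json.Unmarshal(e.Payload, &c); err != nil {
            return nil, err
        }
        return c, nil
    }
    return nil, errors.New("unknown event type: " + e.Type)
}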

The application service ties the store and the aggregate together. Load events, run a command, append the result.

package app

import (
    "context"
    "encoding/json"
    "time"

    "example.com/banking/account"
    "example.com/banking/eventstore"
)

type AccountService struct {
    store eventstore.Store
}

func (s *AccountService) Deposit(
    ctx context.Context,
    id string,
    amount int64,
) error {
    events, err := s.store.Load(ctx, id)
    if err != nil {
        return err
    }
    acc, err := account.Load(events)
    if err != nil {
        return err
    }

    ev, err := acc.Deposit(amount)
    if err != nil {
        return err
    }

    payload, err := json.Marshal(ev)
    if err != nil {
        return err
    }
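    // Versions run 1..n without gaps, so len(events) is the version
    // this command was decided against.
    return s.store.Append(ctx, id, len(events),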
        []eventstore.Event{{
            AggregateID: id,
            Type:        "Deposited",
            Payload:     payload,
            OccurredAt:  time.Now(),
        }},
    )
}

Count the lines so far. Store interface plus Postgres adapter: about 110. Aggregate, commands, and decode: about 95. Application service: about 40. That is roughly 245 lines all-in for a working event-sourced account that survives restarts, detects conflicting concurrent writes, and lets you reconstruct any historical balance by replaying the event stream only up to a chosen point in time. Strip the SQL, the imports, and the Postgres plumbing, and the replay core (the part you would write again for any aggregate) sits comfortably under 200 lines.

Snapshots: When 200 Lines Becomes Slow

Replay is fine until an aggregate has 50,000 events. Loading every deposit since 2019 to find the current balance is wasteful when 99% of the work is the same fold over the same history.

A snapshot is a point-in-time copy of aggregate state plus the version it was taken at. Add a second table.

CREATE TABLE snapshots (
    aggregate_id TEXT PRIMARY KEY,
    version      INT  NOT NULL,
    state        JSONB NOT NULL,
    taken_at     TIMESTAMPTZ NOT NULL
);

Loading becomes: read the snapshot, read events with version > snapshot.version, apply them to the snapshot state. That needs one addition to the store, a load that starts after a given version, since the Load shown earlier always returns the full stream. Save a new snapshot every N events (1000 is a sensible default). Snapshots are an optimisation, not a source of truth: if a snapshot is corrupted, throw it away and replay from the first event. The events are the only thing that has to be right.

The point of having the snapshot as a separate table is exactly this: it can be regenerated from the event log at any time.
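
Here is a rough sketch of that load path. To keep it short, events carry an already-decoded Amount instead of a JSON payload, and Snapshot, Restore, and ShouldSnapshot are names made up for the example. A real adapter would also push the version filter into SQL (WHERE version > $2) rather than skipping rows in memory as this does.

```go
package main

// Snapshot is the decoded form of one snapshots row.
type Snapshot struct {
    Version int
    Balance int64
    Closed  bool
}

// Event is simplified: Amount is pre-decoded for brevity.
type Event struct {
    Version int
    Type    string
    Amount  int64
}

type Account struct {
    balance int64
    closed  bool
    version int
}

func (a *Account) apply(e Event) {
    switch e.Type {
    case "Deposited":
        a.balance += e.Amount
    case "Withdrawn":
        a.balance -= e.Amount
    case "Closed":
        a.closed = true
    }
    a.version++
}

// Restore starts from snap (nil means no usable snapshot) and applies
// only the events the snapshot has not seen. Events must be ordered
// by version.
func Restore(snap *Snapshot, events []Event) *Account {
    a := &Account{}
    if snap != nil {
        a.balance = snap.Balance
        a.closed = snap.Closed
        a.version = snap.Version
    }
    for _, e := range events {
        if e.Version <= a.version {
            continue // already reflected in the snapshot
        }
        a.apply(e)
    }
    return a
}

// ShouldSnapshot reports whether version lands on an every-N boundary.
func ShouldSnapshot(version, every int) bool {
    return every > 0 && version > 0 && version%every == 0
}
```

Passing nil for the snapshot is the throw-it-away case: Restore replays the whole stream and has to land on the same state as the snapshot path.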

When Event Sourcing Pays Off

The pattern is not free. You write more code, you query state through aggregates instead of with WHERE balance > 100, and reporting needs read models built off a separate projection process. Reach for it when the trade is worth it:

  • Audit and compliance. Finance, healthcare, anything with regulators. "Why did this state change at 3am?" has a real answer because the change is a row, not an overwrite.
  • Temporal queries. "What did the cart look like before checkout?" "What was the inventory at the moment that order shipped?" Traditional schemas can answer these only with carefully maintained history tables. Event sourcing answers them by truncating the stream.
  • Complex domains with many state transitions. Domains where business rules change often, where the sequence of events matters as much as the current state.

It is overkill for CRUD apps with no audit requirements, simple state, or short-lived data. Most internal tools are not event-sourcing candidates. The honest answer for a typical web app is: keep your boring tables, add a domain-events outbox, sleep at night.

What to Try Next

Take the 200 lines, drop them into a fresh module, and write one test that deposits, withdraws, restarts the process, loads from the store, and asserts the balance. Then break the snapshot table on purpose and watch the replay still produce the right answer. That second test is the moment the pattern stops feeling abstract — events as the source of truth, snapshots as a cache you can throw away whenever you want.


If this was useful

Event sourcing is one of those patterns that reads as theoretical until you wire it through hexagonal ports. The store is an outbound port. The aggregate is the domain. Snapshots are an optimisation adapter. Hexagonal Architecture in Go covers this end-to-end — the port boundaries, the application service shape, projections, and where event sourcing fits next to plain CRUD aggregates in the same service.

Pairs with The Complete Guide to Go Programming if the language fundamentals (interfaces, generics, error handling) are still settling in.

Thinking in Go — the 2-book series on Go programming and hexagonal architecture
