Gabriel Anhaia

Domain Events in Go Without a Framework


You want domain events in a Go service. You search for "Go event bus" and the first page is libraries: a pub/sub package here, a CQRS framework there, something that wants Kafka, something that wires reflection into a Dispatch(interface{}) call. You install one, register handlers in an init(), and three months later nobody can trace where an OrderPaid event goes because the wiring lives in a runtime registry instead of in code you can read.

None of that is needed. A domain event is a struct. A dispatcher is a map. Reliable delivery across a transaction boundary is an outbox table. You can build all three in plain Go and SQL (the examples below use PostgreSQL through sqlx), and the result is easier to read than any framework you would have pulled in.

This post builds the three pieces in order: the event as a value, the in-process dispatcher, and the transactional outbox that makes delivery survive a crash.

A domain event is a plain struct

An event is a fact that already happened. Past tense, immutable, owned by the domain. It carries the data a handler needs and nothing about how it gets delivered.

// domain/events.go
package domain

import "time"

type Event interface {
    EventName() string
    OccurredAt() time.Time
}

type OrderPaid struct {
    OrderID     string
    CustomerID  string
    AmountCents int64
    At          time.Time
}

func (e OrderPaid) EventName() string {
    return "order.paid"
}

func (e OrderPaid) OccurredAt() time.Time {
    return e.At
}

No base class, no annotations, no registry. The Event interface exists so a dispatcher can hold a slice of mixed types and ask each one for its name and timestamp. Every event in the domain implements those two methods and adds whatever fields the fact carries.

The aggregate records events as it changes state. It does not dispatch them. Recording and delivering are different jobs, and keeping them apart is what lets you choose the delivery mechanism later.

// domain/order.go
package domain

import "time"

type Order struct {
    ID         string
    CustomerID string
    Status     string
    events     []Event
}

func (o *Order) MarkPaid(amount int64) {
    o.Status = "paid"
    o.events = append(o.events, OrderPaid{
        OrderID:     o.ID,
        CustomerID:  o.CustomerID,
        AmountCents: amount,
        At:          time.Now(),
    })
}

func (o *Order) PullEvents() []Event {
    out := o.events
    o.events = nil
    return out
}

PullEvents drains the buffer. The application service calls it after a command finishes and hands the events to the dispatcher. The domain stays pure: it knows what happened, not who listens.
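To see the drain behaviour from the application service's side, here is a minimal standalone sketch. It copies the types above into package main so it runs on its own; the order and customer IDs are made up for illustration.

```go
package main

import (
    "fmt"
    "time"
)

// Event mirrors the interface from domain/events.go.
type Event interface {
    EventName() string
    OccurredAt() time.Time
}

type OrderPaid struct {
    OrderID     string
    CustomerID  string
    AmountCents int64
    At          time.Time
}

func (e OrderPaid) EventName() string     { return "order.paid" }
func (e OrderPaid) OccurredAt() time.Time { return e.At }

type Order struct {
    ID         string
    CustomerID string
    Status     string
    events     []Event
}

// MarkPaid changes state and records the fact; it does not deliver anything.
func (o *Order) MarkPaid(amount int64) {
    o.Status = "paid"
    o.events = append(o.events, OrderPaid{
        OrderID:     o.ID,
        CustomerID:  o.CustomerID,
        AmountCents: amount,
        At:          time.Now(),
    })
}

// PullEvents hands over the recorded events and empties the buffer.
func (o *Order) PullEvents() []Event {
    out := o.events
    o.events = nil
    return out
}

func main() {
    o := &Order{ID: "ord-1", CustomerID: "cust-1", Status: "pending"}
    o.MarkPaid(4200)

    first := o.PullEvents()  // drains the buffer
    second := o.PullEvents() // nothing left to pull

    fmt.Println(len(first), first[0].EventName(), len(second)) // prints "1 order.paid 0"
}
```

Because the second pull is empty, calling PullEvents twice by accident cannot deliver the same event twice from the same aggregate instance.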

An in-process dispatcher is a map

For handlers that run in the same process (update a read model, send a receipt, bump a counter), you do not need a broker. You need a map from event name to a list of handlers.

// app/dispatcher.go
package app

import (
    "context"

    "yourapp/domain"
)

type Handler func(ctx context.Context, e domain.Event) error

type Dispatcher struct {
    handlers map[string][]Handler
}

func NewDispatcher() *Dispatcher {
    return &Dispatcher{
        handlers: make(map[string][]Handler),
    }
}

func (d *Dispatcher) Subscribe(
    name string, h Handler,
) {
    d.handlers[name] = append(d.handlers[name], h)
}

func (d *Dispatcher) Dispatch(
    ctx context.Context, events []domain.Event,
) error {
    for _, e := range events {
        for _, h := range d.handlers[e.EventName()] {
            if err := h(ctx, e); err != nil {
                return err
            }
        }
    }
    return nil
}

Wiring is explicit. You see every subscription in one place at startup, not scattered across init() functions in packages that import each other for the side effect.

// cmd/server/main.go
disp := app.NewDispatcher()

disp.Subscribe("order.paid",
    func(ctx context.Context, e domain.Event) error {
        paid := e.(domain.OrderPaid)
        return readModel.MarkPaid(ctx, paid.OrderID)
    })

disp.Subscribe("order.paid",
    func(ctx context.Context, e domain.Event) error {
        paid := e.(domain.OrderPaid)
        return mailer.SendReceipt(ctx, paid.CustomerID)
    })

The type assertion e.(domain.OrderPaid) is safe here because the handler only ever runs for order.paid, and that name belongs to exactly one struct. If that ever stops being true, the single-value assertion panics rather than returning an error. If you want a compile-time guarantee instead, generics give you a typed wrapper, but for most services the assertion is fine and reads clearly.

This dispatcher is synchronous. Handlers run in the order they subscribed, on the goroutine that called Dispatch, and the first handler error stops the run and is returned to the caller. That is a feature: if you want async, you make a specific handler push to a channel or a worker pool. You do not get surprised by background goroutines you did not ask for.

The problem the in-process dispatcher cannot solve

Synchronous dispatch has a hole, and it is the one that bites in production.

Your command handler does two things: it commits the order to the database, then it dispatches OrderPaid so the email goes out. Two separate operations. The process can die between them.

// WRONG: dispatch outside the transaction
func (s *OrderService) Pay(
    ctx context.Context, id string, amount int64,
) error {
    order, err := s.repo.Load(ctx, id)
    if err != nil {
        return err
    }
    order.MarkPaid(amount)

    if err := s.repo.Save(ctx, order); err != nil {
        return err // committed nothing, fine
    }
    // CRASH HERE: order is paid, no email ever sent
    return s.disp.Dispatch(ctx, order.PullEvents())
}

If the process crashes after Save and before Dispatch, the order is paid and the receipt never sends. Flip the order and you get the opposite bug: dispatch first, then crash before save, and you have emailed a receipt for a payment that rolled back.

You cannot fix this with ordering. The database commit and the side effect are two systems, and there is no shared transaction across them. The fix is to make recording the event part of the same database transaction as the state change, then deliver it afterward from a durable record.

That durable record is the outbox.

The transactional outbox lives at the adapter

The outbox pattern writes the event into an outbox table inside the same transaction that writes the state change. Commit is atomic: either both the order and the event row land, or neither does. A separate relay reads the table and delivers, retrying until it succeeds.

The table:

CREATE TABLE outbox (
    id           bigserial PRIMARY KEY,
    event_name   text        NOT NULL,
    payload      jsonb       NOT NULL,
    occurred_at  timestamptz NOT NULL,
    processed_at timestamptz
);

CREATE INDEX outbox_unprocessed
    ON outbox (id) WHERE processed_at IS NULL;

The repository adapter takes a transaction and writes both the aggregate and its events in it. This is the key move: the outbox write is the adapter's job, not the domain's. The domain produced plain structs; the adapter serializes them and persists them next to the state they describe.

// adapter/postgres/order_repo.go
package postgres

import (
    "context"
    "encoding/json"

    "github.com/jmoiron/sqlx"
    "yourapp/domain"
)

type OrderRepo struct{ db *sqlx.DB }

func (r *OrderRepo) Save(
    ctx context.Context, o *domain.Order,
) error {
    tx, err := r.db.BeginTxx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    _, err = tx.ExecContext(ctx, `
        UPDATE orders SET status = $1 WHERE id = $2
    `, o.Status, o.ID)
    if err != nil {
        return err
    }

    for _, e := range o.PullEvents() {
        payload, err := json.Marshal(e)
        if err != nil {
            return err
        }
        _, err = tx.ExecContext(ctx, `
            INSERT INTO outbox
              (event_name, payload, occurred_at)
            VALUES ($1, $2, $3)
        `, e.EventName(), payload, e.OccurredAt())
        if err != nil {
            return err
        }
    }
    return tx.Commit()
}

Now the state change and the event are one atomic write. There is no window where the order is paid but the event is lost. If the transaction rolls back, the outbox row goes with it.

The relay drains the outbox

A small goroutine polls the table, dispatches each row through the same in-process dispatcher, and marks it processed. FOR UPDATE SKIP LOCKED lets you run more than one relay without two of them grabbing the same row.

// adapter/postgres/relay.go
package postgres

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/jmoiron/sqlx"
    "github.com/lib/pq" // for pq.Array

    "yourapp/app"
    "yourapp/domain"
)

func (r *Relay) Tick(ctx context.Context) error {
    tx, err := r.db.BeginTxx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    rows, err := tx.QueryxContext(ctx, `
        SELECT id, event_name, payload
        FROM outbox
        WHERE processed_at IS NULL
        ORDER BY id
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    `)
    if err != nil {
        return err
    }

    var ids []int64
    for rows.Next() {
        var rec struct {
            ID        int64
            EventName string
            Payload   []byte
        }
        if err := rows.StructScan(&rec); err != nil {
            rows.Close()
            return err
        }
        if err := r.deliver(ctx, rec.EventName,
            rec.Payload); err != nil {
            rows.Close()
            return err
        }
        ids = append(ids, rec.ID)
    }
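    rows.Close()
    // Next returning false can mean an iteration error, not just the end.
    if err := rows.Err(); err != nil {
        return err
    }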

    if len(ids) == 0 {
        return tx.Commit()
    }
    _, err = tx.ExecContext(ctx, `
        UPDATE outbox SET processed_at = now()
        WHERE id = ANY($1)
    `, pq.Array(ids))
    if err != nil {
        return err
    }
    return tx.Commit()
}

The relay delivers at least once. If deliver succeeds but the process dies before the UPDATE commits, the row stays unprocessed and gets delivered again on the next tick. The same happens when a later row in the batch fails: Tick returns before the UPDATE, the transaction rolls back, and every row already delivered in that batch is delivered again. That is the contract: your handlers must be idempotent. Marking a read-model row as paid twice is a no-op. Sending an email twice is not, so a handler that emails checks a sent-log first. At-least-once delivery is the realistic guarantee, and designing handlers around it is cheaper than chasing exactly-once.

deliver is the one genuinely non-obvious step: it maps event_name back to a concrete struct, then hands it to the dispatcher you already wrote. A small registry of name-to-decoder keeps that mapping explicit, in the same spirit as the dispatcher's wiring.

type decoder func([]byte) (domain.Event, error)

func decode[T domain.Event](b []byte) (domain.Event, error) {
    var e T
    if err := json.Unmarshal(b, &e); err != nil {
        return nil, err
    }
    return e, nil
}

type Relay struct {
    db    *sqlx.DB
    disp  *app.Dispatcher
    codec map[string]decoder
}

func NewRelay(db *sqlx.DB, disp *app.Dispatcher) *Relay {
    return &Relay{
        db:   db,
        disp: disp,
        codec: map[string]decoder{
            "order.paid": decode[domain.OrderPaid],
        },
    }
}

func (r *Relay) deliver(
    ctx context.Context, name string, payload []byte,
) error {
    dec, ok := r.codec[name]
    if !ok {
        return fmt.Errorf("unknown event %q", name)
    }
    e, err := dec(payload)
    if err != nil {
        return err
    }
    return r.disp.Dispatch(ctx, []domain.Event{e})
}

You register each event name once, in NewRelay, where the rest of the relay's wiring lives. An unknown name fails loudly instead of being silently dropped. Drive Tick from a ticker:

go func() {
    t := time.NewTicker(time.Second)
    defer t.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
            if err := relay.Tick(ctx); err != nil {
                log.Printf("relay tick: %v", err)
            }
        }
    }
}()

What you ended up with

Events are plain structs the domain owns. The dispatcher is a map you can read top to bottom. The outbox is one table and one polling loop. Delivery survives a crash because the event commits in the same transaction as the state it describes, and the relay retries until every row is processed.

No event bus library. No broker on the critical path. When you do outgrow in-process handlers (a real consumer in another service, a fan-out you cannot do in a goroutine), the relay's deliver is the single place that changes: it publishes to Kafka or NATS instead of calling the local dispatcher. Everything upstream of it, the domain and the outbox write, stays exactly as it is. You built for the broker without buying it early.


If this was useful

The split here — domain that records events, adapter that persists them, relay that delivers — is the same port-and-adapter shape that runs through Hexagonal Architecture in Go. The book works the outbox, the dispatcher, and the handler boundary at length: where idempotency lives, how to evolve event payloads without breaking consumers, and when the in-process path should give way to a real broker.

Thinking in Go — the 2-book series on Go programming and hexagonal architecture
