- Book: Hexagonal Architecture in Go
- Me: xgabriel.com | GitHub
You want domain events in a Go service. You search for "Go event bus" and the first page is libraries: a pub/sub package here, a CQRS framework there, something that wants Kafka, something that wires reflection into a Dispatch(interface{}) call. You install one, register handlers in an init(), and three months later nobody can trace where an OrderPaid event goes because the wiring lives in a runtime registry instead of in code you can read.
None of that is needed. A domain event is a struct. A dispatcher is a map. Reliable delivery across a transaction boundary is an outbox table. You can build all three in plain Go and standard SQL, and the result is easier to read than any framework you would have pulled in.
This post builds the three pieces in order: the event as a value, the in-process dispatcher, and the transactional outbox that makes delivery survive a crash.
A domain event is a plain struct
An event is a fact that already happened. Past tense, immutable, owned by the domain. It carries the data a handler needs and nothing about how it gets delivered.
// domain/events.go
package domain

import "time"

type Event interface {
	EventName() string
	OccurredAt() time.Time
}

type OrderPaid struct {
	OrderID     string
	CustomerID  string
	AmountCents int64
	At          time.Time
}

func (e OrderPaid) EventName() string {
	return "order.paid"
}

func (e OrderPaid) OccurredAt() time.Time {
	return e.At
}
No base class, no annotations, no registry. The Event interface exists so a dispatcher can hold a slice of mixed types and ask each one for its name and timestamp. Every event in the domain implements those two methods and adds whatever fields the fact carries.
The aggregate records events as it changes state. It does not dispatch them. Recording and delivering are different jobs, and keeping them apart is what lets you choose the delivery mechanism later.
// domain/order.go
package domain

import "time"

type Order struct {
	ID         string
	CustomerID string
	Status     string
	events     []Event // recorded but not yet pulled
}

// MarkPaid changes state and records the fact; it does not deliver it.
func (o *Order) MarkPaid(amount int64) {
	o.Status = "paid"
	o.events = append(o.events, OrderPaid{
		OrderID:     o.ID,
		CustomerID:  o.CustomerID,
		AmountCents: amount,
		At:          time.Now(),
	})
}

// PullEvents returns the recorded events and empties the buffer.
func (o *Order) PullEvents() []Event {
	out := o.events
	o.events = nil
	return out
}
PullEvents drains the buffer. The application service calls it after a command finishes and hands the events to the dispatcher. The domain stays pure: it knows what happened, not who listens.
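To make the drain behavior concrete, here is a minimal standalone sketch. It assumes trimmed copies of the Event, OrderPaid, and Order types collapsed into one package so it runs on its own; the second PullEvents call returns nothing because the first one emptied the buffer.

```go
package main

import (
	"fmt"
	"time"
)

// Event and OrderPaid mirror the domain types, trimmed for this sketch.
type Event interface {
	EventName() string
	OccurredAt() time.Time
}

type OrderPaid struct {
	OrderID     string
	AmountCents int64
	At          time.Time
}

func (e OrderPaid) EventName() string     { return "order.paid" }
func (e OrderPaid) OccurredAt() time.Time { return e.At }

type Order struct {
	ID     string
	Status string
	events []Event
}

func (o *Order) MarkPaid(amount int64) {
	o.Status = "paid"
	o.events = append(o.events, OrderPaid{OrderID: o.ID, AmountCents: amount, At: time.Now()})
}

// PullEvents returns the recorded events and empties the buffer.
func (o *Order) PullEvents() []Event {
	out := o.events
	o.events = nil
	return out
}

func main() {
	o := &Order{ID: "o-1"}
	o.MarkPaid(1999)

	first := o.PullEvents()  // the one recorded event
	second := o.PullEvents() // already drained

	fmt.Println(len(first), first[0].EventName(), len(second))
	// prints: 1 order.paid 0
}
```

Draining on read means an event is handed over at most once by the aggregate, so whoever pulls the events is responsible for what happens to them next.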
An in-process dispatcher is a map
For handlers that run in the same process (update a read model, send a receipt, bump a counter), you do not need a broker. You need a map from event name to a list of handlers.
// app/dispatcher.go
package app

import (
	"context"

	"yourapp/domain"
)

type Handler func(ctx context.Context, e domain.Event) error

type Dispatcher struct {
	handlers map[string][]Handler
}

func NewDispatcher() *Dispatcher {
	return &Dispatcher{
		handlers: make(map[string][]Handler),
	}
}

func (d *Dispatcher) Subscribe(
	name string, h Handler,
) {
	d.handlers[name] = append(d.handlers[name], h)
}

func (d *Dispatcher) Dispatch(
	ctx context.Context, events []domain.Event,
) error {
	for _, e := range events {
		for _, h := range d.handlers[e.EventName()] {
			if err := h(ctx, e); err != nil {
				return err
			}
		}
	}
	return nil
}
Wiring is explicit. You see every subscription in one place at startup, not scattered across init() functions in packages that import each other for the side effect.
// cmd/server/main.go
disp := app.NewDispatcher()

disp.Subscribe("order.paid",
	func(ctx context.Context, e domain.Event) error {
		paid := e.(domain.OrderPaid)
		return readModel.MarkPaid(ctx, paid.OrderID)
	})

disp.Subscribe("order.paid",
	func(ctx context.Context, e domain.Event) error {
		paid := e.(domain.OrderPaid)
		return mailer.SendReceipt(ctx, paid.CustomerID)
	})
The type assertion e.(domain.OrderPaid) is safe here because the handler only ever runs for order.paid, and that name belongs to exactly one struct. If you want a compile-time guarantee instead, generics give you a typed wrapper, but for most services the assertion is fine and reads clearly.
This dispatcher is synchronous. Handlers run in the order they subscribed, on the goroutine that called Dispatch. That is a feature: if you want async, you make a specific handler push to a channel or a worker pool. You do not get surprised by background goroutines you did not ask for.
The problem the in-process dispatcher cannot solve
Synchronous dispatch has a hole, and it is the one that bites in production.
Your command handler does two things: it commits the order to the database, then it dispatches OrderPaid so the email goes out. Two separate operations. The process can die between them.
// WRONG: dispatch outside the transaction
func (s *OrderService) Pay(
	ctx context.Context, id string, amount int64,
) error {
	order, err := s.repo.Load(ctx, id)
	if err != nil {
		return err
	}
	order.MarkPaid(amount)
	if err := s.repo.Save(ctx, order); err != nil {
		return err // committed nothing, fine
	}
	// CRASH HERE: order is paid, no email ever sent
	return s.disp.Dispatch(ctx, order.PullEvents())
}
If the process crashes after Save and before Dispatch, the order is paid and the receipt never sends. Flip the order and you get the opposite bug: dispatch first, then crash before save, and you have emailed a receipt for a payment that rolled back.
You cannot fix this with ordering. The database commit and the side effect are two systems, and there is no shared transaction across them. The fix is to make recording the event part of the same database transaction as the state change, then deliver it afterward from a durable record.
That durable record is the outbox.
The transactional outbox lives at the adapter
The outbox pattern writes the event into an outbox table inside the same transaction that writes the state change. Commit is atomic: either both the order and the event row land, or neither does. A separate relay reads the table and delivers, retrying until it succeeds.
The table:
CREATE TABLE outbox (
    id           bigserial PRIMARY KEY,
    event_name   text NOT NULL,
    payload      jsonb NOT NULL,
    occurred_at  timestamptz NOT NULL,
    processed_at timestamptz
);

CREATE INDEX outbox_unprocessed
    ON outbox (id) WHERE processed_at IS NULL;
The repository adapter takes a transaction and writes both the aggregate and its events in it. This is the key move: the outbox write is the adapter's job, not the domain's. The domain produced plain structs; the adapter serializes them and persists them next to the state they describe.
// adapter/postgres/order_repo.go
package postgres

import (
	"context"
	"encoding/json"

	"github.com/jmoiron/sqlx"

	"yourapp/domain"
)

type OrderRepo struct{ db *sqlx.DB }

func (r *OrderRepo) Save(
	ctx context.Context, o *domain.Order,
) error {
	tx, err := r.db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	// Rolls back on every early return; harmless after a successful Commit.
	defer tx.Rollback()

	_, err = tx.ExecContext(ctx, `
		UPDATE orders SET status = $1 WHERE id = $2
	`, o.Status, o.ID)
	if err != nil {
		return err
	}

	// Write each recorded event next to the state change, in the same tx.
	for _, e := range o.PullEvents() {
		payload, err := json.Marshal(e)
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, `
			INSERT INTO outbox
				(event_name, payload, occurred_at)
			VALUES ($1, $2, $3)
		`, e.EventName(), payload, e.OccurredAt())
		if err != nil {
			return err
		}
	}
	return tx.Commit()
}
Now the state change and the event are one atomic write. There is no window where the order is paid but the event is lost. If the transaction rolls back, the outbox row goes with it.
The relay drains the outbox
A small goroutine polls the table, dispatches each row through the same in-process dispatcher, and marks it processed. FOR UPDATE SKIP LOCKED lets you run more than one relay without two of them grabbing the same row.
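// adapter/postgres/relay.go
package postgres

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/jmoiron/sqlx"
	"github.com/lib/pq" // for pq.Array

	"yourapp/app"
	"yourapp/domain"
)

// Tick claims up to 100 unprocessed rows, delivers each one, and marks
// the batch processed. It all runs in one transaction, so an error
// anywhere leaves the whole batch unprocessed for the next tick.
func (r *Relay) Tick(ctx context.Context) error {
	tx, err := r.db.BeginTxx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	rows, err := tx.QueryxContext(ctx, `
		SELECT id, event_name, payload
		FROM outbox
		WHERE processed_at IS NULL
		ORDER BY id
		LIMIT 100
		FOR UPDATE SKIP LOCKED
	`)
	if err != nil {
		return err
	}

	var ids []int64
	for rows.Next() {
		// sqlx matches columns to lowercased field names by default,
		// so event_name needs an explicit db tag.
		var rec struct {
			ID        int64  `db:"id"`
			EventName string `db:"event_name"`
			Payload   []byte `db:"payload"`
		}
		if err := rows.StructScan(&rec); err != nil {
			rows.Close()
			return err
		}
		if err := r.deliver(ctx, rec.EventName,
			rec.Payload); err != nil {
			rows.Close()
			return err
		}
		ids = append(ids, rec.ID)
	}
	// Next also returns false on an iteration error, so check for one
	// before treating the batch as complete.
	if err := rows.Err(); err != nil {
		rows.Close()
		return err
	}
	rows.Close()

	if len(ids) == 0 {
		return tx.Commit()
	}
	_, err = tx.ExecContext(ctx, `
		UPDATE outbox SET processed_at = now()
		WHERE id = ANY($1)
	`, pq.Array(ids))
	if err != nil {
		return err
	}
	return tx.Commit()
}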
The relay delivers at least once. If deliver succeeds but the process dies before the UPDATE commits, the row stays unprocessed and gets delivered again on the next tick. The same happens when a handler fails partway through a batch: Tick returns before the UPDATE, the transaction rolls back, and the rows already delivered in that batch are delivered again. That is the contract: your handlers must be idempotent. Marking a read-model row as paid twice is a no-op. Sending an email twice is not, so a handler that emails checks a sent-log first. At-least-once delivery is the realistic guarantee, and designing handlers around it is cheaper than chasing exactly-once.
deliver is the one genuinely non-obvious step: it maps event_name back to a concrete struct, then hands it to the dispatcher you already wrote. A small registry of name-to-decoder keeps that mapping explicit, in the same spirit as the dispatcher's wiring.
type decoder func([]byte) (domain.Event, error)

func decode[T domain.Event](b []byte) (domain.Event, error) {
	var e T
	if err := json.Unmarshal(b, &e); err != nil {
		return nil, err
	}
	return e, nil
}

type Relay struct {
	db    *sqlx.DB
	disp  *app.Dispatcher
	codec map[string]decoder
}

func NewRelay(db *sqlx.DB, disp *app.Dispatcher) *Relay {
	return &Relay{
		db:   db,
		disp: disp,
		codec: map[string]decoder{
			"order.paid": decode[domain.OrderPaid],
		},
	}
}

func (r *Relay) deliver(
	ctx context.Context, name string, payload []byte,
) error {
	dec, ok := r.codec[name]
	if !ok {
		return fmt.Errorf("unknown event %q", name)
	}
	e, err := dec(payload)
	if err != nil {
		return err
	}
	return r.disp.Dispatch(ctx, []domain.Event{e})
}
You register each event name once, in the relay's constructor, so the full name-to-type mapping is visible in one place. An unknown name fails loudly instead of being silently dropped. Drive Tick from a ticker:
go func() {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := relay.Tick(ctx); err != nil {
				log.Printf("relay tick: %v", err)
			}
		}
	}
}()
What you ended up with
Events are plain structs the domain owns. The dispatcher is a map you can read top to bottom. The outbox is one table and one polling loop. Delivery survives a crash because the event commits in the same transaction as the state it describes, and the relay retries until every row is processed.
No event bus library. No broker on the critical path. When you do outgrow in-process handlers (a real consumer in another service, a fan-out you cannot do in a goroutine), the relay's deliver is the single place that changes: it publishes to Kafka or NATS instead of calling the local dispatcher. Everything upstream of it, the domain and the outbox write, stays exactly as it is. You built for the broker without buying it early.
If this was useful
The split here — domain that records events, adapter that persists them, relay that delivers — is the same port-and-adapter shape that runs through Hexagonal Architecture in Go. The book works the outbox, the dispatcher, and the handler boundary at length: where idempotency lives, how to evolve event payloads without breaking consumers, and when the in-process path should give way to a real broker.
