Introduction
If you have ever built an event-driven service, you know this situation: a request comes in, the database stores the business entity, but the domain event times out while being published to the message broker. Now your database is updated, but your consumers never hear about the change.
The transactional outbox pattern solves this problem. Instead of publishing directly, it stores the entity and the associated event atomically within the same database transaction, with the event going into a dedicated outbox table. A background worker later retrieves the pending events and publishes them.
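In its raw form, the pattern boils down to two inserts in one transaction (illustrative SQL; table and column names are placeholders):

```sql
BEGIN;

-- 1. The business change itself
INSERT INTO entity (id, created_at) VALUES ('42', now());

-- 2. The event, stored in the same transaction instead of being
--    published directly to the broker
INSERT INTO outbox (id, payload, created_at)
VALUES (gen_random_uuid(), '{"entity_id": "42"}', now());

COMMIT;
```

Either both rows are committed or neither is, so the event can never be lost while the entity change survives.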
In this post you will see how to easily implement the pattern in Go using the outbox library.
The Outbox library
outbox is a lightweight open source Go library that implements the transactional outbox pattern. It has the following key features:
- Lightweight: Adds only one external dependency: google/uuid
- Database Agnostic: Works with PostgreSQL, MySQL, Oracle and other relational databases.
- Message Broker Agnostic: Works with any message broker or external system.
- Easy integration: Designed for easy integration into your own projects.
- Observability: Exposes channels for processing errors and discarded messages that you can connect to your metrics and alerting systems.
- Fast Publishing: Optional immediate async message publishing after transaction commit for reduced latency, with guaranteed delivery fallback.
- Configurable Retry & Backoff Policies: Fixed, exponential or custom backoff strategies when delivery fails.
- Max Attempts Safeguard: Automatically discards poison messages that exceed a configurable maxAttempts threshold.
How it works
It provides two components:
- Writer: Stores your entity and associated outbox message atomically in the same transaction.
- Reader: Runs in the background retrieving, publishing and deleting pending messages.
High level flow:
Request arrives:
a) Writer opens a transaction, inserts/modifies the entity and its associated outbox message, then commits.
b) (Optional) Immediate delivery: an optimistic publisher attempts to send the message asynchronously right after the transaction commits.
Background worker: the Reader keeps polling; it publishes any remaining messages and deletes them on success.
Therefore every message created by a committed transaction is eventually delivered, at least once.
Sample Code
The code below assumes you are using PostgreSQL and Kafka, but you can use any other relational database or broker.
Writer: Store Entity and Message
// 1. Open your DB connection.
db, _ := sql.Open("pgx",
"postgres://user:password@localhost:5432/outbox?sslmode=disable")
// 2. Create DBContext for your dialect.
dbCtx := outbox.NewDBContext(db, outbox.SQLDialectPostgres)
// 3. Create Writer
writer := outbox.NewWriter(dbCtx)
// ...
// 4. Later, inside your handler/use case:
// 4.1. Create message to be delivered
payload, _ := json.Marshal(entity)
metadata := json.RawMessage(`{"trace_id":"abc123","correlation_id":"xyz789"}`)
msg := outbox.NewMessage(payload,
outbox.WithMetadata(metadata),
outbox.WithCreatedAt(entity.CreatedAt))
// 4.2 Use the writer to insert message and entity in a single transaction
err := writer.Write(ctx, msg,
// This user defined callback executes queries within the
// same transaction that stores the outbox message
func(ctx context.Context, txQueryer outbox.TxQueryer) error {
_, err := txQueryer.ExecContext(ctx,
"INSERT INTO entity (id, created_at) VALUES ($1, $2)",
entity.ID, entity.CreatedAt,
)
return err
})
Reader: Background Delivery Worker
For brevity the kafkaPublisher implementation has been omitted. You can check the full code here.
// 1. Implement the Publisher interface for your broker.
type kafkaPublisher struct {
// Your message broker client
}
func (p *kafkaPublisher) Publish(ctx context.Context, msg *outbox.Message) error {
// Marshal & push to Kafka (or NATS, RabbitMQ, …).
return nil
}
// 2. Create Reader at service startup.
reader := outbox.NewReader(
dbCtx,
&kafkaPublisher{}, // your publisher
outbox.WithInterval(5*time.Second), // poll every 5 s
outbox.WithReadBatchSize(200), // read up to 200 messages per poll
outbox.WithDeleteBatchSize(50), // delete in batches of 50
outbox.WithMaxAttempts(300), // discard after 300 failed attempts
outbox.WithExponentialDelay( // Delay between attempts (default: Exponential; can also use Fixed or Custom)
500*time.Millisecond, // Initial delay (default: 200ms)
30*time.Minute), // Maximum delay (default: 1h)
)
reader.Start()
defer reader.Stop(context.Background())
// 3-A. Monitor standard processing errors
// (publish / update / delete / read).
go func() {
for err := range reader.Errors() {
switch e := err.(type) {
case *outbox.PublishError:
log.Printf("Failed to publish message | ID: %s | Error: %v",
e.Message.ID, e.Err)
case *outbox.UpdateError:
log.Printf("Failed to update message | ID: %s | Error: %v",
e.Message.ID, e.Err)
case *outbox.DeleteError:
log.Printf("Batch message deletion failed | Count: %d | Error: %v",
len(e.Messages), e.Err)
for _, msg := range e.Messages {
log.Printf("Failed to delete message | ID: %s", msg.ID)
}
case *outbox.ReadError:
log.Printf("Failed to read outbox messages | Error: %v", e.Err)
default:
log.Printf("Unexpected error occurred | Error: %v", e)
}
}
}()
// 3-B. Monitor discarded messages
// (only fires when a message exceeds maxAttempts).
go func() {
for msg := range reader.DiscardedMessages() {
log.Printf("outbox message %s discarded after %d attempts",
msg.ID, msg.TimesAttempted)
// Example next steps:
// - forward to a dead letter topic
// - raise an alert / metric
// - persist for manual inspection
}
}()
(Optional) Immediate Delivery
You can enable the optimistic publisher feature to send messages asynchronously right after the transaction is committed. If the publish succeeds, the message is removed from the outbox table; if not, the Reader will deliver it eventually.
publisher := &kafkaPublisher{} // same implementation as above for reader
writer := outbox.NewWriter(
dbCtx,
outbox.WithOptimisticPublisher(publisher),
)
Things to remember:
- Consumers must be idempotent: a message can be published twice, once by the optimistic publisher and once by the Reader.
- Publish failures never roll back your transaction. The Reader keeps retrying until the message is delivered.
- The Reader reschedules retries using the selected backoff strategy. Once a message exceeds the maxAttempts threshold, it is sent to the DiscardedMessages() channel.
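Since at-least-once delivery implies duplicates, a consumer can deduplicate by message ID. Here is a minimal in-memory sketch (names are made up; a real service would persist processed IDs, e.g. in the same database as its own state):

```go
package main

import "fmt"

// dedupConsumer processes each message ID at most once.
type dedupConsumer struct {
	seen   map[string]bool
	handle func(id string)
}

func newDedupConsumer(handle func(id string)) *dedupConsumer {
	return &dedupConsumer{seen: map[string]bool{}, handle: handle}
}

// Consume skips IDs it has already processed, making redelivery safe.
func (c *dedupConsumer) Consume(id string) {
	if c.seen[id] {
		return // duplicate delivery: already handled
	}
	c.seen[id] = true
	c.handle(id)
}

func main() {
	processed := 0
	c := newDedupConsumer(func(id string) { processed++ })

	// The same message arrives twice, e.g. once from the optimistic
	// publisher and once from the Reader.
	c.Consume("msg-1")
	c.Consume("msg-1")

	fmt.Println("processed:", processed) // the duplicate is ignored
}
```

The outbox message ID (a UUID generated by the library) is a natural deduplication key, which is one reason to forward it to the broker as part of the published message.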
Database Setup
Add a migration for the outbox table (PostgreSQL example):
CREATE TABLE IF NOT EXISTS outbox (
id UUID PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL,
metadata BYTEA,
payload BYTEA NOT NULL,
times_attempted INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_outbox_created_at ON outbox (created_at);
CREATE INDEX IF NOT EXISTS idx_outbox_scheduled_at ON outbox (scheduled_at);
End to End Examples
Here you can find complete working examples for different databases and message brokers.
Source
👉 Check out outbox on GitHub: https://github.com/oagudo/outbox.
Give it a try in your next project and if you find it helpful, feel free to give the repo a star.
Contributions are welcome: report issues, suggest features or submit PRs.
Thank you!