DEV Community

Cover image for Using a Transactional Pub/Sub Outbox to Guarantee Consistency
Marcus Kohlberg for Encore

Posted on

Using a Transactional Pub/Sub Outbox to Guarantee Consistency

🤔One of the hardest parts of building an event-driven application is ensuring consistency between services.

A common pattern is for each service to have its own database and using Pub/Sub to notify other systems of business events.

😬Inevitably this leads to inconsistencies since the Pub/Sub publishing is not transactional with the database writes.

While there are several approaches to solving this, it's important that the solution doesn't add too much complexity
to what is often an already complex architecture.

💡An excellent solution, which keeps things simple, is the transactional outbox pattern.

👩‍💻 Implementing a Transactional Outbox

Encore, the backend development platform for automating infrastructure in your cloud, provides built-in support for the transactional outbox pattern in the x.encore.dev/infra/pubsub/outbox package.

The transactional outbox works by binding a Pub/Sub topic to a database transaction, translating all calls to topic.Publish
into inserting a database row in an outbox table. If/when the transaction later commits, the messages are picked up by
a Relay that polls the outbox table and publishes the messages to the actual Pub/Sub topic.

✉️ Publishing messages to the outbox

To publish messages to the outbox, a topic must first be bound to the outbox. This is done using Pub/Sub topic references which allows you to retain complete type safety and the same interface as regular Pub/Sub topics, allowing existing code to continue to work without changes.

📒Note: In regular (non-outbox) usage the message id returned by topic.Publish is the same as the message id the subscriber receives when processing the message. With the outbox, this message id is not available until the transaction commits, so topic.Publish returns an id referencing the outbox row instead.

The topic binding supports pluggable storage backends, enabling use of the outbox pattern with any transactional storage backend. Implementation are provided out-of-the-box for use with Encore's encore.dev/storage/sqldb package, as well as the standard library database/sql and github.com/jackc/pgx/v5 drivers, but it's easy to write your own for other use cases. See the Go package reference for more information.

🖼 Example usage

For example, to use a transactional outbox to notify subscribers when a user is created:

// Create a SignupsTopic somehow.
var SignupsTopic = pubsub.NewTopic[*SignupEvent](/* ... */)

// Create a topic ref with publisher permissions.
ref := pubsub.TopicRef[pubsub.Publisher[*SignupEvent]](SignupsTopic)

// Bind it to the transactional outbox
import "x.encore.dev/infra/pubsub/outbox"
var tx *sqldb.Tx // somehow get a transaction
ref = outbox.Bind(ref, outbox.TxPersister(tx))

// Calls to ref.Publish() will now insert a row in the outbox table.
Enter fullscreen mode Exit fullscreen mode
// The database used must contain the below database table:
// See https://pkg.go.dev/x.encore.dev/infra/pubsub/outbox#SQLDBStore
CREATE TABLE outbox (
    id BIGSERIAL PRIMARY KEY,
    topic TEXT NOT NULL,
    data JSONB NOT NULL,
    inserted_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX outbox_topic_idx ON outbox (topic, id);
Enter fullscreen mode Exit fullscreen mode

Once the transaction commits any published messages via ref above will be stored in the outbox table.

📩 Consuming messages from the outbox

Once committed, the messages are ready to be picked up and published to the actual Pub/Sub topic.

That is done via the Relay.
The relay continuously polls the outbox table and publishes any new messages to the actual Pub/Sub topic.

The relay supports pluggable storage backends, enabling use of the outbox pattern with any transactional storage backend. An implementation is provided out-of-the-box that uses Encore's built-in SQL database support, but it's easy to write your own for other databases.

The topics to poll must be registered with the relay, typically during service initialization. For example:

package user

import (
    "context"

    "encore.dev/pubsub"
    "encore.dev/storage/sqldb"
    "x.encore.dev/infra/pubsub/outbox"
)

type Service struct {
    signupsRef pubsub.Publisher[*SignupEvent]
}

// db is the database the outbox table is stored in
var db = sqldb.NewDatabase(...)

// Create the SignupsTopic somehow.
var SignupsTopic = pubsub.NewTopic[*SignupEvent](/* ... */)

func initService() (*Service, error) {
    // Initialize the relay to poll from our database.
    relay := outbox.NewRelay(outbox.SQLDBStore(db))

    // Register the SignupsTopic to be polled.
    signupsRef := pubsub.TopicRef[pubsub.Publisher[*SignupEvent]](SignupsTopic)
    outbox.RegisterTopic(relay, signupsRef)

    // Start polling.
    go relay.PollForMessage(context.Background(), -1)

    return &Service{signupsRef: signupsRef}, nil
}
Enter fullscreen mode Exit fullscreen mode

🙋‍♂️ Try it yourself

If you're curious about implementing the transactional outbox pattern, it's easy to do using Encore.

👉This guide shows you how to build a fully Type-Safe event-driven backend in Go, using Encore, implementing an Uptime Monitoring system as an example.

It can then easily be extended with the transactional outbox pattern using the examples above.👆

👯‍♀️ Join the friendliest backend development community

If you have questions or want to hangout with other backend developers, join Encore's developer community on Slack.👈

Top comments (0)