Introduction to the Dual Write problem
When an application uses an Event-Driven Architecture (EDA) for asynchronous communication between components, we often run into a situation where we must update state in the database and then publish a message to a broker, or make an HTTP call. This scenario is usually called the Dual Write Problem: a single operation has to write to two different systems, and those two writes must behave atomically.
As we know, these two operations are not atomic. If we write or update a record in the database and then fail to publish to the broker, we need to undo the database change. If the process crashes between the two steps, nothing is left to undo it, so the database and the broker silently disagree.
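To make the failure concrete, here is a minimal sketch. It uses hypothetical in-memory stand-ins (`fakeDB`, `fakeBroker`) instead of a real database and broker. The naive approach writes to the database first and publishes second. When the publish fails, the order is already stored but no event was ever sent.

```go
package main

import "errors"

// fakeDB is a hypothetical in-memory stand-in for the database.
type fakeDB struct{ orders []string }

// fakeBroker is a hypothetical in-memory stand-in for the message broker.
type fakeBroker struct {
	fail      bool // when true, every Publish call fails
	published []string
}

func (b *fakeBroker) Publish(msg string) error {
	if b.fail {
		return errors.New("broker unavailable")
	}
	b.published = append(b.published, msg)
	return nil
}

// naiveCreateOrder performs the two writes one after the other. If the
// publish fails, the database write has already happened, so the two
// systems now disagree about whether the order exists.
func naiveCreateOrder(db *fakeDB, broker *fakeBroker, order string) error {
	db.orders = append(db.orders, order)            // write 1: database
	return broker.Publish("order.created:" + order) // write 2: broker
}
```

With `fail: true`, the call returns an error, yet `db.orders` already contains the order.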
Transactional Outbox Pattern
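The Transactional Outbox Pattern solves this problem in a simple, consistent, resilient and elegant way. It turns two writes to different systems into a single database transaction: the state change and a record of the event that describes it are written together. Because the database guarantees atomicity within a transaction, the application state and the recorded events can never disagree. Relational databases provide this guarantee, as do NoSQL databases that support multi-document transactions. The events themselves reach the broker shortly afterwards, so delivery to the broker is eventually consistent.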
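The core idea can be sketched in memory. The sketch assumes a hypothetical `store` whose `tx` type stages writes and applies them only on commit; a real database gives you this for free. The order row and its outbox event are staged in the same transaction, so a failure before the commit leaves neither of them behind.

```go
package main

import "errors"

// store is a hypothetical in-memory database holding the business table
// (orders) next to the outbox table.
type store struct {
	orders []string
	outbox []string
}

// tx stages writes and applies them only on commit, mimicking a database
// transaction: either every staged row becomes visible or none does.
type tx struct {
	s      *store
	orders []string
	outbox []string
}

func (s *store) begin() *tx { return &tx{s: s} }

func (t *tx) insertOrder(o string) { t.orders = append(t.orders, o) }

func (t *tx) insertEvent(e string) { t.outbox = append(t.outbox, e) }

func (t *tx) commit() {
	t.s.orders = append(t.s.orders, t.orders...)
	t.s.outbox = append(t.s.outbox, t.outbox...)
}

var errEmptyOrder = errors.New("empty order id")

// orderCreatedEvent is a hypothetical helper that builds the event for an order.
func orderCreatedEvent(order string) (string, error) {
	if order == "" {
		return "", errEmptyOrder
	}
	return "order.created:" + order, nil
}

// createOrder writes the order and its event in one transaction. If building
// the event fails, nothing is committed, so no order exists without its event.
func createOrder(s *store, order string) error {
	t := s.begin()
	t.insertOrder(order)
	event, err := orderCreatedEvent(order)
	if err != nil {
		return err // the staged rows are discarded, like a rollback
	}
	t.insertEvent(event)
	t.commit()
	return nil
}
```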
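This pattern is implemented using an outbox table, which serves as a durable queue for events. An asynchronous worker, often called a Relay, continually polls this table and publishes the saved events to the message broker. This design ensures both consistency and resilience, because the Relay can implement a robust retry strategy to handle issues like a failing broker or a network partition. Note that the Relay publishes an event before it records it as processed. A crash between those two steps causes the event to be published again, so delivery is at-least-once and consumers should be able to handle duplicates.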
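Because the Relay can publish the same event more than once (for example, after a crash between publishing and marking the row as processed), consumers benefit from being idempotent. The sketch below assumes each message carries the outbox row's `id`. The relay code in this post publishes only the payload, so that ID would need adding, for instance in the AMQP `MessageId` field. The `dedupConsumer` type is hypothetical and keeps seen IDs in memory. A real consumer would store them durably, ideally in the same transaction as its own side effects.

```go
package main

// dedupConsumer is a hypothetical idempotent consumer: it remembers which
// event IDs it has already handled and ignores redeliveries of them.
type dedupConsumer struct {
	seen    map[string]bool
	handled []string // payloads actually processed, in arrival order
}

func newDedupConsumer() *dedupConsumer {
	return &dedupConsumer{seen: make(map[string]bool)}
}

// Handle processes a message at most once per event ID and reports whether
// the message was new. Duplicates caused by at-least-once delivery are skipped.
func (c *dedupConsumer) Handle(eventID, payload string) bool {
	if c.seen[eventID] {
		return false
	}
	c.seen[eventID] = true
	c.handled = append(c.handled, payload)
	return true
}
```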
Code example
Here is a small Go example that illustrates how to use the pattern properly. It uses PostgreSQL (through the lib/pq driver) and RabbitMQ.
```go
package order

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"

	_ "github.com/lib/pq" // PostgreSQL driver
)

// CreateOrder stores the order and its "order.created" event in a single
// transaction, so either both rows are written or neither is.
func CreateOrder(ctx context.Context, db *sql.DB, customerID int, product string) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback is a no-op once Commit has succeeded, so deferring it is safe.
	defer tx.Rollback()

	// Insert the business data into its own table.
	var orderID int
	err = tx.QueryRowContext(ctx,
		`INSERT INTO orders (customer_id, product) VALUES ($1, $2) RETURNING id`,
		customerID, product,
	).Scan(&orderID)
	if err != nil {
		return err
	}

	// Build the event payload and insert it into the outbox table.
	eventPayload, err := json.Marshal(map[string]any{
		"order_id":    orderID,
		"customer_id": customerID,
		"product":     product,
	})
	if err != nil {
		return err
	}
	_, err = tx.ExecContext(ctx,
		`INSERT INTO outbox (topic, payload) VALUES ($1, $2)`,
		"order.created", eventPayload,
	)
	if err != nil {
		return err
	}

	// Both inserts succeeded, so commit them together. Log only after the
	// commit, because the commit itself can still fail.
	if err := tx.Commit(); err != nil {
		return err
	}
	log.Printf("Successfully created order %d and outbox event in one transaction.", orderID)
	return nil
}
```
Now we need a Relay worker that polls events from the outbox table and publishes them to the broker.
```go
package worker

import (
	"context"
	"database/sql"
	"log"

	// streadway/amqp is no longer maintained; github.com/rabbitmq/amqp091-go
	// is its maintained fork with the same API.
	"github.com/streadway/amqp"
)

// maxRetries is the maximum number of publish attempts for an outbox event
// before it is marked as FAILED.
const maxRetries = 3

// outboxEvent holds one row read from the outbox table.
type outboxEvent struct {
	id         string
	topic      string
	payload    []byte
	retryCount int
}

func processOutboxEvents(ctx context.Context, db *sql.DB, ch *amqp.Channel) error {
	// The transaction holds the row locks and makes the status updates atomic.
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	// Select pending events and lock them. SKIP LOCKED makes other workers skip
	// rows that are already locked, so several relay workers can poll the same
	// table without publishing the same event concurrently.
	rows, err := tx.QueryContext(ctx, `
		SELECT id, topic, payload, retry_count FROM outbox
		WHERE status = 'PENDING' AND processed_at IS NULL
		ORDER BY created_at ASC
		LIMIT 10
		FOR UPDATE SKIP LOCKED`)
	if err != nil {
		return err
	}
	// Read the whole batch before running any UPDATE: a transaction uses a
	// single connection, and drivers such as lib/pq cannot execute a new
	// statement while a result set is still open on it.
	events, err := scanEvents(rows)
	if err != nil {
		return err
	}

	for _, e := range events {
		// Publish the event to RabbitMQ, using the topic as the routing key.
		// Without publisher confirms (ch.Confirm), a nil error only means the
		// message was sent on the channel, not that the broker stored it.
		pubErr := ch.Publish(
			"events.exchange", // exchange
			e.topic,           // routing key
			false,             // mandatory
			false,             // immediate
			amqp.Publishing{
				ContentType: "application/json",
				Body:        e.payload,
			})
		if pubErr == nil {
			// Published: mark the event as processed.
			if _, err := tx.ExecContext(ctx, `
				UPDATE outbox SET processed_at = NOW(), status = 'PROCESSED', last_error = NULL
				WHERE id = $1`, e.id); err != nil {
				return err // database error: the deferred Rollback releases the locks
			}
			log.Printf("Successfully published event %s", e.id)
			continue
		}

		attempt := e.retryCount + 1
		log.Printf("Failed to publish event %s (attempt %d): %v", e.id, attempt, pubErr)

		// Below the retry limit the event stays PENDING and is retried on the
		// next pass; once the limit is reached it is marked FAILED (dead-lettered).
		status := "PENDING"
		if attempt >= maxRetries {
			log.Printf("Event %s reached max retries. Moving to FAILED state.", e.id)
			status = "FAILED"
		}
		if _, err := tx.ExecContext(ctx, `
			UPDATE outbox SET status = $1, retry_count = $2, last_error = $3
			WHERE id = $4`, status, attempt, pubErr.Error(), e.id); err != nil {
			return err // database error: the deferred Rollback releases the locks
		}
	}

	// Commit the status updates and release the row locks.
	return tx.Commit()
}

// scanEvents reads every row of the batch and closes the result set.
func scanEvents(rows *sql.Rows) ([]outboxEvent, error) {
	defer rows.Close()
	var events []outboxEvent
	for rows.Next() {
		var e outboxEvent
		if err := rows.Scan(&e.id, &e.topic, &e.payload, &e.retryCount); err != nil {
			return nil, err
		}
		events = append(events, e)
	}
	return events, rows.Err()
}
```
Conclusion
Overall, we could try to solve this problem by simply handling errors in the application: write to the database, publish the event, and undo the write if publishing fails. However, that approach breaks down if the process crashes between the two steps. The Outbox Pattern is a concise and elegant way of solving the dual write problem: it makes the two writes atomic, enforces separation of concerns in the application, and can be very handy. The next time you come across this scenario, give it a try, and feel free to leave a comment!