
Alex B
Posted on • Originally published at atlas9.dev on

Adding an outbox, mail, tokens, CSRF, and more

This post is a grab bag of updates from the last couple of weeks, including a background task system, sending email, cross-origin request forgery protection, one-time tokens, and more.

outbox

Applications usually need a way to do work in the background. One reason for this is that it's helpful to avoid lots of work within the lifecycle of an HTTP request, especially when the work to be done depends on external services. For example, if registering a new user requires writing records to the database, sending an email, setting up a billing account, fetching/generating an avatar image, and provisioning some other resources, a failure in any one of those makes for very complex and fragile error handling – it works most of the time, until it doesn't, and then you can have a real mess on your hands.

That's why I prefer to have HTTP requests do as little work as possible, interacting with as few dependencies as possible, ideally just the database. The extra work of integrating with other dependencies is offloaded to a background processing system.

There are lots of ways to submit that work to a background system, and lots of job queue options to choose from.

For atlas9, I'm starting with a simple outbox pattern, where records are written to the database and to an "outbox" within the same database transaction. For example, I use this when registering a new user to create the user record, write a user.registered event, and create a send verification email task, all within the same database transaction. Having all the information recorded in the same transaction makes reasoning about failures easy – if anything fails, everything fails.

A worker runs in the background, picks up the send verification email task from the database, and sends the email. If emails were failing to send for some reason, it wouldn't affect user registration, and the task could be retried. The system supports multiple workers concurrently consuming the tasks, handling retries and failures, leases, idempotency keys, partition keys, and more.

The outbox interfaces look like this:

// (pseudocode: most error handling omitted for brevity; s holds the DB handle and the stores)

type Emitter interface {
    Emit(ctx context.Context, eventType, partitionKey string, payload []byte, opts *EmitOptions) error
}

func RegisterUser(ctx context.Context, email, password string) {

    // Start a writable database transaction; everything below commits or rolls back together.
    dbi.ReadWrite(ctx, s.DB, func(tx dbi.DBI) (core.ID, error) {

        users := s.Users(tx)
        passwords := s.Passwords(tx) // storing the password hash is omitted here
        outbox := s.Outbox(tx)

        user := &iam.User{Email: email}
        users.Save(ctx, user)
        userID := user.ID // assumption: Save populates the ID on the user

        payload, _ := json.Marshal(UserRegisteredEvent{UserID: userID, Email: email})
        err := outbox.Emit(ctx, "user.registered", userID.String(), payload, nil)
        return userID, err
    })
}

type UserRegisteredEvent struct {
    UserID core.ID
    Email string
}

That writes an event to the outbox table:

sqlite> select seq, event_type, partition_key, payload from outbox;
seq  event_type       partition_key               payload
---  ---------------  --------------------------  -----------------------------------------------------------------
1    user.registered  06EBYDDXKFEYMKQY7CE8DF2VHM  {"UserID":"06EBYDDXKFEYMKQY7CE8DF2VHM","Email":"test@eabuc.com"}
2    user.registered  06EBYDNG5PHC4E8BP741DQ5TWC  {"UserID":"06EBYDNG5PHC4E8BP741DQ5TWC","Email":"test2@eabuc.com"}

Events record what happened. Events may be consumed multiple times, by different consumers. This allows the "user.registered" event to be processed independently by multiple systems.

type Consumer interface {
    Claim(ctx context.Context, consumer string, leaseDuration time.Duration, limit int) ([]Claimed, error)

    Complete(ctx context.Context, seq int64, consumer string) error

    // Nack requeues a task for retry.
    // If retryAfter is non-nil, the task won't be claimed again until after that time.
    Nack(ctx context.Context, seq int64, consumer string, retryAfter *time.Time) error

    // Fail permanently marks a task as failed, stopping all future attempts.
    Fail(ctx context.Context, seq int64, consumer string) error
}

// Claimed pairs an event with its task.
type Claimed struct {
    Event Event
    Task Task
}

// Task represents an event assigned to a consumer.
type Task struct {
    Seq int64
    Consumer string
    PartitionKey string
    Status Status
    ProcessAfter *time.Time
    LeaseUntil *time.Time
    CompletedAt *time.Time
    Attempts int
}

Going back to the earlier example, there may be independent consumers of the user.registered event for "sending verification email", "setting up billing", "fetching avatar image", "provisioning resources", etc.

When an event is recognized by a consumer, a "task" is created, which tracks that consumer's progress on the event: status, attempts, lease, and completion time.

The outbox_deliveries table maps events to consumers:

sqlite> select * from outbox_deliveries;
seq  consumer       partition_key               status     process_after  lease_until          completed_at         attempts
---  -------------  --------------------------  ---------  -------------  -------------------  -------------------  --------
1    welcome_email  06EBYDDXKFEYMKQY7CE8DF2VHM  completed                 2026-03-05 18:15:28  2026-03-05 18:14:58  1
2    welcome_email  06EBYDNG5PHC4E8BP741DQ5TWC  completed                 2026-03-05 18:16:28  2026-03-05 18:15:58  1

This is a decent first draft that captures some of the design elements I'm looking for:

  • write to an event log
  • multiple consumers
  • retries, leases, delayed execution (ProcessAfter)
  • partitions (isolate work by tenant, for example)
  • FIFO within a partition
  • fairness

These are some of the things I've typically needed or wanted from a message queue, and they make it easy to reason about behavior, overload, failures, ordering, etc.

There are plenty of rough edges here to figure out:

  • event log and task cleanup (delete old rows)
  • rename some things (e.g. outbox_deliveries should be tasks probably)
  • rethink the consumer, worker, and task interfaces

The atlas9.dev/core/outbox/ package contains implementations for sqlite and postgres.

mail

Speaking of email, atlas9.dev/c/mail contains code for sending mail in various ways. It has an SMTP implementation, and an AWS Simple Email Service (SES) wrapper.

It's tempting to think, "AWS SES is super reliable, I don't need a background job", and you'd be mostly right: SES itself is unlikely to fail (although I'm sure it does). But failures come from all sorts of directions: networking, config, bugs in code, etc. Having work captured in a task that can be retried helps avoid having to clean up messes when things go haywire.

This isn't always necessary, of course. For example, in the demo app, the "resend verification email" and "request password reset" actions don't bother with a background task; they just send the email directly. If these fail, the user can just click the button again.

The demo app has gained some features: verification email, password reset, user profiles, and an (untested) GitHub OAuth provider.

tokens

There seem to be a lot of different uses for short-lived, single-use tokens: verify email, reset password, OIDC auth, magic link login, multifactor auth, etc.

So I added the atlas9.dev/c/core/tokens package to help create, store, and retrieve such tokens:

type Token[T any] struct {
    Key string
    ExpiresAt time.Time
    Data T
}

type Store[T any] interface {
    // Put stores data under the given key with a configured expiration.
    // If the key already exists, it is replaced (upsert).
    Put(ctx context.Context, key string, data T) (*Token[T], error)

    // Get retrieves a token by key without consuming it.
    // Returns core.ErrNotFound if the key doesn't exist or has expired.
    Get(ctx context.Context, key string) (*Token[T], error)

    // Delete removes a token by key.
    Delete(ctx context.Context, key string) error
}

One interesting thing I learned while building this is timing attacks: a naive string comparison takes a different amount of time depending on where the strings first differ. If two 32-character strings differ at the second character, the comparison stops after two comparisons, but if they differ only at the last character, it takes slightly longer because it needs all 32 comparisons. Apparently, even with the noisy nature of networks and the noisy load on databases and servers, with enough data points it's possible to derive a token value by measuring the time it takes to check (invalid) tokens (which indirectly measures the time it takes for, say, postgres to look up a string in a B-tree index). Pretty wild!

To protect against that, you separate the token key from the token code. When checking a token, you look up the payload using the key (which is subject to timing analysis), but you verify the token against the code using a constant-time comparison function like Go's crypto/subtle.ConstantTimeCompare().

The details are in https://atlas9.dev/src/core/tokens/secure.go.

dropping the clock

I thought having a Clock interface would be a good way to remind myself to write testable code that depends on time (instead of using time.Now() directly), but the Go team recently solved that far more thoroughly with the testing/synctest package, described in https://go.dev/blog/testing-time. So the Clock interface is gone.

csrf protection

I spent some time setting up protection against cross-site request forgery using a variety of tricks (cookies, headers, hidden form tokens, etc.), but then I discovered that the Go team, again, solved this much better than I could with https://github.com/golang/go/issues/73626, which landed as http.CrossOriginProtection in Go 1.25.

Alex Edwards does a great job explaining the situation here: https://www.alexedwards.net/blog/preventing-csrf-in-go. I learned a ton about cross-origin request security, why it matters, why the old approaches to CSRF protection are insufficient, what browsers have done about it, and why it's actually CORF :)
