Nithin Bharadwaj
Event Sourcing and CQRS in Go: Building Resilient Systems That Remember Everything

Let's talk about building systems that are reliable, easy to understand, and can grow without breaking. I often face a problem: a complex business process happens, and later, someone asks, "Why is the data in this state?" Traditional approaches might only store the current result, losing the story of how we got there. There's a way to keep that entire story, and it can make your systems much more resilient. It involves two main ideas: keeping a permanent record of every change, and separating the tasks of updating data from reading it.

Imagine your application's state isn't a single static picture. It's a filmstrip. Every single change—a user registration, an updated address, a completed purchase—is one frame in that film. You can always rewind and play the film from the beginning to see exactly how you arrived at the current scene. This is the core of event sourcing. Instead of overwriting a customer's address in a database table, you record an event: CustomerAddressUpdated. The current address is simply the latest event in that sequence.

This pairs powerfully with another idea: CQRS. This is a fancy acronym for a simple concept. It means you use different models for writing data (Commands) and reading data (Queries). Think of it like a kitchen in a restaurant. The chefs (write side) receive orders, work in a specific, controlled area with their own tools, and produce finished dishes. The waitstaff (read side) have a completely separate station for retrieving those dishes and presenting them to customers. They don't interfere with the cooking process. In software, this separation lets you scale and optimize the two sides independently.

When you combine these two patterns, you get a robust architecture. You have an immutable record of everything that's happened (the event log), and you can build as many specialized, optimized views of that data as you need (the query models). Let me show you how this can work in Go, focusing on keeping the code clear and performant.

First, we need a place to store our filmstrip—the immutable sequence of events. We call this the Event Store.

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "strings" // used by the extractCity helper in the projection
    "sync"
    "time"

    "github.com/google/uuid"
)

// An Event is a fact, something that happened in the past. It is immutable.
type Event struct {
    ID        string                 // Unique identifier for this event
    StreamID  string                 // Which aggregate this belongs to (e.g., "Customer-123")
    Type      string                 // The kind of event, like "CustomerAddressUpdated"
    Data      []byte                 // The details, stored as JSON
    Version   int64                  // The order of this event in its stream
    Timestamp time.Time
}

// The EventStore is the single source of truth for all events.
type EventStore struct {
    mu     sync.RWMutex                     // Protects concurrent access
    events map[string][]*Event              // Map of StreamID to its event history
}

// NewEventStore creates a new in-memory event store.
func NewEventStore() *EventStore {
    return &EventStore{
        events: make(map[string][]*Event),
    }
}

// Append is the critical write operation. It adds a new event to a stream.
func (es *EventStore) Append(ctx context.Context, streamID string, eventType string, data interface{}) (*Event, error) {
    es.mu.Lock()
    defer es.mu.Unlock()

    // The stream's current length is also the version of its last event.
    streamEvents := es.events[streamID]
    currentVersion := int64(len(streamEvents))

    // Create the new event
    dataBytes, err := json.Marshal(data)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal event data: %w", err)
    }

    event := &Event{
        ID:        uuid.New().String(),
        StreamID:  streamID,
        Type:      eventType,
        Data:      dataBytes,
        Version:   currentVersion + 1, // The next sequential version
        Timestamp: time.Now().UTC(),
    }

    // Store it. In a real system, this would be a durable write to a database.
    es.events[streamID] = append(streamEvents, event)
    return event, nil
}

// LoadEvents retrieves the entire history for a stream.
func (es *EventStore) LoadEvents(streamID string) ([]*Event, error) {
    es.mu.RLock()
    defer es.mu.RUnlock()

    events, exists := es.events[streamID]
    if !exists {
        return nil, fmt.Errorf("stream %s not found", streamID)
    }
    // Return a copy to prevent modification of the original
    result := make([]*Event, len(events))
    copy(result, events)
    return result, nil
}

The event store is simple but powerful. Its main job is to append events and guarantee their order. Notice the Version field. It's crucial for optimistic concurrency control: handling two actions that try to update the same customer at the same time. Our simplified Append computes the next version itself, but a production store accepts the version the caller expects. If you try to append an event expecting version 5 and the stream is already at version 6, you know something has changed since you last looked, and you can reject the command or retry.

Now, how do we initiate changes? We don't modify state directly. We send a Command. A command is an intention or a request to do something. "Change customer address" is a command. It may be rejected if it's invalid. If accepted, it results in one or more events being stored.

// A Command is a request to perform an action. It can be accepted or rejected.
type Command struct {
    ID       string
    Type     string // e.g., "UpdateCustomerAddress"
    StreamID string
    Data     interface{}
}

// A CommandHandler validates business rules and, if valid, produces events.
type CommandHandler struct {
    store *EventStore
}

func NewCommandHandler(store *EventStore) *CommandHandler {
    return &CommandHandler{store: store}
}

// HandleUpdateAddress processes one command type. This is where your business logic lives.
func (ch *CommandHandler) HandleUpdateAddress(ctx context.Context, cmd *Command) error {
    // 1. Validate the command data
    data, ok := cmd.Data.(map[string]interface{})
    if !ok {
        return fmt.Errorf("invalid command data")
    }
    newAddress, ok := data["address"].(string)
    if !ok || newAddress == "" {
        return fmt.Errorf("address is required")
    }

    // 2. Load current state by replaying past events
    pastEvents, err := ch.store.LoadEvents(cmd.StreamID)
    if err != nil {
        // Might be a new customer, handle accordingly
        return fmt.Errorf("could not load customer: %w", err)
    }

    // 3. Apply business rules. Maybe we can't update an archived customer.
    // We reconstruct state from events to check rules.
    customerState := ch.reconstructState(pastEvents)
    if customerState.IsArchived {
        return fmt.Errorf("cannot update address for archived customer")
    }

    // 4. If all is good, record the event.
    _, err = ch.store.Append(ctx, cmd.StreamID, "CustomerAddressUpdated", map[string]string{
        "newAddress": newAddress,
        "oldAddress": customerState.Address,
    })
    return err
}

// reconstructState replays events to build the current state for validation.
func (ch *CommandHandler) reconstructState(events []*Event) CustomerState {
    state := CustomerState{}
    for _, e := range events {
        switch e.Type {
        case "CustomerRegistered":
            var data CustomerRegisteredData
            if err := json.Unmarshal(e.Data, &data); err != nil {
                continue // skip malformed events; a real system would log or fail here
            }
            state.Name = data.Name
            state.Address = data.Address
            state.IsActive = true
        case "CustomerAddressUpdated":
            var data map[string]string
            if err := json.Unmarshal(e.Data, &data); err != nil {
                continue
            }
            state.Address = data["newAddress"]
        case "CustomerArchived":
            state.IsActive = false
            state.IsArchived = true
        }
    }
    }
    return state
}

// Example supporting types
type CustomerState struct {
    Name       string
    Address    string
    IsActive   bool
    IsArchived bool
}
type CustomerRegisteredData struct {
    Name    string `json:"name"`
    Address string `json:"address"`
}

The command handler is the brain of the write side. It contains the rules. It says, "Given this request and the history of what's happened before, what should happen next?" It loads history, makes a decision, and if the decision is "yes," it tells the event store to record a new fact.

Reconstructing state from events every time can be slow for entities with long histories. This is where Snapshots help. A snapshot is a saved version of the state at a specific point in time (e.g., at version 100). To get the current state, you load the snapshot and then only replay events that happened after it.

type Snapshot struct {
    StreamID  string
    State     CustomerState // The actual state object
    Version   int64         // Event version this snapshot represents
    TakenAt   time.Time
}

type SnapshotStore struct {
    mu        sync.RWMutex
    snapshots map[string]*Snapshot
}

// NewSnapshotStore creates an empty snapshot store. Without this
// initialization, Save would write to a nil map and panic.
func NewSnapshotStore() *SnapshotStore {
    return &SnapshotStore{snapshots: make(map[string]*Snapshot)}
}

// SaveSnapshot stores a snapshot for a stream.
func (ss *SnapshotStore) Save(snap *Snapshot) {
    ss.mu.Lock()
    defer ss.mu.Unlock()
    ss.snapshots[snap.StreamID] = snap
}

// LoadSnapshot retrieves the latest snapshot for a stream.
func (ss *SnapshotStore) Load(streamID string) (*Snapshot, bool) {
    ss.mu.RLock()
    defer ss.mu.RUnlock()
    snap, exists := ss.snapshots[streamID]
    return snap, exists
}

Your command handler logic can be modified to check for a snapshot first. If one exists at version 50, you load it and then only ask the event store for events from version 51 onward to rebuild the current state. This dramatically speeds up loading for active entities.
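
That snapshot-then-replay flow can be sketched as follows. The types here (`State`, `Ev`, `Snap`) and the `loadState` function are simplified stand-ins for the article's CustomerState, Event, and Snapshot, to keep the sketch self-contained:

```go
package main

import "fmt"

// State, Ev, and Snap are simplified stand-ins for the fuller types above.
type State struct{ Address string }

type Ev struct {
	Version int64
	Address string
}

type Snap struct {
	State   State
	Version int64
}

// loadState rebuilds current state from an optional snapshot plus any
// events newer than it, instead of replaying the whole history.
func loadState(snap *Snap, events []Ev) State {
	var state State
	var from int64
	if snap != nil {
		state, from = snap.State, snap.Version // start where the snapshot left off
	}
	for _, e := range events {
		if e.Version <= from {
			continue // already folded into the snapshot
		}
		state.Address = e.Address
	}
	return state
}

func main() {
	events := []Ev{{1, "A St"}, {2, "B St"}, {3, "C St"}}
	snap := &Snap{State: State{Address: "B St"}, Version: 2}
	// Only event 3 is replayed on top of the snapshot.
	fmt.Println(loadState(snap, events).Address) // C St
}
```

A common policy is to write a new snapshot every N events (say, every 100), so replay cost stays bounded no matter how long the stream gets.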

So far, we've focused on the write side: commands and events. Now, let's look at the read side, or Queries. This is where CQRS shines. The event log is the truth, but it's not a good format for answering specific questions like "Show me a list of customer names and their cities." For that, we build Projections.

A projection listens to events and builds a tailor-made, optimized database table (or in-memory structure) for answering specific questions.

// A Projection builds a read-optimized view from events.
type CustomerListView struct {
    mu        sync.RWMutex
    customers map[string]CustomerSummary // Map CustomerID -> Summary
}

type CustomerSummary struct {
    ID      string
    Name    string
    City    string
    Active  bool
}

func NewCustomerListView() *CustomerListView {
    return &CustomerListView{
        customers: make(map[string]CustomerSummary),
    }
}

// Project is called for each relevant event to update the view.
func (clv *CustomerListView) Project(event *Event) error {
    clv.mu.Lock()
    defer clv.mu.Unlock()

    switch event.Type {
    case "CustomerRegistered":
        var data CustomerRegisteredData
        if err := json.Unmarshal(event.Data, &data); err != nil {
            return fmt.Errorf("bad CustomerRegistered payload: %w", err)
        }
        // Extract city from address (simple example)
        city := extractCity(data.Address)
        clv.customers[event.StreamID] = CustomerSummary{
            ID:     event.StreamID,
            Name:   data.Name,
            City:   city,
            Active: true,
        }
    case "CustomerAddressUpdated":
        var data map[string]string
        if err := json.Unmarshal(event.Data, &data); err != nil {
            return fmt.Errorf("bad CustomerAddressUpdated payload: %w", err)
        }
        summary, exists := clv.customers[event.StreamID]
        if exists {
            summary.City = extractCity(data["newAddress"])
            clv.customers[event.StreamID] = summary
        }
    case "CustomerArchived":
        summary, exists := clv.customers[event.StreamID]
        if exists {
            summary.Active = false
            clv.customers[event.StreamID] = summary
        }
    }
    return nil
}

// Query method to serve the read side.
func (clv *CustomerListView) GetCustomersByCity(city string) []CustomerSummary {
    clv.mu.RLock()
    defer clv.mu.RUnlock()
    var result []CustomerSummary
    for _, cust := range clv.customers {
        if cust.City == city {
            result = append(result, cust)
        }
    }
    return result
}

// extractCity pulls the city out of a "street, city, state" formatted
// address; it relies on the "strings" package.
func extractCity(address string) string {
    parts := strings.Split(address, ", ")
    if len(parts) >= 2 {
        return parts[1]
    }
    return address
}

The projection is a simple state machine. It says, "When I see a CustomerRegistered event, I will add a row to my lookup map. When I see an AddressUpdated event, I will find that row and update the city." This model is now perfect for answering the question "Who lives in Boston?" instantly. You can have many different projections for different purposes: one for lists, one for search indexes, one for reporting totals.
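
To make the "many projections" point concrete, here is a second, hypothetical projection that answers exactly one question: a running count of active customers. It is not part of the article's code, just an illustration of how cheap extra views are:

```go
package main

import "fmt"

// ActiveCustomerCount is a projection that answers a single question:
// "how many customers are currently active?"
type ActiveCustomerCount struct {
	count int
}

// Project folds the two event types it cares about into the counter;
// every other event type is simply ignored.
func (p *ActiveCustomerCount) Project(eventType string) {
	switch eventType {
	case "CustomerRegistered":
		p.count++
	case "CustomerArchived":
		p.count--
	}
}

func main() {
	p := &ActiveCustomerCount{}
	for _, t := range []string{
		"CustomerRegistered",
		"CustomerRegistered",
		"CustomerAddressUpdated", // irrelevant to this view, ignored
		"CustomerArchived",
	} {
		p.Project(t)
	}
	fmt.Println(p.count) // 1
}
```

Both projections consume the same event log; neither knows the other exists.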

How do projections get the events? They subscribe to the event store. In a microservices setup, this could be done through a message broker (like Kafka) that distributes events. For simplicity, let's implement a simple subscription.

// A Subscription allows a projection to listen for new events.
type Subscription struct {
    eventStore *EventStore
    handler    func(*Event) error // The Project method
    quitChan   chan struct{}
}

func NewSubscription(store *EventStore, handler func(*Event) error) *Subscription {
    return &Subscription{
        eventStore: store,
        handler:    handler,
        quitChan:   make(chan struct{}),
    }
}

// Start begins listening. In reality, this would track its position.
// For now, it's a simple example that processes new events for a specific stream.
func (sub *Subscription) Start(streamID string) {
    go func() {
        lastVersion := int64(0)
        for {
            select {
            case <-sub.quitChan:
                return
            default:
                // Poll for new events (in real life, use notifications).
                // LoadEvents errors for a stream with no events yet; the
                // nil slice it returns simply means "nothing to process".
                events, _ := sub.eventStore.LoadEvents(streamID)
                if int64(len(events)) > lastVersion {
                    // Process only new events
                    for i := lastVersion; i < int64(len(events)); i++ {
                        sub.handler(events[i])
                    }
                    lastVersion = int64(len(events))
                }
                time.Sleep(100 * time.Millisecond) // Simple throttle
            }
        }
    }()
}

// Stop terminates the polling goroutine.
func (sub *Subscription) Stop() {
    close(sub.quitChan)
}

Finally, let's stitch it all together in a main function to see the flow.

func main() {
    // 1. Create the core infrastructure
    eventStore := NewEventStore()
    cmdHandler := NewCommandHandler(eventStore)

    // 2. Create a read-model projection
    customerListView := NewCustomerListView()

    // 3. Subscribe the projection to events (for all customers)
    // In a real app, you'd have a more sophisticated subscription mechanism.
    // For demo, we'll manually project after each command.
    projectEvents := func(events []*Event) {
        for _, e := range events {
            customerListView.Project(e)
        }
    }

    // 4. Process a command (Write Side)
    ctx := context.Background()
    // Simulate registration by appending an event directly. A real app would
    // define a RegisterCustomer command and route it through its own handler.
    eventStore.Append(ctx, "Customer-1001", "CustomerRegistered", map[string]string{
        "name":    "Alice",
        "address": "123 Main St, Boston, MA",
    })

    // Load the events we just created to project them
    initialEvents, _ := eventStore.LoadEvents("Customer-1001")
    projectEvents(initialEvents)

    // 5. Now, update the address via a command (real flow)
    updateCmd := &Command{
        ID:       uuid.New().String(),
        Type:     "UpdateCustomerAddress",
        StreamID: "Customer-1001",
        Data: map[string]interface{}{
            "address": "456 Oak Ave, Cambridge, MA",
        },
    }
    err := cmdHandler.HandleUpdateAddress(ctx, updateCmd)
    if err != nil {
        fmt.Println("Command failed:", err)
        return
    }
    fmt.Println("Address updated successfully.")

    // 6. Get the new events and update the projection
    updatedEvents, _ := eventStore.LoadEvents("Customer-1001")
    // Only project the new one (in reality, the subscription would handle this)
    projectEvents(updatedEvents[len(initialEvents):])

    // 7. Query the read model (Read Side)
    fmt.Println("\nCustomers in Cambridge:")
    for _, cust := range customerListView.GetCustomersByCity("Cambridge") {
        fmt.Printf("  - %s (%s)\n", cust.Name, cust.ID)
    }

    // 8. Demonstrate the full event history
    fmt.Println("\nFull event history for Customer-1001:")
    allEvents, _ := eventStore.LoadEvents("Customer-1001")
    for _, e := range allEvents {
        fmt.Printf("  v%d [%s]: %s\n", e.Version, e.Timestamp.Format("15:04:05"), e.Type)
    }
}

This architecture gives you a lot. You have a complete audit trail. You can rebuild your read models from scratch if they become corrupted, because the source of truth is the event log. You can add new types of queries (new projections) without touching the complex command-handling logic. The write side stays focused on maintaining data integrity, and the read side is free to be optimized for speed.
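
Rebuilding a read model really is just "clear it and replay the log". A sketch with simplified stand-in types; `View`, `Reset`, and `Rebuild` are names invented for this illustration:

```go
package main

import "fmt"

// Event and View are simplified stand-ins for the article's types.
type Event struct {
	StreamID string
	Type     string
	Name     string
}

// View maps customer IDs to names.
type View struct{ names map[string]string }

// Reset clears the view so it can be rebuilt from scratch.
func (v *View) Reset() { v.names = make(map[string]string) }

// Project folds a single event into the view.
func (v *View) Project(e *Event) {
	if e.Type == "CustomerRegistered" {
		v.names[e.StreamID] = e.Name
	}
}

// Rebuild wipes the view and replays the full log. Because the log is the
// source of truth, the result matches what incremental projection produced.
func Rebuild(v *View, log []*Event) {
	v.Reset()
	for _, e := range log {
		v.Project(e)
	}
}

func main() {
	log := []*Event{
		{StreamID: "Customer-1", Type: "CustomerRegistered", Name: "Alice"},
		{StreamID: "Customer-2", Type: "CustomerRegistered", Name: "Bob"},
	}
	v := &View{}
	Rebuild(v, log)
	fmt.Println(len(v.names)) // 2
}
```

The same mechanism lets you deploy a brand-new projection against years of history: it just replays from version 1.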

Moving to microservices, this pattern is very helpful. Each service can own its event stream. If the "Payment" service needs to know about an "OrderPlaced" event from the "Order" service, the Order service publishes it. The Payment service listens, updates its own internal state via its own events, and builds its own projections. The services are decoupled, communicating asynchronously through events.
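
As a sketch of that decoupling, here is a toy in-process broker. Real systems would use Kafka, NATS, or similar; the `Broker` API below is invented purely for illustration:

```go
package main

import "fmt"

// Broker is a toy in-process stand-in for a message broker: services
// subscribe to the event types they care about and never call each
// other directly.
type Broker struct {
	subs map[string][]func(payload string)
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]func(string))}
}

// Subscribe registers a handler for one event type.
func (b *Broker) Subscribe(eventType string, fn func(string)) {
	b.subs[eventType] = append(b.subs[eventType], fn)
}

// Publish delivers an event to every subscriber of its type.
func (b *Broker) Publish(eventType, payload string) {
	for _, fn := range b.subs[eventType] {
		fn(payload)
	}
}

func main() {
	broker := NewBroker()

	// The Payment service reacts to OrderPlaced without knowing who sent it.
	broker.Subscribe("OrderPlaced", func(orderID string) {
		fmt.Println("payment service charging for", orderID)
	})

	// The Order service publishes and moves on.
	broker.Publish("OrderPlaced", "Order-42")
}
```

Neither service imports the other; the only shared contract is the event type and its payload shape.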

Performance is a key consideration. Go's concurrency primitives—goroutines and channels—are excellent for building this. You can have a pool of goroutines processing commands, another pool handling projection updates, and channels to pass events between components with backpressure. The sync.RWMutex in our examples protects the in-memory state, but for production, you'd use a real database for the event store (like PostgreSQL, or purpose-built stores) and likely use a message queue for publishing events to projections.

The initial learning curve is steeper than a simple CRUD setup. You have to think in terms of events and commands. However, for complex business domains where understanding the history is critical, or where you need to scale reads and writes differently, the investment pays off. Your system gains a form of time travel, and its components become loosely coupled, focused, and easier to reason about in isolation. Start with a bounded context where the business logic is complex, and you'll likely find that event sourcing with CQRS brings a welcome clarity.
