DEV Community

Nithin Bharadwaj

How to Build a Fault-Tolerant Stream Processing System in Go With Exactly-Once Guarantees

Building a system that handles a continuous flow of data, like clicks on a website or sensor readings, is a common challenge. You need to group this data into time windows, calculate running totals or averages, and guarantee that every piece of data is processed once, and only once, even if parts of the system fail. Let me walk you through how to build such a system in Go, piece by piece.

Think of a data stream as a never-ending conveyor belt. Items arrive at different times, sometimes out of order. Our job is to sort them, put them into time-based buckets (like "last 5 minutes"), do math on each bucket, and send the results onward, all while keeping perfect track of what we've already done.

The heart of our system is the StreamProcessor. It coordinates everything. When you create one, you give it some settings: how big its internal channels should be, how long to wait for late data, and how often to save its progress.

config := ProcessorConfig{
    BufferSize:         10000,
    WatermarkDelay:     5 * time.Second,
    CheckpointInterval: 30 * time.Second,
}
processor := NewStreamProcessor(config)

Events flow into the system through the EventIngestor. This is the front door. Its first job is to deal with a messy reality: events don't always arrive in the order they were created. A network delay might hold up an earlier event, while a later one zips through.

To handle this, we use a concept called a watermark. It's like a moving finish line. We watch the timestamps on incoming events. The watermark is set to the highest timestamp we've seen, minus a waiting period (like 5 seconds). This waiting period is our allowance for late data. We promise not to finalize any calculations for a time period until we're reasonably sure no more data from that period will arrive. Events with a timestamp older than the current watermark are safe to process. Newer ones are held in a buffer.

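func (wm *WatermarkManager) Update(eventTime int64) {
    wm.mu.Lock()
    defer wm.mu.Unlock()
    // Track the highest event time seen so far. maxTs is assumed to be a
    // field on WatermarkManager holding Unix milliseconds.
    if eventTime > wm.maxTs {
        wm.maxTs = eventTime
    }
    wm.currentWatermark = wm.maxTs - wm.maxDelay.Milliseconds()
}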
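
The buffering half of this idea can be sketched with a min-heap keyed on event time. This is a minimal sketch, not the article's EventIngestor: the names `Event`, `ReorderBuffer`, `Add`, and `Release` are hypothetical, timestamps are assumed to be Unix milliseconds, and an event is treated as ready when its timestamp is at or before the watermark.

```go
package main

import "container/heap"

// Event is a hypothetical minimal event: an ID and an event time in Unix ms.
type Event struct {
	ID        string
	Timestamp int64
}

// eventHeap orders events by Timestamp, earliest first.
type eventHeap []Event

func (h eventHeap) Len() int            { return len(h) }
func (h eventHeap) Less(i, j int) bool  { return h[i].Timestamp < h[j].Timestamp }
func (h eventHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *eventHeap) Push(x interface{}) { *h = append(*h, x.(Event)) }
func (h *eventHeap) Pop() interface{} {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// ReorderBuffer holds events until the watermark reaches them.
type ReorderBuffer struct {
	pending eventHeap
}

// Add stores an event that is still ahead of the watermark.
func (b *ReorderBuffer) Add(e Event) { heap.Push(&b.pending, e) }

// Release removes and returns, in timestamp order, every buffered event
// whose timestamp is at or before the given watermark.
func (b *ReorderBuffer) Release(watermark int64) []Event {
	var ready []Event
	for b.pending.Len() > 0 && b.pending[0].Timestamp <= watermark {
		ready = append(ready, heap.Pop(&b.pending).(Event))
	}
	return ready
}
```

A heap keeps the earliest event at the front, so each watermark advance only touches the events that are actually ready rather than scanning the whole buffer.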

The WindowManager is where the core logic lives. A window is just a time range. A "tumbling window" might be every minute, non-overlapping. A "sliding window" might be a five-minute average, updated every minute. When an event is deemed ready (the watermark has advanced past its timestamp), the manager figures out which windows it belongs to.
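
Window assignment is mostly integer arithmetic. The sketch below is one way to do it, assuming timestamps and sizes are non-negative Unix milliseconds and that windows start on multiples of the window size (or of the slide). The names `Window`, `TumblingWindow`, and `SlidingWindows` are hypothetical and not taken from the WindowManager itself.

```go
package main

// Window is a half-open time range [Start, End) in Unix milliseconds.
type Window struct {
	Start, End int64
}

// TumblingWindow returns the single fixed-size window containing ts.
// It assumes ts >= 0 and size > 0.
func TumblingWindow(ts, size int64) Window {
	start := ts - ts%size
	return Window{Start: start, End: start + size}
}

// SlidingWindows returns every window of length size, starting on a
// multiple of slide, that contains ts. It assumes ts >= 0 and
// 0 < slide <= size.
func SlidingWindows(ts, size, slide int64) []Window {
	var out []Window
	lastStart := ts - ts%slide // latest window start that is <= ts
	for start := lastStart; start > ts-size; start -= slide {
		out = append(out, Window{Start: start, End: start + size})
	}
	return out
}
```

With a five-minute size and a one-minute slide, each event lands in five overlapping windows, which is why sliding windows cost more state than tumbling ones.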

For each relevant window, we need an Aggregator. This is a simple object that knows how to update a calculation. If we're counting events, the aggregator holds a number and adds one. If we're summing a value, it holds a running total.

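type CountAggregator struct {
    count int64
}

func (ca *CountAggregator) Aggregate(event StreamEvent) (interface{}, error) {
    // Return the value produced by AddInt64. Reading ca.count directly
    // afterwards would be an unsynchronized read that races with other
    // goroutines calling Aggregate.
    n := atomic.AddInt64(&ca.count, 1)
    return n, nil
}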
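
A summing aggregator follows the same pattern. The sketch below assumes a `StreamEvent` shape with a `Data` map of numeric fields and an `Aggregator` interface built around the method signature above. Neither definition is shown in the article, so treat both as illustrative.

```go
package main

import (
	"errors"
	"sync"
)

// StreamEvent is an assumed shape; the article does not show its definition.
type StreamEvent struct {
	ID        string
	Timestamp int64
	Data      map[string]float64
}

// Aggregator is an assumed interface matching CountAggregator's method.
type Aggregator interface {
	Aggregate(event StreamEvent) (interface{}, error)
}

// SumAggregator keeps a running total of each event's "value" field.
type SumAggregator struct {
	mu  sync.Mutex
	sum float64
}

// Aggregate adds the event's value to the total and returns the new total.
func (sa *SumAggregator) Aggregate(event StreamEvent) (interface{}, error) {
	v, ok := event.Data["value"]
	if !ok {
		return nil, errors.New(`event has no "value" field`)
	}
	sa.mu.Lock()
	defer sa.mu.Unlock()
	sa.sum += v
	return sa.sum, nil
}

// Compile-time check that SumAggregator satisfies Aggregator.
var _ Aggregator = (*SumAggregator)(nil)
```

A mutex is used here instead of an atomic because the standard library has no atomic add for float64.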

The magic of "exactly-once" processing hinges on two things: remembering what you've done and being able to restart from a known good point. This is the job of the StateStore and the Checkpointer.

Every event has a unique ID. Before we even look at an event's data, we ask the StateStore: "Have I seen this ID before?" The store checks a fast cache and, if needed, a persistent database. If the answer is yes, we skip the event. This handles duplicates that might be sent if a producer retries.

func (sp *StreamProcessor) processEvents(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            // Stop cleanly on shutdown or cancellation
            return ctx.Err()
        case event, ok := <-sp.ingestor.orderedChan:
            if !ok {
                return nil // The ingestor closed the channel
            }
            // The critical check
            if sp.stateStore.IsProcessed(event.ID) {
                continue // Skip duplicate
            }
            // ... process the event ...
            // Mark it as done
            sp.stateStore.MarkProcessed(event.ID, event.Timestamp)
        }
    }
}
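
The duplicate check itself can be sketched as a mutex-protected map. This is an in-memory stand-in only, with no cache or persistent layer. `ProcessedSet` and `Prune` are hypothetical names, and using the stored timestamp to expire old IDs is an assumption about why `MarkProcessed` takes a timestamp.

```go
package main

import "sync"

// ProcessedSet is a hypothetical in-memory stand-in for the StateStore's
// duplicate tracking.
type ProcessedSet struct {
	mu   sync.RWMutex
	seen map[string]int64 // event ID -> event timestamp (Unix ms)
}

func NewProcessedSet() *ProcessedSet {
	return &ProcessedSet{seen: make(map[string]int64)}
}

// IsProcessed reports whether the ID has already been marked as processed.
func (p *ProcessedSet) IsProcessed(id string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	_, ok := p.seen[id]
	return ok
}

// MarkProcessed records the ID together with its event timestamp.
func (p *ProcessedSet) MarkProcessed(id string, ts int64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.seen[id] = ts
}

// Prune forgets IDs whose events are older than cutoff and returns how
// many were removed. This is only safe if the source will never redeliver
// events older than cutoff.
func (p *ProcessedSet) Prune(cutoff int64) int {
	p.mu.Lock()
	defer p.mu.Unlock()
	removed := 0
	for id, ts := range p.seen {
		if ts < cutoff {
			delete(p.seen, id)
			removed++
		}
	}
	return removed
}
```

Without some form of pruning, the set of IDs grows for as long as the stream runs, so a production store needs an expiry policy of this kind or a bounded structure.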

But what if our entire program crashes? We don't want to lose the results of all the windows we've been calculating. This is where checkpoints come in. On a regular schedule (e.g., every 30 seconds), the Checkpointer tells every component to pause and take a snapshot.

It captures everything: the state of every aggregator in every window, the current watermark, and the list of processed event IDs. It writes this snapshot durably—to disk or a database. This snapshot is a checkpoint.

func (c *Checkpointer) createCheckpoint(store *StateStore) {
    checkpoint := Checkpoint{
        ID:        ksuid.New().String(), // A unique ID
        Timestamp: time.Now().UnixNano(),
        State:     store.Snapshot(), // Grab all state
    }
    store.checkpoints[checkpoint.ID] = checkpoint
    c.commitCheckpoint(store, checkpoint)
}

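When our processor starts up, its first step is to look for the latest successful checkpoint. If it finds one, it loads all the saved state: the aggregators are restored with their previous values, the watermark is reset, and the list of processed IDs is reloaded. Then it resumes reading the event stream from a position at or before the one the checkpoint covers. Some events will be delivered a second time, but any that were already counted in the checkpoint are filtered out by the duplicate detection in the StateStore, and events that arrived after the checkpoint are applied to the restored state once. The effect on the system's state is the same as if it had run continuously without failing. Results that were emitted between the checkpoint and the crash may be emitted again after recovery, so downstream consumers should tolerate or deduplicate them.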
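
The reason recovery works is that the aggregate and the processed-ID list are captured in the same snapshot. The toy model below illustrates that with a single running sum. The `Event`, `State`, `Apply`, and `Snapshot` names are hypothetical and far simpler than the real StateStore.

```go
package main

// Event is a hypothetical event carrying a numeric value.
type Event struct {
	ID    string
	Value int64
}

// State is everything a checkpoint must capture for this toy processor:
// the aggregate and the set of IDs already folded into it.
type State struct {
	Sum       int64
	Processed map[string]bool
}

func NewState() *State {
	return &State{Processed: make(map[string]bool)}
}

// Apply folds the event into the sum unless its ID was already processed.
// It reports whether the event was applied.
func (s *State) Apply(e Event) bool {
	if s.Processed[e.ID] {
		return false
	}
	s.Sum += e.Value
	s.Processed[e.ID] = true
	return true
}

// Snapshot returns a deep copy, so later changes to the live state do not
// leak into the checkpoint (or the other way round).
func (s *State) Snapshot() *State {
	cp := &State{Sum: s.Sum, Processed: make(map[string]bool, len(s.Processed))}
	for id := range s.Processed {
		cp.Processed[id] = true
	}
	return cp
}
```

Suppose events e1 and e2 are applied, a snapshot is taken, e3 is applied, and the process crashes. Restoring the snapshot and replaying e2, e3, and e4 skips e2 (already in the snapshot) and applies e3 and e4, giving the same sum as an uninterrupted run. If the sum and the ID set were saved at different moments, that guarantee would no longer hold.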

Let's look at what happens to a single event, step-by-step.

  1. Arrival: An event {ID: "abc123", Timestamp: 1625097600000, Data: {value: 42}} is sent to processor.Process().
  2. Buffering: The EventIngestor receives it, updates the watermark, and puts it in a buffer if its timestamp is too new.
  3. Ordering: Once the watermark advances past the event's timestamp, it's pushed into the ordered channel.
  4. Deduplication: The main loop receives it and asks the StateStore if "abc123" was processed. It hasn't been, so we continue.
  5. Window Assignment: The WindowManager checks all active windows. Does a 1-minute tumbling window for [1625097600000, 1625097660000) exist? It creates one if needed.
  6. Aggregation: The manager finds the SumAggregator for that window and calls agg.Aggregate(event). The aggregator adds 42 to its internal total.
  7. State Save: The new state of the aggregator (the new sum) is saved to the StateStore with a key like "window_sum_1min:1625097600000".
  8. Completion: The event's ID "abc123" is marked as processed in the state store.
  9. Emission: If this event caused the window to be complete (e.g., the watermark passed the window's end time), the window triggers. It takes the final result from the aggregator and sends it out as output.
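
Steps 5 through 9 can be sketched for the tumbling-sum case as follows. This is a simplified, single-goroutine illustration with hypothetical names (`TumblingSum`, `Add`, `Fire`, `Result`). It keeps state in a plain map rather than a StateStore and treats a window as complete once the watermark reaches its end time.

```go
package main

import "sort"

// Result is the final output for one window.
type Result struct {
	WindowStart, WindowEnd int64
	Sum                    float64
}

// TumblingSum keeps one running sum per tumbling window.
type TumblingSum struct {
	size int64             // window length in milliseconds
	sums map[int64]float64 // window start -> running sum
}

func NewTumblingSum(size int64) *TumblingSum {
	return &TumblingSum{size: size, sums: make(map[int64]float64)}
}

// Add assigns the event to its window (creating it if needed) and
// updates that window's sum. It assumes ts >= 0.
func (t *TumblingSum) Add(ts int64, value float64) {
	start := ts - ts%t.size
	t.sums[start] += value
}

// Fire emits and discards every window whose end is at or before the
// watermark, ordered by window start.
func (t *TumblingSum) Fire(watermark int64) []Result {
	var out []Result
	for start, sum := range t.sums {
		if end := start + t.size; end <= watermark {
			out = append(out, Result{WindowStart: start, WindowEnd: end, Sum: sum})
			delete(t.sums, start)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].WindowStart < out[j].WindowStart })
	return out
}
```

Deleting a window when it fires keeps memory bounded, but it also means an event that shows up after its window has fired would open a fresh window instead of updating the old result. How to treat such late events is a policy decision the sketch leaves out.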

All of this happens concurrently. The ingestor, the watermark updater, the window manager, and the checkpointer all run in their own goroutines, communicating through channels. This is where Go's concurrency model shines. We use mutexes (sync.RWMutex) sparingly, only to protect specific maps or slices when multiple goroutines might access them.

A real implementation needs a solid backend for the StateStore. In-memory maps work for testing, but for production, you need persistence. You could use embedded databases like RocksDB or Badger. They store data locally on disk and are very fast for key-value lookups, which is most of what we do.

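// A simplified RocksDB backend
type RocksDBBackend struct {
    db *gorocksdb.DB
}

// defaultWriteOptions is assumed to be a package-level value shared by
// all writes.
var defaultWriteOptions = gorocksdb.NewDefaultWriteOptions()

func (rb *RocksDBBackend) Put(key string, value []byte) error {
    // A write batch lets several related keys be committed atomically;
    // here it carries a single key.
    wb := gorocksdb.NewWriteBatch()
    defer wb.Destroy()
    wb.Put([]byte(key), value)
    return rb.db.Write(defaultWriteOptions, wb)
}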

Performance tuning is crucial. The size of the input channel buffer affects backpressure: if it fills up, producers block until the processor catches up. The watermark delay is a trade-off: a longer delay tolerates later events, but it also increases the latency of your results. The checkpoint interval is a trade-off too: a shorter interval means less work to redo after a crash but more snapshot overhead during normal operation, while a longer interval lowers that overhead at the cost of a longer recovery.

You'll want to add metrics to every part. How many events per second are you receiving? What's the average processing latency? How far behind the current time is your watermark? This data is vital for operating the system in production.

type StreamMetrics struct {
    EventsReceived      uint64
    EventsProcessed     uint64
    DuplicateEvents     uint64
    WatermarkLag        int64 // in milliseconds
}

In a distributed setup, you would run multiple instances of this processor. You'd need a way to partition your event stream, perhaps by a key in the data. Each instance would be responsible for a subset of the keys. The checkpointing becomes more complex, requiring a distributed coordination system like Apache ZooKeeper or etcd to ensure all instances snapshot their state consistently.
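
Partitioning by key usually comes down to a stable hash. The sketch below uses FNV-1a from the standard library. `PartitionFor` is a hypothetical helper, and the choice of hash is an assumption rather than something the article prescribes.

```go
package main

import "hash/fnv"

// PartitionFor maps a key to a partition index in [0, partitions).
// It assumes partitions > 0. The same key always maps to the same
// partition, so all events for a key reach the same processor instance.
func PartitionFor(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(partitions))
}
```

Plain modulo hashing reshuffles most keys whenever the number of partitions changes, so changing the instance count also means moving the per-key state to its new owner.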

Building this might seem daunting, but by breaking it down into these components—ingestion with watermarks, windowed aggregation, stateful deduplication, and periodic checkpointing—you create a system that is both powerful and understandable. It handles the inherent disorder of real-world data and provides the strong guarantees necessary for accurate, reliable results. You start with events on a conveyor belt and end with timely, correct insights, no matter what happens along the way.
