DEV Community

Nithin Bharadwaj

Real-Time Data Stream Processing in Go: Backpressure, Windowing, and Fault Tolerance Explained


Let’s talk about building a system that can handle a constant flow of data. Imagine you’re trying to count cars on a busy highway in real-time. The cars keep coming, non-stop. You need to count them, maybe group them by color every minute, and do it all without your system falling over when traffic suddenly doubles. That’s the world of stream processing. In this article, I'll show you how to build the core of such a system using Go. We’ll focus on three critical ideas: managing flow so we don’t get overwhelmed (backpressure), grouping data into time buckets (windowing), and doing it all reliably.

Go is a fantastic language for this. Its straightforward concurrency model with goroutines and channels feels like it was designed for data pipelines. You can think of a channel as a conveyor belt. One goroutine puts items on the belt, and another takes them off for processing. Our job is to build a network of these conveyor belts that can speed up, slow down, and organize items without breaking.

Let’s start with the very foundation: moving data from a source to a destination. A basic pipeline might look like this.

package main

import (
    "fmt"
    "time"
)

func main() {
    // A source channel producing data
    source := make(chan int)
    // A sink channel receiving processed data
    sink := make(chan int)

    // Start a data producer
    go func() {
        for i := 1; i <= 5; i++ {
            fmt.Printf("Producing: %d\n", i)
            source <- i
            time.Sleep(100 * time.Millisecond) // Simulate work
        }
        close(source)
    }()

    // Start a processing stage
    go func() {
        for value := range source {
            result := value * 2 // Simple processing
            sink <- result
        }
        close(sink)
    }()

    // Consume the final results
    for result := range sink {
        fmt.Printf("Result: %d\n", result)
    }
}

This is a simple chain: produce, transform, consume. But what happens if the consumer is slower than the producer? The channels above are unbuffered, so every send blocks until the next stage is ready to receive. A buffered channel can hold a fixed number of items; once it is full, the producer blocks until space frees up. Either way, this is a primitive form of backpressure: the slow consumer indirectly slows down the fast producer. For a robust system, we need to manage this explicitly and gracefully.
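
To make the buffer limit concrete, here is a minimal sketch. The helper name sendUntilFull is made up for illustration; it pushes values into a channel that nobody reads from and counts how many sends are accepted before one would block.

```go
package main

import "fmt"

// sendUntilFull is a hypothetical helper: it tries to send `attempts` values
// into a channel with the given buffer capacity while nobody is receiving,
// and reports how many sends were accepted before the buffer filled up.
func sendUntilFull(capacity, attempts int) int {
	ch := make(chan int, capacity)
	accepted := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			accepted++
		default:
			// A plain `ch <- i` would block here until a consumer receives.
			return accepted
		}
	}
	return accepted
}

func main() {
	fmt.Println(sendUntilFull(3, 10)) // prints 3: the buffer holds three items
	fmt.Println(sendUntilFull(0, 10)) // prints 0: an unbuffered channel needs a ready receiver
}
```

The select with a default branch is only there so the demo can observe the "would block" moment without hanging; a real producer using a plain send would simply wait at that point.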

A real-world source, like a sensor or a message queue, often produces data continuously. We need a way to represent this and control its speed.

type DataSource struct {
    dataChan chan interface{}
    rate     int // desired messages per second
    running  bool
    mu       sync.RWMutex // protects the 'running' state
}

func NewDataSource(rate int) *DataSource {
    return &DataSource{
        dataChan: make(chan interface{}, 1000), // Buffer to smooth bursts
        rate:     rate,
        running:  true,
    }
}

func (ds *DataSource) Emit(value interface{}) bool {
    ds.mu.RLock()
    defer ds.mu.RUnlock()
    if !ds.running {
        return false
    }
    // Try to send without blocking indefinitely
    select {
    case ds.dataChan <- value:
        return true
    default:
        // Channel buffer is full. This is backpressure signal.
        fmt.Println("Source buffer full, dropping data.")
        return false
    }
}

Here, if our dataChan buffer is full, the Emit function will drop the data in this simple example. In a more advanced system, you might want to pause the source or retry. The key is that the source is aware of the downstream capacity.
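
As one possible take on the "pause or retry" idea, the sketch below waits a bounded time for buffer space instead of dropping immediately. The function emitWithin and its millisecond parameter are assumptions for illustration, not part of the DataSource type above.

```go
package main

import (
	"fmt"
	"time"
)

// emitWithin is a hypothetical alternative to a drop-on-full Emit: it waits up
// to `millis` milliseconds for buffer space before giving up.
func emitWithin(ch chan<- int, value int, millis int) bool {
	timer := time.NewTimer(time.Duration(millis) * time.Millisecond)
	defer timer.Stop()
	select {
	case ch <- value:
		return true
	case <-timer.C:
		return false // still full after the deadline: caller can retry, pause, or drop
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(emitWithin(ch, 1, 20)) // true: the buffer has room
	fmt.Println(emitWithin(ch, 2, 20)) // false: buffer full and nobody is receiving

	// A consumer that frees a slot shortly afterwards lets the next send succeed.
	go func() {
		time.Sleep(5 * time.Millisecond)
		<-ch
	}()
	fmt.Println(emitWithin(ch, 3, 500)) // true once the consumer has received
}
```

A bounded wait like this trades a little latency at the source for fewer dropped items; the right timeout depends on how bursty the downstream stages are.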

Now, let's connect sources to processors. An operator is a processing stage. It takes data in, transforms it, and passes it along.

type StreamOperator interface {
    Process(interface{}) (interface{}, error)
    Close() error
}

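// A concrete operator that applies fn to each item (for example, doubling an integer)
type MapOperator struct {
    fn func(interface{}) interface{}
}

func (mo *MapOperator) Process(data interface{}) (interface{}, error) {
    return mo.fn(data), nil
}

// Close satisfies StreamOperator; a stateless map has nothing to release.
func (mo *MapOperator) Close() error {
    return nil
}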

To manage the flow between many processors, we need a traffic controller. This is where we implement explicit backpressure. The goal is to limit the number of data items being processed at any given moment. If we have too many in flight, we risk running out of memory.

type ThroughputController struct {
    maxInFlight int32        // Maximum allowed concurrent items
    inFlight    int32        // Currently processing count
    semaphore   chan struct{} // Controls concurrent access
}

func NewThroughputController(limit int) *ThroughputController {
    return &ThroughputController{
        maxInFlight: int32(limit),
        semaphore:   make(chan struct{}, limit), // Buffered channel as semaphore
    }
}

// Acquire tries to get a processing slot.
func (tc *ThroughputController) Acquire() bool {
    select {
    case tc.semaphore <- struct{}{}:
        atomic.AddInt32(&tc.inFlight, 1)
        return true
    default:
        // No slots available. Backpressure applied.
        return false
    }
}

// Release frees a slot.
func (tc *ThroughputController) Release() {
    <-tc.semaphore
    atomic.AddInt32(&tc.inFlight, -1)
}

Now, in our main processing loop, we can use this controller before starting to work on a piece of data.

func (sp *StreamProcessor) processItem(data interface{}) {
    if !sp.throughput.Acquire() {
        log.Println("Backpressure: cannot acquire slot, skipping.")
        return // Or buffer, or slow down the source.
    }
    defer sp.throughput.Release()

    // ... actual processing ...
}

This ensures that the number of items being processed concurrently never exceeds maxInFlight. When all slots are busy, Acquire returns false and the caller decides what backpressure means: here the item is skipped, but the processor could instead buffer it temporarily or tell the source to slow down.
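
To see the limit hold under load, here is a small sketch that uses a blocking acquire rather than the non-blocking one above, so work waits for a slot instead of being skipped. The function runLimited and its peak counter are illustrative assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts `tasks` goroutines but lets at most `limit` of them work
// at once. It returns the highest concurrency it observed.
func runLimited(tasks, limit int) int {
	sem := make(chan struct{}, limit)
	var (
		mu      sync.Mutex
		current int
		peak    int
		wg      sync.WaitGroup
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // blocking acquire: wait for a free slot
			defer func() { <-sem }() // release the slot when done

			mu.Lock()
			current++
			if current > peak {
				peak = current
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			current--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(runLimited(50, 4) <= 4) // true: never more than 4 in flight
}
```

Blocking on the semaphore is the simplest way to push pressure back to whoever submits work; the non-blocking version is the better fit when the caller must stay responsive and has somewhere else to put the item.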

So far, we've handled a continuous flow. But for analytics, we often want to ask questions like "What was the average temperature in the last 5 minutes?" This requires grouping data into windows of time.

A window is simply a bucket for data that falls within a specific time range. The most basic type is a tumbling window—fixed, non-overlapping intervals. Let's define one.

type TimeWindow struct {
    Size     time.Duration // e.g., 5 * time.Minute
    Function string        // "sum", "avg", "count"
}

We need a place to hold data for an active window. This is our window state.

type WindowState struct {
    Start    time.Time
    End      time.Time
    Elements []interface{}
    Count    int
    mu       sync.Mutex
}

Now, the big question: when do we process a window? We could wait until its time period is completely over. But in a real-time system, we often want early results. We can trigger processing based on two things: time (e.g., every second) or count (e.g., after 100 items). This is called a trigger.
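
A combined trigger can be sketched with a select over the input channel and a ticker. The batch function below is an illustration under assumed names and parameters: it emits a batch when maxCount items have accumulated or when maxWait has elapsed, whichever comes first.

```go
package main

import (
	"fmt"
	"time"
)

// batch groups values from `in` and emits a batch when either trigger fires:
// the count trigger (maxCount items) or the time trigger (maxWait elapsed).
// maxWait must be positive because time.NewTicker panics otherwise.
func batch(in <-chan int, maxCount int, maxWait time.Duration) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(maxWait)
		defer ticker.Stop()
		var buf []int
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = nil // start a fresh slice; the receiver owns the old one
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // input closed: emit whatever is left
					return
				}
				buf = append(buf, v)
				if len(buf) >= maxCount {
					flush()
					ticker.Reset(maxWait) // restart the time trigger
				}
			case <-ticker.C:
				flush()
			}
		}
	}()
	return out
}

// collectBatches feeds items through batch and gathers every emitted batch.
func collectBatches(items []int, maxCount, maxWaitMillis int) [][]int {
	in := make(chan int)
	go func() {
		for _, v := range items {
			in <- v
		}
		close(in)
	}()
	var got [][]int
	for b := range batch(in, maxCount, time.Duration(maxWaitMillis)*time.Millisecond) {
		got = append(got, b)
	}
	return got
}

func main() {
	fmt.Println(collectBatches([]int{1, 2, 3, 4, 5}, 2, 1000)) // [[1 2] [3 4] [5]]
}
```

With a long maxWait the count trigger dominates, as in the example; shrinking maxWait makes the time trigger produce smaller, earlier batches.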

Let's integrate windowing into our operator. We'll modify our StreamOperator interface.

type StreamOperator interface {
    Process(interface{}) (interface{}, error)
    Window() *TimeWindow // Returns nil if not a windowed operator
    Close() error
}

When an operator has a window, our processor needs to route data to the correct window state. Here’s a simplified version of that logic.

func (sp *StreamProcessor) applyWindowing(data interface{}, window *TimeWindow, eventTime time.Time) {
    // Calculate which window this event belongs to.
    // For a tumbling window of 1 minute:
    windowEnd := eventTime.Truncate(window.Size).Add(window.Size)
    windowStart := windowEnd.Add(-window.Size)

    windowID := fmt.Sprintf("%d-%d", windowStart.Unix(), windowEnd.Unix())

    sp.windowMu.Lock()
    state, exists := sp.windowState[windowID]
    if !exists {
        state = &WindowState{
            Start:    windowStart,
            End:      windowEnd,
            Elements: []interface{}{},
        }
        sp.windowState[windowID] = state
    }
    sp.windowMu.Unlock()

    state.mu.Lock()
    state.Elements = append(state.Elements, data)
    state.Count++
    trigger := state.Count%100 == 0 // Example count-based trigger: fires once per 100 elements
    state.mu.Unlock()

    if trigger {
        go sp.processWindow(state, window)
    }
}

The processWindow function would then apply the aggregation (like sum or average) to all the elements in state.Elements and send the result downstream to the sink.
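
Here is a worked sketch of that last step, combined with tumbling-window assignment. It is an illustration only: the Event type, the use of Unix seconds instead of time.Time, and both function names are assumptions made to keep the example short.

```go
package main

import (
	"fmt"
)

// Event is a hypothetical record: a Unix timestamp in seconds plus a value.
type Event struct {
	Timestamp int64
	Value     float64
}

// aggregate applies one of the function names used by TimeWindow.Function.
// An empty window has nothing to report in this sketch, so it is an error.
func aggregate(fn string, values []float64) (float64, error) {
	if len(values) == 0 {
		return 0, fmt.Errorf("no values to aggregate")
	}
	sum := 0.0
	for _, v := range values {
		sum += v
	}
	switch fn {
	case "sum":
		return sum, nil
	case "avg":
		return sum / float64(len(values)), nil
	case "count":
		return float64(len(values)), nil
	default:
		return 0, fmt.Errorf("unknown aggregation %q", fn)
	}
}

// aggregateByWindow groups events into tumbling windows of sizeSec seconds
// and returns one aggregate per window, keyed by window start.
func aggregateByWindow(events []Event, sizeSec int64, fn string) (map[int64]float64, error) {
	buckets := make(map[int64][]float64)
	for _, e := range events {
		start := e.Timestamp - e.Timestamp%sizeSec // assumes non-negative timestamps
		buckets[start] = append(buckets[start], e.Value)
	}
	results := make(map[int64]float64, len(buckets))
	for start, values := range buckets {
		r, err := aggregate(fn, values)
		if err != nil {
			return nil, err
		}
		results[start] = r
	}
	return results, nil
}

func main() {
	events := []Event{{5, 1}, {59, 3}, {61, 10}}
	out, err := aggregateByWindow(events, 60, "avg")
	fmt.Println(out, err) // map[0:2 60:10] <nil>
}
```

The events at seconds 5 and 59 share the window starting at 0, so their average is 2; the event at second 61 falls into the next window on its own.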

But time is tricky in distributed systems. Data doesn't always arrive in order. A network delay might cause a 9:00:59 AM event to arrive after a 9:01:05 AM event. If we close the 9:00-9:01 window at 9:01 on the dot, we might miss that late event. This is where the concept of a watermark comes in.

A watermark is a timestamp that says, "I believe all events with a timestamp earlier than X have already arrived." It's an estimate of event time progress. We can use it to decide when it's safe to finalize a window.

type WatermarkManager struct {
    watermarks map[string]time.Time // per-source watermarks
    current    time.Time             // global watermark
    mu         sync.RWMutex
}

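func (wm *WatermarkManager) Advance(sourceID string, timestamp time.Time) {
    wm.mu.Lock()
    defer wm.mu.Unlock()
    if wm.watermarks == nil {
        wm.watermarks = make(map[string]time.Time) // writing to a nil map panics
    }
    // A source's watermark only moves forward.
    if timestamp.After(wm.watermarks[sourceID]) {
        wm.watermarks[sourceID] = timestamp
    }
    // The global watermark is the minimum of all source watermarks.
    // This is a conservative estimate. We start from the entries themselves
    // rather than time.Now(), so wall-clock time never caps event time.
    first := true
    var lowest time.Time
    for _, t := range wm.watermarks {
        if first || t.Before(lowest) {
            lowest = t
            first = false
        }
    }
    wm.current = lowest
}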

Periodically, or when the watermark advances, we can check all our window states.

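// Assumes the WatermarkManager also holds the window map in a windowState
// field (not declared in the struct above) and that wm.mu guards it.
func (wm *WatermarkManager) TriggerCompletedWindows() {
    wm.mu.Lock() // write lock: we delete from the map below
    defer wm.mu.Unlock()

    for id, state := range wm.windowState {
        // If the watermark has passed the end of the window, we can process it.
        if state.End.Before(wm.current) {
            go wm.processWindowFinally(state)
            delete(wm.windowState, id) // Clean up; deleting during range is safe in Go
        }
    }
}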

With watermarks, we can handle late-arriving data to some extent. If data arrives for a window that hasn't been finalized yet (watermark hasn't passed its end), we can still add it. If it arrives after the window is finalized, we might send it to a special side output for late data handling.
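
The routing decision can be sketched as follows. This is a simplified illustration, not the WatermarkManager above: timestamps are Unix seconds, the type LateDataRouter is made up, and the watermark is assumed to trail the largest event time seen by a fixed allowed lateness (a common bounded out-of-orderness heuristic).

```go
package main

import "fmt"

// LateDataRouter is a hypothetical, single-goroutine sketch.
// The watermark trails the largest accepted event time by AllowedLateness.
type LateDataRouter struct {
	WindowSize      int64
	AllowedLateness int64
	maxEventTime    int64
	Windows         map[int64][]int64 // window start -> event timestamps
	Late            []int64           // side output for events that missed their window
}

func NewLateDataRouter(windowSize, allowedLateness int64) *LateDataRouter {
	return &LateDataRouter{
		WindowSize:      windowSize,
		AllowedLateness: allowedLateness,
		Windows:         make(map[int64][]int64),
	}
}

// Watermark returns the current estimate of event-time progress.
func (r *LateDataRouter) Watermark() int64 {
	return r.maxEventTime - r.AllowedLateness
}

// Add routes one event and reports whether it was accepted into a window.
func (r *LateDataRouter) Add(ts int64) bool {
	start := ts - ts%r.WindowSize // assumes non-negative timestamps
	end := start + r.WindowSize
	if end <= r.Watermark() {
		r.Late = append(r.Late, ts) // window already finalized: side output
		return false
	}
	r.Windows[start] = append(r.Windows[start], ts)
	if ts > r.maxEventTime {
		r.maxEventTime = ts
	}
	return true
}

func main() {
	r := NewLateDataRouter(60, 10)
	fmt.Println(r.Add(65)) // true: opens window [60,120), watermark becomes 55
	fmt.Println(r.Add(58)) // true: out of order, but window [0,60) is still open
	fmt.Println(r.Add(75)) // true: watermark becomes 65
	fmt.Println(r.Add(59)) // false: window [0,60) ended before the watermark
	fmt.Println(r.Late)    // [59]
}
```

A larger allowed lateness accepts more stragglers but delays every window's final result by the same amount, so it is a direct trade between completeness and latency.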

Now, let's talk about not losing data. Systems fail. A process can crash. We need fault tolerance. A common method is checkpointing. Periodically, we save the state of our entire pipeline to a durable store. If we crash, we restart from the last checkpoint.

What state do we need to save? For our windowing operator, it's the contents of all active WindowState objects. For a simple counter operator, it's just the counter value.

We can add a method to our operator interface for checkpointing.

type StatefulOperator interface {
    Snapshot() ([]byte, error)   // Returns serialized state
    Restore([]byte) error        // Restores state from bytes
}

Our pipeline would then have a checkpoint coordinator that periodically tells all operators to snapshot their state, collects them, and writes them to a file or database. This is a complex topic, but the essence is to make state explicit and serializable.
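
For the simple counter case, Snapshot and Restore can be sketched with encoding/json. The CounterOperator type here is an assumption for illustration, and JSON is just one convenient serialization; a coordinator would still be needed to persist the bytes somewhere durable.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CounterOperator is a hypothetical stateful operator that counts items.
type CounterOperator struct {
	Count int64 `json:"count"`
}

// Process counts the item and passes it through unchanged.
func (c *CounterOperator) Process(data interface{}) (interface{}, error) {
	c.Count++
	return data, nil
}

// Snapshot serializes the operator's state.
func (c *CounterOperator) Snapshot() ([]byte, error) {
	return json.Marshal(c)
}

// Restore replaces the operator's state with a previously taken snapshot.
func (c *CounterOperator) Restore(b []byte) error {
	return json.Unmarshal(b, c)
}

func main() {
	op := &CounterOperator{}
	for i := 0; i < 3; i++ {
		op.Process(i)
	}

	checkpoint, err := op.Snapshot()
	if err != nil {
		fmt.Println("snapshot failed:", err)
		return
	}
	fmt.Println(string(checkpoint)) // {"count":3}

	// Simulate a restart: a fresh operator picks up where the old one stopped.
	recovered := &CounterOperator{}
	if err := recovered.Restore(checkpoint); err != nil {
		fmt.Println("restore failed:", err)
		return
	}
	fmt.Println(recovered.Count) // 3
}
```

Anything processed after the last snapshot is lost on a crash unless the source can replay it, which is why checkpoints are usually paired with replayable inputs.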

Let's look at a more complete example, tying the concepts together. This is a simplified orchestrator.

type StreamProcessor struct {
    sources       []*DataSource
    operators     []StreamOperator
    sink          chan interface{}
    throughput    *ThroughputController
    watermark     *WatermarkManager
    watermarkTick <-chan time.Time // e.g. time.NewTicker(time.Second).C; read by Run
    windowState   map[string]*WindowState
    windowMu      sync.RWMutex
}

func (sp *StreamProcessor) Run(ctx context.Context) {
    // Start all sources
    for _, source := range sp.sources {
        go source.Run(ctx)
    }

    // Main processing loop
    for {
        select {
        case <-ctx.Done():
            return
        case data := <-sp.nextDataItem():
            sp.processWithBackpressure(data)
        case ts := <-sp.watermarkTick:
            sp.watermark.Advance("system", ts)
            sp.watermark.TriggerCompletedWindows()
        }
    }
}

func (sp *StreamProcessor) processWithBackpressure(data interface{}) {
    if !sp.throughput.Acquire() {
        // No free slot. This sketch pauses briefly and then drops the item;
        // a production system would retry, buffer, or signal the source.
        time.Sleep(50 * time.Millisecond)
        return
    }
    defer sp.throughput.Release()

    for _, op := range sp.operators {
        result, err := op.Process(data)
        if err != nil {
            log.Printf("Operator error: %v", err)
            return // drop the item; don't send a half-processed value to the sink
        }
        data = result // Pass result to next operator

        // If this is a windowed operator, handle grouping.
        if win := op.Window(); win != nil {
            // We need an event time. Let's assume it's in the data.
            eventTime := extractTimestamp(data)
            sp.applyWindowing(data, win, eventTime)
        }
    }
    // Send final result to sink
    select {
    case sp.sink <- data:
        // success
    default:
        log.Println("Sink is busy, applying backpressure.")
    }
}

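Once the helpers this sketch leaves out (source.Run, nextDataItem, extractTimestamp, processWindow) are filled in, you'll see data flowing, windows being created, aggregated, and emitted. The ThroughputController caps the number of in-flight items, which bounds memory usage, and the WatermarkManager keeps event time moving forward.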

In production, you'd need to add monitoring. How many items are processed per second? What's the current latency? How many windows are active? You can expose these as metrics.

type ProcessorStats struct {
    ProcessedCount uint64
    SinkCount      uint64
    WindowCount    int
}
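
Counters like these are updated from many goroutines, so they need to be safe for concurrent use. A minimal sketch with sync/atomic follows; the Counter type and countConcurrently function are illustrative assumptions rather than part of ProcessorStats.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counter is a hypothetical helper: a uint64 that many goroutines can
// increment without a mutex.
type Counter struct {
	n uint64
}

func (c *Counter) Inc()          { atomic.AddUint64(&c.n, 1) }
func (c *Counter) Value() uint64 { return atomic.LoadUint64(&c.n) }

// countConcurrently has `workers` goroutines each record `perWorker` items
// and returns the final total.
func countConcurrently(workers, perWorker int) uint64 {
	var processed Counter
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				processed.Inc()
			}
		}()
	}
	wg.Wait()
	return processed.Value()
}

func main() {
	fmt.Println(countConcurrently(8, 1000)) // 8000
}
```

Sampling Value at a fixed interval and taking the difference between samples gives a items-per-second rate without slowing the hot path.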

You'd also need to think about deployment. This single-process design is great for learning, but for high throughput, you'd want to partition your data and run multiple instances of your pipeline in parallel. That introduces new challenges like distributed state and watermark aggregation, which are beyond our scope today.
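
The partitioning step itself is small. As a hedged sketch, a stable hash of a record's key can pick which pipeline instance handles it; the function partitionFor and the choice of FNV-1a are assumptions for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of `partitions` pipeline instances.
// The same key always lands on the same partition, so per-key state
// (such as a window for one sensor) stays in one place.
// partitions must be greater than zero.
func partitionFor(key string, partitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	for _, key := range []string{"sensor-a", "sensor-b", "sensor-c", "sensor-a"} {
		fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 4))
	}
}
```

Note that changing the number of partitions remaps most keys, which is one of the reasons distributed state is hard to resize.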

Building this from scratch teaches you the mechanics. In the real world, you might use a framework like Apache Flink or Kafka Streams, which handle these complex distributed problems for you. But understanding what happens under the hood helps you use those tools effectively and debug them when things go wrong.

The beauty of Go for streaming is in its simplicity. Channels and goroutines give you the primitives to model data flow directly. Backpressure becomes a matter of channel buffers and semaphores. State, while it demands care, can be managed with maps and mutexes. It’s a powerful way to build responsive, resilient systems that can make sense of data as it happens, not after the fact.

Start with a simple pipeline. Add a source, an operator, and a sink. Then introduce a buffer to handle bursts. Add a semaphore to limit concurrency. Then try grouping items by time. Step by step, you’ll build a system that can not only handle a stream of data but also understand it in real-time.
