Let’s talk about building a system that can handle a constant flow of data. Imagine you’re trying to count cars on a busy highway in real-time. The cars keep coming, non-stop. You need to count them, maybe group them by color every minute, and do it all without your system falling over when traffic suddenly doubles. That’s the world of stream processing. In this article, I'll show you how to build the core of such a system using Go. We’ll focus on three critical ideas: managing flow so we don’t get overwhelmed (backpressure), grouping data into time buckets (windowing), and doing it all reliably.
Go is a fantastic language for this. Its straightforward concurrency model with goroutines and channels feels like it was designed for data pipelines. You can think of a channel as a conveyor belt. One goroutine puts items on the belt, and another takes them off for processing. Our job is to build a network of these conveyor belts that can speed up, slow down, and organize items without breaking.
Let’s start with the very foundation: moving data from a source to a destination. A basic pipeline might look like this.
package main

import (
	"fmt"
	"time"
)

func main() {
	// A source channel producing data
	source := make(chan int)
	// A sink channel receiving processed data
	sink := make(chan int)

	// Start a data producer
	go func() {
		for i := 1; i <= 5; i++ {
			fmt.Printf("Producing: %d\n", i)
			source <- i
			time.Sleep(100 * time.Millisecond) // Simulate work
		}
		close(source)
	}()

	// Start a processing stage
	go func() {
		for value := range source {
			result := value * 2 // Simple processing
			sink <- result
		}
		close(sink)
	}()

	// Consume the final results
	for result := range sink {
		fmt.Printf("Result: %d\n", result)
	}
}
This is a simple chain: produce, transform, consume. But what happens if the consumer is slower than the producer? The channels above are unbuffered, so every send blocks until the receiver is ready to take the value. A buffered channel can hold a fixed number of items, and once it is full the producer blocks until space frees up. Either way, this is a primitive form of backpressure: the slow consumer indirectly slows down the fast producer. For a robust system, we need to manage this explicitly and gracefully.
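To make the blocking behavior concrete, here is a minimal sketch. The function name maxBacklog and the sizes used are illustrative and not part of the pipeline above. A fast producer feeds a deliberately slow consumer through a bounded channel, and the backlog the consumer observes can never exceed the buffer capacity, because the producer blocks once the buffer is full.

```go
package main

import (
	"fmt"
	"time"
)

// maxBacklog pushes n items through a channel with the given buffer size
// while a slow consumer drains it, and returns the largest backlog the
// consumer observed.
func maxBacklog(n, buffer int) int {
	ch := make(chan int, buffer)

	// Fast producer: sends as quickly as the channel allows.
	go func() {
		for i := 0; i < n; i++ {
			ch <- i // blocks whenever the buffer is full
		}
		close(ch)
	}()

	peak := 0
	for range ch {
		if backlog := len(ch); backlog > peak {
			peak = backlog
		}
		time.Sleep(time.Millisecond) // simulate a slow consumer
	}
	return peak
}

func main() {
	fmt.Println("peak backlog:", maxBacklog(20, 3))
}
```

However fast the producer runs, the reported peak stays at or below 3, so memory use is bounded by the buffer size rather than by the producer's speed.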
A real-world source, like a sensor or a message queue, often produces data continuously. We need a way to represent this and control its speed.
import (
	"fmt"
	"sync"
)

type DataSource struct {
	dataChan chan interface{}
	rate     int          // desired messages per second
	running  bool
	mu       sync.RWMutex // protects the 'running' state
}

func NewDataSource(rate int) *DataSource {
	return &DataSource{
		dataChan: make(chan interface{}, 1000), // Buffer to smooth bursts
		rate:     rate,
		running:  true,
	}
}

// Emit queues a value and reports whether it was accepted.
func (ds *DataSource) Emit(value interface{}) bool {
	ds.mu.RLock()
	defer ds.mu.RUnlock()
	if !ds.running {
		return false
	}
	// Try to send without blocking.
	select {
	case ds.dataChan <- value:
		return true
	default:
		// Channel buffer is full. This is the backpressure signal.
		fmt.Println("Source buffer full, dropping data.")
		return false
	}
}
Here, if our dataChan buffer is full, the Emit function will drop the data in this simple example. In a more advanced system, you might want to pause the source or retry. The key is that the source is aware of the downstream capacity.
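The rate field is stored but never used by Emit, and the Run method that the orchestrator later calls on each source is not shown. One plausible way to honor the rate is a time.Ticker that paces sends. The sketch below is an assumption about how that could look, not the author's implementation; the names interval, runSource, and emitBurst are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// interval converts a messages-per-second rate into the gap between sends.
func interval(rate int) time.Duration {
	if rate <= 0 {
		return time.Second // assumed fallback for an invalid rate
	}
	d := time.Second / time.Duration(rate)
	if d <= 0 {
		d = time.Nanosecond // time.NewTicker panics on a non-positive interval
	}
	return d
}

// runSource emits up to limit sequence numbers at the given rate, stopping
// early if ctx is cancelled. It returns how many items were dropped because
// the output buffer was full (the same drop policy as Emit).
func runSource(ctx context.Context, rate, limit int, out chan<- int) int {
	ticker := time.NewTicker(interval(rate))
	defer ticker.Stop()

	dropped := 0
	for i := 0; i < limit; i++ {
		select {
		case <-ctx.Done():
			return dropped
		case <-ticker.C:
			select {
			case out <- i:
			default:
				dropped++
			}
		}
	}
	return dropped
}

// emitBurst runs a source against a channel nobody reads and reports how
// many items were queued and how many were dropped.
func emitBurst(rate, limit, buffer int) (queued, dropped int) {
	out := make(chan int, buffer)
	dropped = runSource(context.Background(), rate, limit, out)
	return len(out), dropped
}

func main() {
	queued, dropped := emitBurst(1000, 5, 2)
	fmt.Println("queued:", queued, "dropped:", dropped) // queued: 2 dropped: 3
}
```

With no consumer and a buffer of 2, the first two items are queued and the remaining three are dropped, which is exactly the signal a smarter source could use to slow itself down.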
Now, let's connect sources to processors. An operator is a processing stage. It takes data in, transforms it, and passes it along.
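type StreamOperator interface {
	Process(interface{}) (interface{}, error)
	Close() error
}

// A concrete operator that applies a function to every item,
// for example doubling an integer.
type MapOperator struct {
	fn func(interface{}) interface{}
}

func (mo *MapOperator) Process(data interface{}) (interface{}, error) {
	return mo.fn(data), nil
}

// Close satisfies StreamOperator; a stateless map has nothing to release.
func (mo *MapOperator) Close() error {
	return nil
}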
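To show how such operators compose, here is a small self-contained sketch that feeds a value through a chain of them. The names Operator, FuncOperator, intOp, and runChain are hypothetical helpers for this example. Because the values travel as interface{}, each stage uses a checked type assertion and returns an error instead of panicking on unexpected input.

```go
package main

import "fmt"

// Operator is a trimmed-down version of the operator interface.
type Operator interface {
	Process(interface{}) (interface{}, error)
}

// FuncOperator adapts a plain function to the Operator interface.
type FuncOperator func(interface{}) (interface{}, error)

func (f FuncOperator) Process(v interface{}) (interface{}, error) { return f(v) }

// intOp wraps an int -> int function with a checked type assertion.
func intOp(fn func(int) int) Operator {
	return FuncOperator(func(v interface{}) (interface{}, error) {
		n, ok := v.(int)
		if !ok {
			return nil, fmt.Errorf("expected int, got %T", v)
		}
		return fn(n), nil
	})
}

// runChain feeds v through each operator in order and stops at the first error.
func runChain(ops []Operator, v interface{}) (interface{}, error) {
	for _, op := range ops {
		var err error
		if v, err = op.Process(v); err != nil {
			return nil, err
		}
	}
	return v, nil
}

func main() {
	ops := []Operator{
		intOp(func(n int) int { return n * 2 }),
		intOp(func(n int) int { return n + 1 }),
	}
	out, err := runChain(ops, 3)
	fmt.Println(out, err) // 7 <nil>

	_, err = runChain(ops, "three")
	fmt.Println(err) // expected int, got string
}
```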
To manage the flow between many processors, we need a traffic controller. This is where we implement explicit backpressure. The goal is to limit the number of data items being processed at any given moment. If we have too many in flight, we risk running out of memory.
import "sync/atomic"

type ThroughputController struct {
	maxInFlight int32         // Maximum allowed concurrent items
	inFlight    int32         // Currently processing count (for metrics)
	semaphore   chan struct{} // Buffered channel used as a counting semaphore
}

func NewThroughputController(limit int) *ThroughputController {
	return &ThroughputController{
		maxInFlight: int32(limit),
		semaphore:   make(chan struct{}, limit), // One buffer slot per permit
	}
}

// Acquire tries to get a processing slot without blocking.
func (tc *ThroughputController) Acquire() bool {
	select {
	case tc.semaphore <- struct{}{}:
		atomic.AddInt32(&tc.inFlight, 1)
		return true
	default:
		// No slots available. Backpressure applied.
		return false
	}
}

// Release frees a slot. Call it exactly once per successful Acquire.
func (tc *ThroughputController) Release() {
	<-tc.semaphore
	atomic.AddInt32(&tc.inFlight, -1)
}
Now, in our main processing loop, we can use this controller before starting to work on a piece of data.
func (sp *StreamProcessor) processItem(data interface{}) {
	if !sp.throughput.Acquire() {
		log.Println("Backpressure: cannot acquire slot, skipping.")
		return // Or buffer, or slow down the source.
	}
	defer sp.throughput.Release()
	// ... actual processing ...
}
This ensures that the number of items being processed concurrently never exceeds maxInFlight. If all slots are busy, Acquire returns false and, in this simple version, the item is skipped. A fuller system would treat that false as a backpressure signal and pass it upstream, so that sources slow down or data is buffered temporarily.
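Skipping the item is only one policy. An alternative is to wait a bounded amount of time for a slot, so that the caller naturally slows down instead of losing data. The sketch below assumes a hypothetical Limiter type with an AcquireWait method; it is not part of the ThroughputController above.

```go
package main

import (
	"fmt"
	"time"
)

// Limiter is a counting semaphore built on a buffered channel.
type Limiter struct {
	slots chan struct{}
}

func NewLimiter(limit int) *Limiter {
	return &Limiter{slots: make(chan struct{}, limit)}
}

// AcquireWait waits up to timeoutMs milliseconds for a free slot and
// reports whether one was obtained.
func (l *Limiter) AcquireWait(timeoutMs int) bool {
	timer := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	defer timer.Stop()
	select {
	case l.slots <- struct{}{}:
		return true
	case <-timer.C:
		return false
	}
}

// Release frees a slot taken by a successful AcquireWait.
func (l *Limiter) Release() { <-l.slots }

func main() {
	l := NewLimiter(1)
	fmt.Println(l.AcquireWait(10)) // true: the slot is free
	fmt.Println(l.AcquireWait(10)) // false: the slot is still held
	l.Release()
	fmt.Println(l.AcquireWait(10)) // true: the slot was released
}
```

The timeout keeps a stalled downstream stage from blocking the caller forever, while short waits absorb brief spikes without dropping anything.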
So far, we've handled a continuous flow. But for analytics, we often want to ask questions like "What was the average temperature in the last 5 minutes?" This requires grouping data into windows of time.
A window is simply a bucket for data that falls within a specific time range. The most basic type is a tumbling window—fixed, non-overlapping intervals. Let's define one.
type TimeWindow struct {
	Size     time.Duration // e.g., 5 * time.Minute
	Function string        // "sum", "avg", "count"
}
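Assigning an event to a tumbling window is just rounding its timestamp down to a multiple of the window size. Here is a minimal sketch of that calculation; tumblingBounds and windowKey are illustrative names. It relies on time.Truncate, which rounds relative to Go's zero time, so windows align with the clock for sizes that divide a day evenly (seconds, minutes, hours).

```go
package main

import (
	"fmt"
	"time"
)

// tumblingBounds returns the half-open interval [start, end) of the
// tumbling window that contains t.
func tumblingBounds(t time.Time, size time.Duration) (start, end time.Time) {
	start = t.Truncate(size)
	return start, start.Add(size)
}

// windowKey builds a "start-end" identifier in Unix seconds for the window
// containing the given Unix timestamp.
func windowKey(unixSec, sizeSec int64) string {
	start, end := tumblingBounds(time.Unix(unixSec, 0), time.Duration(sizeSec)*time.Second)
	return fmt.Sprintf("%d-%d", start.Unix(), end.Unix())
}

func main() {
	fmt.Println(windowKey(125, 60)) // 120-180
	fmt.Println(windowKey(119, 60)) // 60-120
}
```

An event stamped exactly on a boundary (for example second 120) belongs to the window that starts there, not the one that ends there.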
We need a place to hold data for an active window. This is our window state.
type WindowState struct {
	Start    time.Time
	End      time.Time
	Elements []interface{}
	Count    int
	mu       sync.Mutex
}
Now, the big question: when do we process a window? We could wait until its time period is completely over. But in a real-time system, we often want early results. We can trigger processing based on two things: time (e.g., every second) or count (e.g., after 100 items). This is called a trigger.
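A trigger that combines both conditions can be very small. The sketch below is one possible shape, with a hypothetical Trigger type whose field names are assumptions: it fires when enough items are pending or when too much time has passed since the last result, and never for an empty window.

```go
package main

import (
	"fmt"
	"time"
)

// Trigger decides when a window should emit an early result.
type Trigger struct {
	MaxCount int           // fire once this many items are pending
	MaxAge   time.Duration // fire once this much time has passed since the last result
}

// ShouldFire reports whether a window with the given number of pending
// items, last emitted sinceLast ago, is due for a result.
func (tr Trigger) ShouldFire(pending int, sinceLast time.Duration) bool {
	if pending == 0 {
		return false // nothing new to report
	}
	return pending >= tr.MaxCount || sinceLast >= tr.MaxAge
}

func main() {
	tr := Trigger{MaxCount: 100, MaxAge: time.Second}
	fmt.Println(tr.ShouldFire(100, 0))                 // true: count reached
	fmt.Println(tr.ShouldFire(5, 2*time.Second))       // true: result is stale
	fmt.Println(tr.ShouldFire(5, 10*time.Millisecond)) // false: too early
}
```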
Let's integrate windowing into our operator. We'll modify our StreamOperator interface.
type StreamOperator interface {
	Process(interface{}) (interface{}, error)
	Window() *TimeWindow // Returns nil if not a windowed operator
	Close() error
}
When an operator has a window, our processor needs to route data to the correct window state. Here’s a simplified version of that logic.
func (sp *StreamProcessor) applyWindowing(data interface{}, window *TimeWindow, eventTime time.Time) {
	// Calculate which tumbling window this event belongs to:
	// round the event time down to a multiple of the window size.
	windowStart := eventTime.Truncate(window.Size)
	windowEnd := windowStart.Add(window.Size)
	windowID := fmt.Sprintf("%d-%d", windowStart.Unix(), windowEnd.Unix())

	sp.windowMu.Lock()
	state, exists := sp.windowState[windowID]
	if !exists {
		state = &WindowState{
			Start:    windowStart,
			End:      windowEnd,
			Elements: []interface{}{},
		}
		sp.windowState[windowID] = state
	}
	sp.windowMu.Unlock()

	state.mu.Lock()
	state.Elements = append(state.Elements, data)
	state.Count++
	// Example count-based trigger: fire once per 100 items,
	// not on every item after the 100th.
	trigger := state.Count%100 == 0
	state.mu.Unlock()

	if trigger {
		// processWindow must take state.mu before reading state.Elements.
		go sp.processWindow(state, window)
	}
}
The processWindow function would then apply the aggregation (like sum or average) to all the elements in state.Elements and send the result downstream to the sink.
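The aggregation step itself might look like the following sketch. It assumes the window elements are float64 values and uses the same function names as the TimeWindow comment ("sum", "avg", "count"); the aggregate helper is hypothetical and not the article's processWindow.

```go
package main

import (
	"errors"
	"fmt"
)

// aggregate applies the named function to the elements of one window.
// Elements are assumed to be float64; anything else is reported as an error.
func aggregate(fn string, elements []interface{}) (float64, error) {
	if fn == "count" {
		return float64(len(elements)), nil
	}

	sum := 0.0
	for _, e := range elements {
		v, ok := e.(float64)
		if !ok {
			return 0, fmt.Errorf("element %v is not a float64", e)
		}
		sum += v
	}

	switch fn {
	case "sum":
		return sum, nil
	case "avg":
		if len(elements) == 0 {
			return 0, errors.New("cannot average an empty window")
		}
		return sum / float64(len(elements)), nil
	}
	return 0, fmt.Errorf("unknown aggregation %q", fn)
}

func main() {
	temps := []interface{}{20.0, 22.0, 24.0}
	fmt.Println(aggregate("avg", temps))   // 22 <nil>
	fmt.Println(aggregate("sum", temps))   // 66 <nil>
	fmt.Println(aggregate("count", temps)) // 3 <nil>
}
```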
But time is tricky in distributed systems. Data doesn't always arrive in order. A network delay might cause an event stamped 9:00:59 AM to arrive after an event stamped 9:01:05 AM. If we close the 9:00-9:01 window at 9:01 on the dot, we miss that late event. This is where the concept of a watermark comes in.
A watermark is a timestamp that says, "I believe all events with a timestamp earlier than X have arrived." It's an estimate of event time progress. We can use it to decide when it's safe to finalize a window.
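type WatermarkManager struct {
	watermarks map[string]time.Time // per-source watermarks
	current    time.Time            // global watermark
	mu         sync.RWMutex
}

func (wm *WatermarkManager) Advance(sourceID string, timestamp time.Time) {
	wm.mu.Lock()
	defer wm.mu.Unlock()
	if wm.watermarks == nil {
		wm.watermarks = make(map[string]time.Time)
	}
	// A watermark only moves forward; ignore older timestamps.
	if timestamp.After(wm.watermarks[sourceID]) {
		wm.watermarks[sourceID] = timestamp
	}
	// The global watermark is the minimum of all source watermarks.
	// This is a conservative estimate. Start from the first entry
	// rather than time.Now(), so wall-clock time never caps event time.
	first := true
	var lowest time.Time
	for _, t := range wm.watermarks {
		if first || t.Before(lowest) {
			lowest = t
			first = false
		}
	}
	wm.current = lowest
}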
Periodically, or when the watermark advances, we can check all our window states.
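func (wm *WatermarkManager) TriggerCompletedWindows() {
	// Take the write lock: this method deletes from the window map.
	wm.mu.Lock()
	defer wm.mu.Unlock()
	// Assumes the manager can reach the active windows as wm.windowState,
	// a field not shown in the struct above.
	for id, state := range wm.windowState {
		// If the watermark has passed the end of the window, we can process it.
		if state.End.Before(wm.current) {
			go wm.processWindowFinally(state)
			delete(wm.windowState, id) // Clean up
		}
	}
}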
With watermarks, we can handle late-arriving data to some extent. If data arrives for a window that hasn't been finalized yet (watermark hasn't passed its end), we can still add it. If it arrives after the window is finalized, we might send it to a special side output for late data handling.
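The routing decision described here can be sketched in a few lines. The lateRouter type below is hypothetical and works on plain Unix seconds to stay short: events whose window end is already behind the watermark go to a late side output, and everything else joins an open window. It assumes non-negative timestamps and uses the same "end before watermark" rule as TriggerCompletedWindows.

```go
package main

import "fmt"

// lateRouter assigns events to tumbling windows and diverts events whose
// window the watermark has already passed.
type lateRouter struct {
	size      int64             // window size in seconds
	watermark int64             // current watermark, Unix seconds
	open      map[int64][]int64 // window start -> event timestamps
	late      []int64           // side output for late events
}

func newLateRouter(size int64) *lateRouter {
	return &lateRouter{size: size, open: make(map[int64][]int64)}
}

// Add returns true if the event joined an open window and false if it
// arrived after its window was finalized.
func (r *lateRouter) Add(ts int64) bool {
	start := ts - ts%r.size
	if start+r.size < r.watermark { // window end is before the watermark
		r.late = append(r.late, ts)
		return false
	}
	r.open[start] = append(r.open[start], ts)
	return true
}

// Advance moves the watermark forward and finalizes windows it has passed.
func (r *lateRouter) Advance(wm int64) {
	if wm <= r.watermark {
		return // watermarks never move backwards
	}
	r.watermark = wm
	for start := range r.open {
		if start+r.size < wm {
			delete(r.open, start) // a real system would emit the aggregate here
		}
	}
}

func main() {
	r := newLateRouter(60)
	fmt.Println(r.Add(30)) // true: window 0-60 is open
	r.Advance(61)          // watermark passes the end of window 0-60
	fmt.Println(r.Add(59)) // false: routed to the late output
	fmt.Println(r.Add(70)) // true: window 60-120 is still open
	fmt.Println(r.late)    // [59]
}
```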
Now, let's talk about not losing data. Systems fail. A process can crash. We need fault tolerance. A common method is checkpointing. Periodically, we save the state of our entire pipeline to a durable store. If we crash, we restart from the last checkpoint.
What state do we need to save? For our windowing operator, it's the contents of all active WindowState objects. For a simple counter operator, it's just the counter value.
We can add a method to our operator interface for checkpointing.
type StatefulOperator interface {
	Snapshot() ([]byte, error) // Returns serialized state
	Restore([]byte) error      // Restores state from bytes
}
Our pipeline would then have a checkpoint coordinator that periodically tells all operators to snapshot their state, collects them, and writes them to a file or database. This is a complex topic, but the essence is to make state explicit and serializable.
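For the simple counter case mentioned above, Snapshot and Restore could be implemented with encoding/json as in this sketch. CounterOperator and counterSnapshot are hypothetical names, and JSON is just one convenient serialization choice, not something the article prescribes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// CounterOperator counts the items it sees and can checkpoint that count.
type CounterOperator struct {
	mu    sync.Mutex
	count int64
}

// counterSnapshot is the serialized form of the operator's state.
type counterSnapshot struct {
	Count int64 `json:"count"`
}

func (c *CounterOperator) Process(v interface{}) (interface{}, error) {
	c.mu.Lock()
	c.count++
	c.mu.Unlock()
	return v, nil
}

// Snapshot returns the current state as JSON bytes.
func (c *CounterOperator) Snapshot() ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return json.Marshal(counterSnapshot{Count: c.count})
}

// Restore replaces the current state with a previously taken snapshot.
func (c *CounterOperator) Restore(b []byte) error {
	var s counterSnapshot
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	c.mu.Lock()
	c.count = s.Count
	c.mu.Unlock()
	return nil
}

// Count returns the number of items seen so far.
func (c *CounterOperator) Count() int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.count
}

func main() {
	before := &CounterOperator{}
	for i := 0; i < 3; i++ {
		before.Process(i)
	}
	blob, err := before.Snapshot()
	if err != nil {
		panic(err)
	}

	after := &CounterOperator{} // simulates a fresh process after a crash
	if err := after.Restore(blob); err != nil {
		panic(err)
	}
	fmt.Println(string(blob), after.Count()) // {"count":3} 3
}
```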
Let's look at a more complete example, tying the concepts together. This is a simplified orchestrator.
// Imports needed: context, log, sync, time.
// The helpers source.Run, nextDataItem, extractTimestamp and processWindow
// are assumed to be defined elsewhere.
type StreamProcessor struct {
	sources       []*DataSource
	operators     []StreamOperator
	sink          chan interface{}
	throughput    *ThroughputController
	watermark     *WatermarkManager
	watermarkTick <-chan time.Time // e.g. the channel of a time.Ticker
	windowState   map[string]*WindowState
	windowMu      sync.RWMutex
}

func (sp *StreamProcessor) Run(ctx context.Context) {
	// Start all sources
	for _, source := range sp.sources {
		go source.Run(ctx)
	}
	// Main processing loop
	for {
		select {
		case <-ctx.Done():
			return
		case data := <-sp.nextDataItem():
			sp.processWithBackpressure(data)
		case ts := <-sp.watermarkTick:
			sp.watermark.Advance("system", ts)
			sp.watermark.TriggerCompletedWindows()
		}
	}
}

func (sp *StreamProcessor) processWithBackpressure(data interface{}) {
	if !sp.throughput.Acquire() {
		// Apply backpressure: pausing here slows the loop that reads
		// from the sources. Note that the current item is dropped.
		time.Sleep(50 * time.Millisecond)
		// Optionally, try again or signal the source.
		return
	}
	defer sp.throughput.Release()

	for _, op := range sp.operators {
		result, err := op.Process(data)
		if err != nil {
			log.Printf("Operator error: %v", err)
			break
		}
		data = result // Pass result to next operator
		// If this is a windowed operator, handle grouping.
		if win := op.Window(); win != nil {
			// We need an event time. Let's assume it's in the data.
			eventTime := extractTimestamp(data)
			sp.applyWindowing(data, win, eventTime)
		}
	}

	// Send final result to sink without blocking.
	select {
	case sp.sink <- data:
		// success
	default:
		log.Println("Sink is full, dropping result.")
	}
}
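Once the helpers left out above (source.Run, nextDataItem, extractTimestamp, processWindow) are filled in, running this shows data flowing and windows being created, aggregated, and emitted. The ThroughputController caps the number of items in flight, which bounds memory use, and the WatermarkManager keeps event time moving forward.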
In production, you'd need to add monitoring. How many items are processed per second? What's the current latency? How many windows are active? You can expose these as metrics.
type ProcessorStats struct {
	ProcessedCount uint64
	SinkCount      uint64
	WindowCount    int
}
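Since many goroutines update these numbers at once, the counters need to be safe for concurrent use. One lightweight option, sketched here with a hypothetical Counters type, is sync/atomic, which avoids taking a mutex on the hot path.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counters holds pipeline metrics that many goroutines update concurrently.
type Counters struct {
	processed uint64
	sunk      uint64
}

func (c *Counters) IncProcessed() { atomic.AddUint64(&c.processed, 1) }
func (c *Counters) IncSunk()      { atomic.AddUint64(&c.sunk, 1) }

// Snapshot reads both counters atomically (each on its own, not as a pair).
func (c *Counters) Snapshot() (processed, sunk uint64) {
	return atomic.LoadUint64(&c.processed), atomic.LoadUint64(&c.sunk)
}

// countConcurrently has several workers bump the processed counter in
// parallel and returns the final total.
func countConcurrently(workers, perWorker int) uint64 {
	var c Counters
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				c.IncProcessed()
			}
		}()
	}
	wg.Wait()
	processed, _ := c.Snapshot()
	return processed
}

func main() {
	fmt.Println(countConcurrently(8, 1000)) // 8000
}
```

A plain `count++` from several goroutines would lose updates; the atomic version always reports the full total.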
You'd also need to think about deployment. This single-process design is great for learning, but for high throughput, you'd want to partition your data and run multiple instances of your pipeline in parallel. That introduces new challenges like distributed state and watermark aggregation, which are beyond our scope today.
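The usual first step toward partitioning is to hash a key so that all events with the same key land on the same pipeline instance. Here is a minimal sketch using the standard hash/fnv package; partitionFor is an illustrative name, and the car-color keys echo the highway example from the introduction.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of n partitions. The same key always maps
// to the same partition, so per-key state stays on a single instance.
func partitionFor(key string, partitions int) int {
	if partitions <= 0 {
		return 0 // assumed fallback for an invalid partition count
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(partitions))
}

func main() {
	for _, color := range []string{"red", "blue", "green", "red"} {
		fmt.Printf("%s -> partition %d\n", color, partitionFor(color, 4))
	}
}
```

Both "red" events print the same partition number, which is what allows each instance to keep its own window state without coordinating with the others.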
Building this from scratch teaches you the mechanics. In the real world, you might use a framework like Apache Flink or Kafka Streams, which handle these complex distributed problems for you. But understanding what happens under the hood helps you use those tools effectively and debug them when things go wrong.
The beauty of Go for streaming is in its simplicity. Channels and goroutines give you the primitives to model data flow directly. Backpressure becomes a matter of channel buffers and semaphores. State, handled with care, can be managed with maps and mutexes. It’s a powerful way to build responsive, resilient systems that can make sense of data as it happens, not after the fact.
Start with a simple pipeline. Add a source, an operator, and a sink. Then introduce a buffer to handle bursts. Add a semaphore to limit concurrency. Then try grouping items by time. Step by step, you’ll build a system that can not only handle a stream of data but also understand it in real-time.