As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
Building a system that handles a continuous flow of data, like clicks on a website or sensor readings, is a common challenge. You need to group this data into time windows, calculate running totals or averages, and guarantee that every piece of data is processed once, and only once, even if parts of the system fail. Let me walk you through how to build such a system in Go, piece by piece.
Think of a data stream as a never-ending conveyor belt. Items arrive at different times, sometimes out of order. Our job is to sort them, put them into time-based buckets (like "last 5 minutes"), do math on each bucket, and send the results onward, all while keeping perfect track of what we've already done.
The heart of our system is the StreamProcessor. It coordinates everything. When you create one, you give it some settings: how big its internal channels should be, how long to wait for late data, and where to save its progress.
config := ProcessorConfig{
BufferSize: 10000,
WatermarkDelay: 5 * time.Second,
CheckpointInterval: 30 * time.Second,
}
processor := NewStreamProcessor(config)
Events flow into the system through the EventIngestor. This is the front door. Its first job is to deal with a messy reality: events don't always arrive in the order they were created. A network delay might hold up an earlier event, while a later one zips through.
To handle this, we use a concept called a watermark. It's like a moving finish line. We watch the timestamps on incoming events. The watermark is set to the highest timestamp we've seen, minus a waiting period (like 5 seconds). This waiting period is our allowance for late data. We promise not to finalize any calculations for a time period until we're reasonably sure no more data from that period will arrive. Events with a timestamp older than the current watermark are safe to process. Newer ones are held in a buffer.
func (wm *WatermarkManager) Update(eventTime int64) {
wm.mu.Lock()
defer wm.mu.Unlock()
// ... find the maximum timestamp seen ...
wm.currentWatermark = maxTs - int64(wm.maxDelay/time.Millisecond)
}
The WindowManager is where the core logic lives. A window is just a time range. A "tumbling window" might be every minute, non-overlapping. A "sliding window" might be a five-minute average, updated every minute. When an event is deemed ready (its timestamp is past the watermark), the manager figures out which windows it belongs to.
For each relevant window, we need an Aggregator. This is a simple object that knows how to update a calculation. If we're counting events, the aggregator holds a number and adds one. If we're summing a value, it holds a running total.
type CountAggregator struct {
count int64
}
func (ca *CountAggregator) Aggregate(event StreamEvent) (interface{}, error) {
atomic.AddInt64(&ca.count, 1)
return ca.count, nil
}
The magic of "exactly-once" processing hinges on two things: remembering what you've done and being able to restart from a known good point. This is the job of the StateStore and the Checkpointer.
Every event has a unique ID. Before we even look at an event's data, we ask the StateStore: "Have I seen this ID before?" The store checks a fast cache and, if needed, a persistent database. If the answer is yes, we skip the event. This handles duplicates that might be sent if a producer retries.
func (sp *StreamProcessor) processEvents(ctx context.Context) error {
for {
select {
case event := <-sp.ingestor.orderedChan:
// The critical check
if processed := sp.stateStore.IsProcessed(event.ID); processed {
continue // Skip duplicate
}
// ... process the event ...
// Mark it as done
sp.stateStore.MarkProcessed(event.ID, event.Timestamp)
}
}
}
But what if our entire program crashes? We don't want to lose the results of all the windows we've been calculating. This is where checkpoints come in. On a regular schedule (e.g., every 30 seconds), the Checkpointer tells every component to pause and take a snapshot.
It captures everything: the state of every aggregator in every window, the current watermark, and the list of processed event IDs. It writes this snapshot durably—to disk or a database. This snapshot is a checkpoint.
func (c *Checkpointer) createCheckpoint(store *StateStore) {
checkpoint := Checkpoint{
ID: ksuid.New().String(), // A unique ID
Timestamp: time.Now().UnixNano(),
State: store.Snapshot(), // Grab all state
}
store.checkpoints[checkpoint.ID] = checkpoint
c.commitCheckpoint(store, checkpoint)
}
When our processor starts up, its first step is to look for the latest successful checkpoint. If it finds one, it loads all the saved state. The aggregators are restored with their previous counts, the watermark is reset, and the list of processed IDs is reloaded. Then, it starts reading the event stream from the point after that checkpoint. It will re-process some events, but the duplicate detection in the StateStore will filter them out. The result is that the system's output is exactly as if it had run continuously without failing.
Let's look at what happens to a single event, step-by-step.
- Arrival: An event
{ID: "abc123", Timestamp: 1625097600000, Data: {value: 42}}is sent toprocessor.Process(). - Buffering: The
EventIngestorreceives it, updates the watermark, and puts it in a buffer if its timestamp is too new. - Ordering: Once the watermark advances past the event's timestamp, it's pushed into the ordered channel.
- Deduplication: The main loop receives it and asks the
StateStoreif "abc123" was processed. It hasn't been, so we continue. - Window Assignment: The
WindowManagerchecks all active windows. Does a 1-minute tumbling window for[1625097600000, 1625097660000)exist? It creates one if needed. - Aggregation: The manager finds the
SumAggregatorfor that window and callsagg.Aggregate(event). The aggregator adds 42 to its internal total. - State Save: The new state of the aggregator (the new sum) is saved to the
StateStorewith a key like"window_sum_1min:1625097600000". - Completion: The event's ID "abc123" is marked as processed in the state store.
- Emission: If this event caused the window to be complete (e.g., the watermark passed the window's end time), the window triggers. It takes the final result from the aggregator and sends it out as output.
All of this happens concurrently. The ingestor, the watermark updater, the window manager, and the checkpointer all run in their own goroutines, communicating through channels. This is where Go's concurrency model shines. We use mutexes (sync.RWMutex) sparingly, only to protect specific maps or slices when multiple goroutines might access them.
A real implementation needs a solid backend for the StateStore. In-memory maps work for testing, but for production, you need persistence. You could use embedded databases like RocksDB or Badger. They store data locally on disk and are very fast for key-value lookups, which is most of what we do.
// A simplified RocksDB backend
type RocksDBBackend struct {
db *gorocksdb.DB
}
func (rb *RocksDBBackend) Put(key string, value []byte) error {
wb := gorocksdb.NewWriteBatch()
defer wb.Destroy()
wb.Put([]byte(key), value)
return rb.db.Write(defaultWriteOptions, wb)
}
Performance tuning is crucial. The size of the input channel buffer affects backpressure. If it fills up, producers will block. The watermark delay is a trade-off: a longer delay means better handling of late events, but it also increases the latency of your results. The checkpoint interval is a trade-off between recovery time (shorter interval) and performance overhead (longer interval).
You'll want to add metrics to every part. How many events per second are you receiving? What's the average processing latency? How far behind the current time is your watermark? This data is vital for operating the system in production.
type StreamMetrics struct {
EventsReceived uint64
EventsProcessed uint64
DuplicateEvents uint64
WatermarkLag int64 // in milliseconds
}
In a distributed setup, you would run multiple instances of this processor. You'd need a way to partition your event stream, perhaps by a key in the data. Each instance would be responsible for a subset of the keys. The checkpointing becomes more complex, requiring a distributed coordination system like Apache ZooKeeper or etcd to ensure all instances snapshot their state consistently.
Building this might seem daunting, but by breaking it down into these components—ingestion with watermarks, windowed aggregation, stateful deduplication, and periodic checkpointing—you create a system that is both powerful and understandable. It handles the inherent disorder of real-world data and provides the strong guarantees necessary for accurate, reliable results. You start with events on a conveyor belt and end with timely, correct insights, no matter what happens along the way.
📘 Checkout my latest ebook for free on my channel!
Be sure to like, share, comment, and subscribe to the channel!
101 Books
101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.
Check out our book Golang Clean Code available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!
Our Creations
Be sure to check out our creations:
Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | Java Elite Dev | Golang Elite Dev | Python Elite Dev | JS Elite Dev | JS Schools
We are on Medium
Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
Top comments (0)