Real-Time Stream Processing
Real-time data processing operates on continuous, unbounded streams of events, delivering results with latency constraints that vary by application. In contrast to batch processing, which aggregates fixed datasets for periodic analysis, streaming systems ingest and transform events as they arrive, maintaining state across an infinite sequence.
Latency requirements differ significantly across domains. For algorithmic trading, sub-millisecond delays are critical to capitalize on market fluctuations. In ride-sharing or delivery tracking, latencies up to 1–5 seconds suffice for updating user interfaces with vehicle positions or estimated arrival times.
Key challenges include preserving event order despite network variability, ensuring exactly-once processing to avoid duplicates, performing deduplication on redundant events, and managing persistent state for aggregations or joins under failures.
Execution Environments
Real-time stream processing spans low-latency ingestion to distributed computation, addressing diverse latency and scalability needs. Different execution environments handle specific pipeline stages, from event capture to complex analytics.
- Golang delivers event ingestion and lightweight transformations typically in 1–5 ms (p50) and 5–20 ms (p95) latency on a single node.
- Apache Flink manages distributed, stateful streams with ~20–150 ms (p50) and 50–400 ms (p95) end-to-end latency, depending on checkpointing and window size.
- Apache Spark processes micro-batch streams with ~1–30 s typical end-to-end latency, governed by trigger interval and shuffle overhead; triggers below ~500 ms are possible but rarely stable in production.
These environments form integrated pipelines. Kafka transports events with durability but does not perform computation, while RocksDB provides local operator state when embedded by engines such as Flink, each component serving a distinct purpose within the streaming pipeline.
Real-time stream processing combines event ingestion, computation, and analytics into cohesive workflows. Tools like Golang, Flink, and Spark integrate to address these stages, adapting to diverse system demands. A data bus, such as Kafka, can facilitate their interaction, while solutions like RocksDB manage state persistence when required.
What Golang Represents in Stream Processing
Golang (Go) is a high-performance runtime for real-time stream processing, delivering event ingestion and lightweight transformations at ~1–5 ms (p50) and 5–20 ms (p95) latency on a single node. Its design prioritizes concurrency, low overhead, and direct resource control, making it ideal for lightweight, latency-sensitive streaming tasks. Go’s core strengths in streaming include:
- Fast Event Ingestion: Go’s net package handles TCP and UDP streams with minimal latency (WebSocket support comes from add-on packages rather than net itself), ingesting high-throughput data (e.g., 10–50k events/s in market feeds) and normalizing it via efficient parsing (e.g., Protobuf decoding).
- Concurrent Processing: Goroutines, lightweight threads managed by Go’s user-space scheduler, enable parallel event transformations, such as filtering or enrichment, with low CPU and memory overhead.
- Backpressure and Routing: Built-in channels manage event flow, supporting patterns like fan-out/fan-in and worker pools to handle bursts and prevent overload.
- In-Memory State Management: Go maintains lightweight, shard-local state (e.g., keyed caches) for real-time aggregations, with serialization for snapshot durability.
For instance, in a trading system, Go performs local processing (ingestion, sequence validation, timestamping) in under 1 ms, but end-to-end latency including publication to Kafka typically ranges from 1–5 ms (p50). In IoT, it filters sensor data at the edge before forwarding to a distributed system. Go’s compiled binaries and short garbage-collection pauses keep performance predictable, but its lack of native event-time or distributed state handling limits it to lightweight pipelines without built-in fault tolerance, where frameworks like Flink take over.
Architecture
Golang’s architecture supports low-latency stream processing through a runtime optimized for concurrency, I/O, and memory efficiency. Its components work together to handle high-throughput event streams with minimal overhead, tailored for lightweight, real-time pipelines.
User-Space Scheduler and M:N Model: Go’s runtime includes a user-space scheduler that manages goroutines, lightweight threads whose stacks start at about 2 KB and grow on demand. The M:N model multiplexes M goroutines onto N OS threads (typically thousands of goroutines on a handful of threads), so most goroutine switches avoid kernel-level context switches. The scheduler uses a work-stealing algorithm, balancing tasks across threads in microseconds, enabling concurrent processing of event streams (e.g., handling multiple socket connections) with minimal CPU overhead.
Netpoller for I/O: Network I/O goes through a netpoller built on epoll (Linux) or kqueue (BSD/macOS), which polls file descriptors for readiness. This enables non-blocking reads from sockets with very low overhead. The netpoller integrates with the scheduler, parking goroutines that wait on I/O until data arrives, which minimizes CPU usage during high-throughput stream ingestion.
Channel-Based Synchronization: Channels provide typed FIFO coordination with built-in synchronization, acting as bounded queues (e.g., make(chan T, n) for buffered channels). They enable event handoff between goroutines, supporting streaming patterns like pipeline staging or load-balanced routing. Channels handle bursty streams by buffering events, ensuring ordered processing with sub-millisecond latency, without requiring explicit mutexes or locks in application code.
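The bounded-queue behavior of a buffered channel can be sketched as follows. This is a minimal illustration, not code from the pipeline in this article: tryEnqueue is a hypothetical helper name, and the buffer size of two is chosen only to make the full-buffer case visible.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send. It reports false when the
// buffer is full so the caller can drop, retry, or slow the producer.
func tryEnqueue(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	queue := make(chan int, 2) // bounded buffer holding two events

	fmt.Println(tryEnqueue(queue, 1)) // true
	fmt.Println(tryEnqueue(queue, 2)) // true
	fmt.Println(tryEnqueue(queue, 3)) // false: buffer full, nothing is consuming yet
	fmt.Println(<-queue, <-queue)     // 1 2 (FIFO order)
}
```

A plain blocking send (queue <- v) gives backpressure instead of load shedding: the producer simply waits until a consumer frees a slot. Which of the two is appropriate depends on whether dropping events is acceptable.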
Memory-Managed Heap: Go’s garbage-collected heap uses a concurrent mark-and-sweep collector with a low-pause design (stop-the-world phases are typically well under a millisecond, although allocation-heavy workloads can see longer delays). Escape analysis reduces allocations by keeping temporary objects (e.g., event buffers) on the stack. Ahead-of-time compiled binaries, with no JVM bytecode or warm-up phase, help keep execution predictable and support in-memory state for real-time aggregations.
These components make Go ideal for lightweight streaming tasks but lack distributed state or fault tolerance, deferring to systems like Flink for such requirements.
Execution and Dataflow
Golang’s execution model drives stream processing with concurrency primitives, enabling sub-millisecond latency for event sequences in lightweight, real-time pipelines. It assumes events form keyed sequences (e.g., user ID or stream ID), prioritizing ordered processing within keys and parallel processing across keys.
Event Ingestion: Goroutines handle event ingestion via non-blocking I/O from sockets (e.g., TCP or UDP), processing up to 50k events per second. Worker pools distribute high-throughput streams across a fixed set of goroutines (e.g., 10–100), preventing resource exhaustion while ensuring low-latency ingestion.
Transformation and Routing: Event sequences undergo transformation and routing through concurrency patterns:
- Fan-Out: A goroutine routes events to multiple channels, enabling parallel processing across tasks (e.g., validation or enrichment) for different keys.
- Fan-In: Multiple goroutines merge results into a single channel. Order is preserved per producer but not across producers, so consumers that need a global order must sequence events downstream.
Buffered channels (e.g., make(chan Event, 1000)) absorb bursts, maintaining throughput with handoff latency under 500 µs under variable event rates.
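The fan-in side of this pattern can be sketched as a small merge function. The name fanIn and the use of int payloads are assumptions made for brevity; a real pipeline would carry event structs.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges several input channels into one output channel.
// The output closes after every input has been closed and drained.
// Order is preserved per input, not across inputs.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int, 16)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a := make(chan int, 2)
	b := make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	b <- 4
	close(a)
	close(b)

	sum := 0
	for v := range fanIn(a, b) {
		sum += v
	}
	fmt.Println("sum:", sum) // sum: 10
}
```

Closing the output only after all forwarding goroutines finish is what lets the consumer use a plain range loop and avoids leaking goroutines.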
Ordered Processing of Keyed Sequences: To preserve order within keyed sequences, Go routes events with the same key to a dedicated goroutine using a hash function (e.g., hash(key) % N). Each goroutine’s channel, a FIFO queue, ensures sequential processing within a key, with latency under 500 µs. Buffered channels scale to thousands of keys, handling bursts without blocking producers, though distributed event-time ordering requires systems like Flink.
State Management: In-memory keyed state uses maps (map[key]value) or sync.Map for concurrent access, storing aggregates or session data indexed by keys. Shard-local caches, tied to goroutines, minimize contention and enable sub-millisecond lookups.
State Durability: In-memory state is lost if a goroutine crashes or the process restarts unless explicitly saved. State is serialized (e.g., to JSON or Protobuf) and written to Kafka or Redis. Kafka stores snapshots with offsets for replay as a durable event log, while Redis offers fast, in-memory persistence. Snapshots occur every 50–500 ms, balancing latency, throughput, and durability.
Kafka Integration: Kafka serves as a transport layer, not a processing engine. Goroutines publish serialized events to Kafka topics using async producers (e.g., sarama) for <1 ms latency, tracking offsets for durability and replay. Go handles computation locally, avoiding Kafka’s processing overhead, but relies on transactional or idempotent sinks (e.g., Kafka EOS) to achieve exactly-once semantics.
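When the sink itself is not transactional, consumer-side idempotence is often approximated with per-key sequence numbers. The sketch below assumes producers attach a strictly increasing sequence number to every event of a key; that field and the Deduper type are hypothetical and not part of the Event type used in this article.

```go
package main

import "fmt"

// Deduper drops events whose sequence number was already applied for a key.
// Assumption: each producer assigns strictly increasing sequence numbers per key.
type Deduper struct {
	lastSeq map[string]uint64
}

// NewDeduper returns an empty deduplicator.
func NewDeduper() *Deduper {
	return &Deduper{lastSeq: make(map[string]uint64)}
}

// Accept reports whether the event is new. Replays and duplicates return false.
func (d *Deduper) Accept(key string, seq uint64) bool {
	if last, ok := d.lastSeq[key]; ok && seq <= last {
		return false
	}
	d.lastSeq[key] = seq
	return true
}

func main() {
	d := NewDeduper()
	fmt.Println(d.Accept("user-1", 1)) // true: first delivery
	fmt.Println(d.Accept("user-1", 2)) // true: next in sequence
	fmt.Println(d.Accept("user-1", 2)) // false: redelivered after a retry
	fmt.Println(d.Accept("user-2", 1)) // true: sequences are tracked per key
}
```

This only yields effectively-once results if lastSeq is persisted together with the state it protects; otherwise a restart forgets which events were already applied.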
Pipeline Scope: This execution model excels in low-latency, single-node streaming but lacks distributed coordination. Systems like Flink are needed for fault-tolerant, multi-node pipelines.
// Compact Go example: ingestion → fan-out → keyed ordering → local state → periodic snapshots.
package main

import (
	"context"
	"crypto/sha1"
	"encoding/json"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// Event is a single stream record belonging to a keyed sequence (e.g., user/session).
type Event struct {
	Key string // routing/ordering key
	TS  int64  // ingestion timestamp (ms)
}

// DurableSink abstracts durability (Kafka/Redis). Replace NopSink with a production implementation if required.
type DurableSink interface {
	// Publish is for raw/processed event transport (e.g., Kafka topic).
	Publish(ctx context.Context, topic string, value []byte) error
	// StoreSnapshot persists shard-local state snapshots (e.g., compacted topic or Redis SET).
	StoreSnapshot(ctx context.Context, key string, value []byte) error
}

// NopSink is a stub implementation used for example purposes.
type NopSink struct{}

func (NopSink) Publish(context.Context, string, []byte) error       { return nil }
func (NopSink) StoreSnapshot(context.Context, string, []byte) error { return nil }
//
// Worker (shard-local processing) ---------------------------------------------
// Worker owns a shard (subset of keys), guarantees per-key ordering via FIFO channel,
// maintains shard-local aggregates, and periodically flushes snapshots to DurableSink.
type Worker struct {
	in    chan Event       // FIFO input for this shard; preserves per-key order
	state map[string]int64 // shard-local aggregates (e.g., per-key counters)
	sink  DurableSink      // durability target (Kafka/Redis)
	id    int              // shard/worker identifier
}

// NewWorker constructs a worker with buffered input.
func NewWorker(id int, sink DurableSink) *Worker {
	return &Worker{
		in:    make(chan Event, 1024),
		state: make(map[string]int64),
		sink:  sink,
		id:    id,
	}
}

// Run starts the worker loop: validates+updates state and flushes periodic snapshots.
// Stops on channel close or context cancellation.
func (w *Worker) Run(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// 50 ms sits at the low end of the 50–500 ms snapshot interval discussed above.
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case e, ok := <-w.in:
				if !ok {
					w.flushSnapshot()
					return
				}
				// Lightweight validation + state update (no external calls).
				if e.Key != "" {
					w.state[e.Key]++
				}
			case <-ticker.C:
				w.flushSnapshot()
			case <-ctx.Done():
				// Events still buffered in w.in are not processed on cancellation.
				w.flushSnapshot()
				return
			}
		}
	}()
}

// flushSnapshot serializes shard-local state and writes it to the durable sink.
// Errors are reported rather than silently dropped; the next tick retries with newer state.
func (w *Worker) flushSnapshot() {
	if len(w.state) == 0 {
		return
	}
	b, err := json.Marshal(w.state) // compact snapshot per shard window
	if err != nil {
		fmt.Printf("shard %d: snapshot marshal failed: %v\n", w.id, err)
		return
	}
	if err := w.sink.StoreSnapshot(context.Background(), fmt.Sprintf("shard:%d", w.id), b); err != nil {
		fmt.Printf("shard %d: snapshot store failed: %v\n", w.id, err)
	}
}
//
// Dispatcher (fan-out by key) -------------------------------------------------
// Dispatcher routes events to workers by hash(key) % N, preserving per-key order inside each worker.
type Dispatcher struct {
	workers []*Worker
}

// NewDispatcher builds N workers with a shared durable sink.
func NewDispatcher(n int, sink DurableSink) *Dispatcher {
	ws := make([]*Worker, n)
	for i := range ws {
		ws[i] = NewWorker(i, sink)
	}
	return &Dispatcher{workers: ws}
}

// Start launches all workers.
func (d *Dispatcher) Start(ctx context.Context, wg *sync.WaitGroup) {
	for _, w := range d.workers {
		w.Run(ctx, wg)
	}
}

// Route sends the event to its shard; worker FIFO preserves order within the key.
func (d *Dispatcher) Route(e Event) {
	idx := shardIndex(e.Key, len(d.workers))
	d.workers[idx].in <- e
}

// Stop closes all worker input channels to trigger graceful termination.
func (d *Dispatcher) Stop() {
	for _, w := range d.workers {
		close(w.in)
	}
}

// shardIndex computes a stable shard id from the key. Swap for consistent hashing if needed.
func shardIndex(key string, n int) int {
	h := sha1.Sum([]byte(key))
	return int((uint32(h[0])<<24 | uint32(h[1])<<16 | uint32(h[2])<<8 | uint32(h[3])) % uint32(n))
}
//
// Ingestion (simulated source) ------------------------------------------------
// startIngestion emits ~50k events/s into a buffered channel to emulate non-blocking socket ingestion.
// Close(ingest) signals end-of-stream.
func startIngestion(ctx context.Context, ingest chan<- Event) {
	go func() {
		t := time.NewTicker(20 * time.Microsecond) // ~50k ev/s
		defer t.Stop()
		i := 0
		for {
			select {
			case <-ctx.Done():
				close(ingest)
				return
			case <-t.C:
				key := fmt.Sprintf("user-%d", i%1000) // 1k distinct keys (burst-friendly)
				ingest <- Event{Key: key, TS: time.Now().UnixMilli()}
				i++
			}
		}
	}()
}
//
// Main ------------------------------------------------------------------------
// main wires ingestion → dispatcher → workers and performs graceful shutdown.
// This is an example implementation; in production, plug in Kafka/Redis instead of NopSink.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Durable sink: example only; in production map to Kafka compacted topic or Redis.
	sink := NopSink{}

	// Fixed number of shards/workers. For stability when resizing, consider consistent hashing.
	const workers = 8
	dispatch := NewDispatcher(workers, sink)
	var wg sync.WaitGroup
	dispatch.Start(ctx, &wg)

	// Buffered ingestion channel absorbs bursts and keeps handoff latency low.
	ingest := make(chan Event, 4096)
	startIngestion(ctx, ingest)

	// Fan-out: route events by key; close workers when source ends.
	go func() {
		for e := range ingest {
			dispatch.Route(e)
		}
		dispatch.Stop()
	}()

	// Block until termination signal, then wait for workers to flush and exit.
	<-ctx.Done()
	wg.Wait()
}
Limitations and Failure Modes
Golang’s lightweight streaming model faces constraints that limit its use in complex or distributed scenarios, requiring careful handling of specific failure modes.
Processing and Concurrency Limits: Go lacks native event-time processing or watermarking, essential for managing out-of-order events. Developers must build custom timestamp logic, increasing complexity and error risk. Goroutine leaks arise from unclosed channels or unterminated goroutines, exhausting memory under sustained loads (e.g., 50k events/s). Garbage collection introduces latency spikes, typically under 1 ms but higher in memory-intensive streams, disrupting sub-millisecond processing. File descriptor exhaustion risks I/O failures when high-throughput socket ingestion exceeds system limits without caps.
Recovery Challenges: Process restarts lose in-memory state, requiring external systems for recovery, with custom offset tracking prone to errors or duplicates.
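One common way to reduce duplicate risk during recovery is to store the input offset inside the same snapshot as the state, so a replay can skip what the snapshot already covers. The following is a minimal sketch under that assumption; the Snapshot type and its fields are hypothetical, and JSON stands in for whatever encoding the external store uses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Snapshot pairs shard state with the offset of the last event folded into it.
type Snapshot struct {
	Offset int64            `json:"offset"`
	Counts map[string]int64 `json:"counts"`
}

// Apply folds one event into the snapshot unless the snapshot already covers it.
func (s *Snapshot) Apply(offset int64, key string) {
	if offset <= s.Offset {
		return // already reflected in the restored state
	}
	s.Counts[key]++
	s.Offset = offset
}

func main() {
	s := Snapshot{Offset: -1, Counts: map[string]int64{}}
	s.Apply(0, "a")
	s.Apply(1, "a")

	saved, err := json.Marshal(s) // state and offset are persisted as one unit
	if err != nil {
		panic(err)
	}

	// Simulated restart: restore, then replay the log from the beginning.
	var restored Snapshot
	if err := json.Unmarshal(saved, &restored); err != nil {
		panic(err)
	}
	for off, key := range []string{"a", "a", "b"} {
		restored.Apply(int64(off), key) // offsets 0 and 1 are skipped, 2 is applied
	}
	fmt.Println(restored.Offset, restored.Counts["a"], restored.Counts["b"]) // 2 2 1
}
```

The scheme depends on state and offset being written atomically; storing them in separate writes reintroduces the window for duplicates or loss.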
When to Transition:
Go suits low-latency, single-node tasks but struggles with distributed needs:
- Flink: Handles event-time watermarking and fault-tolerant checkpoints.
- Spark: Supports complex analytics and batch integration for large-scale data.
These limitations push complex, distributed streaming to Flink or Spark, while Go excels in lightweight pipelines.
Flink as a Streaming Engine
Apache Flink is a distributed dataflow engine optimized for continuous processing of infinite event streams, delivering stateful results with ~20–150 ms (p50) and 50–400 ms (p95) end-to-end latency in scalable pipelines. Unlike batch systems that process finite datasets, Flink handles unbounded streams, maintaining consistent state for aggregations, joins, or pattern detection across distributed nodes.
Core Design
Flink’s streaming engine executes programs as directed acyclic graphs (DAGs) of long-lived operators, designed for continuous event processing. Each operator, a user-defined function (e.g., map, filter, window), processes events in a streaming pipeline, supporting stateful (e.g., aggregations) or stateless operations. Key design principles include:
- Operator Execution: Operators run continuously, consuming events from input streams and emitting results, with parallel instances handling partitioned data for scalability (e.g., millions of events/s).
- Dataflow Mechanics: Events flow through operators via serialized streams, using in-memory buffers for low-latency transfers (sub-10 ms) and TCP for cross-node communication.
- Parallelism: Configurable parallelism (e.g., parallelism.default) splits operators into subtasks, enabling high-throughput processing across distributed resources.
- State Integration: Stateful operators manage keyed or non-keyed state (e.g., window aggregates), with (sub-)millisecond to few-millisecond access latency, backed by pluggable storage like RocksDB.
These principles enable Flink to process unbounded streams with high throughput and low latency, distinct from its architectural components that manage distributed execution.
Streaming Capabilities
Flink supports event-time semantics, using watermarks to manage out-of-order events, ensuring accurate time-based operations like windowed aggregations. It provides exactly-once state guarantees through coordinated checkpoints that persist both state and source offsets to durable storage. End-to-end exactly-once semantics with Kafka are achieved only when checkpoints are enabled and a two-phase commit sink (such as Kafka with transactions) is used; without such a sink, results delivered downstream are at-least-once. Built-in state management handles large-scale keyed state, scaling to millions of keys with low-latency access (sub-millisecond to a few milliseconds, depending on the state backend). Flink’s ability to process streams at scale, with fault tolerance and precise time handling, makes it ideal for distributed, stateful streaming workloads.
Architecture
Apache Flink’s architecture enables distributed, fault-tolerant stream processing at scale, coordinating computation across clusters to handle unbounded event streams with end-to-end latency in the tens to hundreds of milliseconds (roughly 20–150 ms at p50). It decouples control plane responsibilities from data plane execution, optimizing for throughput (millions of events/s) while minimizing synchronization overhead through asynchronous state persistence and dynamic resource provisioning.
Cluster Components
Flink’s core revolves around a master-worker model, with components interacting via RPC for control signals and TCP for data exchanges. This design trades off centralized coordination (via JobManager) for decentralized execution (via TaskManagers), enabling linear scalability but requiring careful HA configurations to avoid single points of failure.
- JobManager: Acts as the master orchestrator, comprising three tightly coupled subcomponents (ResourceManager, Dispatcher, and JobMaster) that handle distinct phases of the job lifecycle.
- ResourceManager: Provisions task slots dynamically (e.g., via YARN or Kubernetes APIs), monitoring availability and reallocating on failures; in standalone mode, it only distributes the slots of already-running TaskManagers and cannot start new ones.
- Dispatcher: Exposes a REST endpoint for job submissions (used by flink run), spawns a JobMaster per JobGraph, and hosts the WebUI for metrics like checkpoint duration (exposed via /jobs/:jobid/checkpoints).
- JobMaster: Turns the JobGraph into an execution DAG, schedules subtasks based on data locality (prioritizing co-location to reduce shuffle costs), and exchanges heartbeats with TaskManagers to detect failures (heartbeat.interval defaults to 10 s and heartbeat.timeout to 50 s).
- Interactions: JobMaster communicates with TaskManagers via RPC for task deployment and status updates. In HA mode, Flink uses ZooKeeper or Kubernetes for leader election (depending on configuration), and a standby JobManager takes over by recovering job metadata from the HA store and resuming from the latest checkpoint.
Trade-off: Centralized scheduling simplifies global optimization but introduces contention under high submission rates. Best practice: use application-mode clusters so that each job gets its own JobManager and ResourceManager.
- TaskManagers: Worker JVM processes that host subtasks, buffering incoming streams (e.g., 1–4 GB managed memory per node) and facilitating inter-task exchanges via Netty-based TCP channels. Each TaskManager registers with the ResourceManager on startup, offering its slots and reporting metrics like input/output rates. Subtasks run in dedicated threads, with the runtime multiplexing network I/O to handle bursts without blocking.
- Interactions: TaskManagers pull task artifacts from the JobManager, exchange data with peers during shuffles (e.g., hash-partitioned routing), and acknowledge checkpoints upstream.
Engineering note: In session clusters, multiple jobs share TaskManagers, risking network contention during concurrent submissions; application clusters dedicate resources per job for isolation.
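Best practice: Managed memory is off-heap and is sized with taskmanager.memory.managed.size or taskmanager.memory.managed.fraction (default 0.4 of total Flink memory), not as a share of the JVM heap. Raising it for RocksDB-backed state keeps that state out of the garbage-collected heap, which helps avoid GC pauses >50 ms in high-throughput scenarios.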
- Task Slots: The granular unit of resource isolation, each slot allocates a fixed fraction of TaskManager resources (e.g., 1/3 memory for 3 slots), enforcing managed memory quotas but not CPU isolation (relying on OS scheduling). Configured via taskmanager.numberOfTaskSlots (default: 1), slots determine concurrent subtask capacity — fewer slots enhance isolation (e.g., one per container in Kubernetes) at the cost of utilization.
- Interactions: ResourceManager assigns slots to JobMasters based on requests; slots host chained operators, sharing JVM structures like connection pools to amortize TCP setup costs. Performance implication: Slot sharing within jobs (default) matches total slots to max parallelism, preventing over-provisioning — e.g., a pipeline with varying parallelism (2 for sources, 6 for windows) utilizes resources efficiently by co-locating light subtasks.
Trade-off: Sharing can boost throughput (by roughly 20–30% in some workloads) via multiplexing but risks noisy-neighbor interference. Best practice: set slots equal to CPU cores for balanced loads, and monitor free capacity via the taskSlotsAvailable metric.
- Operator Chains: Flink fuses compatible sequential operators (e.g., map → filter) into single-threaded tasks, eliminating inter-thread serialization and buffering for sub-10 ms handoffs. Chaining is automatic for one-to-one streams but can be configured more precisely via methods such as disableChaining() or startNewChain() on specific operators, or by using slot-sharing groups for isolation.
- Interactions: JobMaster decides chains during DAG optimization, grouping based on locality; chained tasks occupy one slot, reducing thread overhead.
Advanced feature: Slot sharing groups or resource profiles allow explicit control of task isolation (for example, isolating CPU-intensive operators).
Implication: Operator chaining often improves throughput in linear pipelines but can complicate debugging; it is generally recommended to disable chaining for heavy stateful operators to isolate contention and improve observability.
Event Routing and Key-Groups
Flink routes events via key-groups, the atomic unit for state partitioning and redistribution, ensuring locality between streams and keyed state to avoid cross-node transactions. The number of key-groups equals the operator’s maxParallelism (if unset, Flink derives it from the parallelism with a lower bound of 128; it is configurable per operator or job). Each key is hashed into a fixed bucket (conceptually hash(key) % maxParallelism), and each parallel subtask owns a contiguous range of key-groups.
Routing: Upstream operators emit to downstream via hash-partitioned shuffles, directing same-key events to the same subtask for ordered updates. Rescaling means restarting the job from a checkpoint or savepoint with a different parallelism (e.g., env.setParallelism(16)); Flink then reassigns whole key-groups to the new subtasks, so keyed state moves together with its keys.
Performance: Adds <5 ms alignment latency per checkpoint but enables scaling to millions of keys. maxParallelism can be raised up to 32,768; the default is not a hard cap, and a larger value for large key spaces provides better load balance and future rescaling headroom, while RocksDB compaction helps mitigate I/O spikes during redistribution.
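The two-step mapping from key to key-group to subtask can be sketched as plain arithmetic. The sketch is written in Go for consistency with the earlier examples and is not Flink code: FNV-1a stands in for Flink’s own murmur-based hash, and the function names are assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyGroup maps a key to one of maxParallelism key-groups.
// FNV-1a is a stand-in here; Flink applies its own murmur-based hash.
func keyGroup(key string, maxParallelism int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(maxParallelism))
}

// subtaskFor maps a key-group to the subtask that owns it, giving each
// subtask a contiguous range of key-groups.
func subtaskFor(group, maxParallelism, parallelism int) int {
	return group * parallelism / maxParallelism
}

func main() {
	const maxParallelism = 128
	for _, key := range []string{"user-1", "user-2", "user-3"} {
		g := keyGroup(key, maxParallelism)
		// The key-group of a key never changes; only its owning subtask
		// changes when the job is rescaled from 4 to 8 subtasks.
		fmt.Printf("%s: group=%d subtask@4=%d subtask@8=%d\n",
			key, g, subtaskFor(g, maxParallelism, 4), subtaskFor(g, maxParallelism, 8))
	}
}
```

Because keys are bound to key-groups rather than directly to subtasks, rescaling only has to move whole groups, which is why maxParallelism bounds how far a job can later scale out.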
Checkpointing Mechanism
Checkpoint barriers — lightweight markers (a few bytes) — are injected by the sources when instructed by the checkpoint coordinator on the JobManager, typically every 10–100 s (execution.checkpointing.interval), and then flow downstream embedded in the stream to trigger distributed snapshots without pausing processing.
Flow: Barriers propagate FIFO through operators, aligning parallels at boundaries (buffering records if needed); on arrival, operators invoke snapshotState() for sync-phase capture (e.g., serializing keyed maps), forwarding the barrier while async backends persist to storage (HDFS/S3).
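Barrier alignment on a multi-input operator can be illustrated with a small single-threaded model. This is a simplified sketch in Go, not Flink’s implementation: the aligner type, its running-sum state, and the message struct are all hypothetical, and persistence is reduced to appending the state to a slice.

```go
package main

import "fmt"

// message is either a data record or a checkpoint barrier on one input channel.
type message struct {
	input   int
	barrier bool
	value   int64
}

// aligner models one operator subtask with several inputs and a running sum as state.
type aligner struct {
	inputs    int
	blocked   map[int]bool // inputs whose barrier has already arrived
	buffered  []message    // messages held back from blocked inputs
	state     int64
	snapshots []int64
}

func newAligner(inputs int) *aligner {
	return &aligner{inputs: inputs, blocked: make(map[int]bool)}
}

// handle processes one message, buffering anything that arrives on an
// input that is already waiting for the other inputs' barriers.
func (a *aligner) handle(m message) {
	switch {
	case a.blocked[m.input]:
		a.buffered = append(a.buffered, m)
	case m.barrier:
		a.blocked[m.input] = true
		if len(a.blocked) == a.inputs {
			// All barriers arrived: the snapshot reflects only pre-barrier records.
			a.snapshots = append(a.snapshots, a.state)
			pending := a.buffered
			a.buffered = nil
			a.blocked = make(map[int]bool)
			for _, p := range pending {
				a.handle(p) // release records that were held back
			}
		}
	default:
		a.state += m.value
	}
}

func main() {
	a := newAligner(2)
	a.handle(message{input: 0, value: 1})
	a.handle(message{input: 1, value: 2})
	a.handle(message{input: 0, barrier: true})
	a.handle(message{input: 0, value: 10}) // post-barrier record on input 0: buffered
	a.handle(message{input: 1, value: 4})  // pre-barrier record on input 1: applied
	a.handle(message{input: 1, barrier: true})
	fmt.Println(a.snapshots, a.state) // [7] 17
}
```

The buffered record is what costs latency under backpressure in aligned mode: it cannot be processed until the slowest input delivers its barrier.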
State backends such as RocksDB upload only incremental diffs when state.backend.incremental: true is enabled. Sources such as Kafka store their read offsets in the checkpoint, while transactional sinks use a two-phase commit tied to checkpoint completion to maintain exactly-once guarantees. Completion: Once every task, including the sinks, has acknowledged its snapshot, the checkpoint coordinator on the JobManager marks the checkpoint complete; the number of failed checkpoints tolerated before the job fails is set with execution.checkpointing.tolerable-failed-checkpoints (e.g., 3).
Advanced: Unaligned checkpoints (enableUnalignedCheckpoints()) allow barriers to overtake buffers under backpressure, capturing in-flight data so that checkpoint duration stays stable under heavy load. The cost is larger snapshots, which makes them most suitable for pipelines that experience prolonged backpressure. Configs: setMinPauseBetweenCheckpoints(500) (in milliseconds) prevents back-to-back checkpoints; setMaxConcurrentCheckpoints(1) limits concurrency.
Implication: Aligned mode trades <10 ms latency spikes for minimal state; unaligned reduces tail latency by 50–80% in asymmetric pipelines.
Best practice: Use incremental checkpoints for large states, and monitor alignment time via the UI to tune intervals for less than 1% throughput loss. Checkpoint file merging is available only in recent Flink releases, so check the option against the documentation of the deployed version before enabling it.
This architecture’s elegance lies in its barrier protocol (inspired by Chandy-Lamport), balancing consistency with asynchrony for sub-second recovery times, outperforming rigid batch models in continuous workloads.
Execution and Time Semantics
Apache Flink’s execution model drives continuous stream processing with precise temporal alignment, handling unbounded, out-of-order events in distributed pipelines. It leverages event-time, watermarks, timers, and state partitioning to ensure deterministic, scalable computations.
Event-time processing anchors computations to event timestamps, not processing time, enabling consistent windowed operations despite network delays. The assignTimestampsAndWatermarks API extracts timestamps from events (via TimestampAssigner) and assigns watermarks to track time progress, ensuring accurate aggregations (e.g., 5-second windows).
Watermarks are markers that track event-time progress, signaling when all events up to a timestamp have arrived. Generated by sources or assigners like BoundedOutOfOrdernessWatermarks (e.g., with a 2-second delay), they trigger operations like window closures. Watermarks flow FIFO through the DAG, with parallel subtasks aligning on the earliest watermark from input channels to ensure consistent computation, tuned via maxOutOfOrderness for delay tolerance.
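The two rules in this paragraph (a watermark that trails the highest seen timestamp by a fixed bound, and an operator watermark equal to the minimum over its inputs) can be sketched as follows. The sketch is in Go for consistency with the earlier examples and is not Flink code; inputWatermark and combined are hypothetical names, and timestamps are assumed to be non-negative milliseconds.

```go
package main

import (
	"fmt"
	"math"
)

// inputWatermark tracks event-time progress on one input channel.
type inputWatermark struct {
	maxTS   int64 // highest event timestamp observed so far (ms)
	boundMs int64 // tolerated out-of-orderness (ms)
}

// observe records an event timestamp; late events never move time backwards.
func (w *inputWatermark) observe(ts int64) {
	if ts > w.maxTS {
		w.maxTS = ts
	}
}

// current returns the watermark: no event older than this is expected anymore.
func (w *inputWatermark) current() int64 {
	return w.maxTS - w.boundMs
}

// combined returns the operator watermark, the minimum across all inputs.
// With no inputs it returns math.MaxInt64.
func combined(inputs []*inputWatermark) int64 {
	lowest := int64(math.MaxInt64)
	for _, in := range inputs {
		if wm := in.current(); wm < lowest {
			lowest = wm
		}
	}
	return lowest
}

func main() {
	in0 := &inputWatermark{boundMs: 2000}
	in1 := &inputWatermark{boundMs: 2000}
	in0.observe(10000)
	in0.observe(9000) // out of order: does not move the watermark backwards
	in1.observe(7000)

	fmt.Println(in0.current(), in1.current())          // 8000 5000
	fmt.Println(combined([]*inputWatermark{in0, in1})) // 5000
}
```

Here a window ending at 5000 ms may close, while one ending at 6000 ms must wait for the slower input, which is also why a single idle input can stall event time for the whole operator.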
Timers, managed via KeyedProcessFunction, schedule time-driven callbacks (e.g., window triggers) stored in per-key priority queues. Event-time timers fire when watermarks surpass their timestamp, persisted during checkpoints for fault tolerance, enabling dynamic operations like session gap detection.
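The timer mechanism can be approximated with a min-heap ordered by timestamp. This is a simplified Go sketch rather than Flink’s timer service: the timer, schedule, and advance names are hypothetical, and persistence and per-key deduplication are omitted.

```go
package main

import (
	"container/heap"
	"fmt"
)

// timer is a pending event-time callback for one key.
type timer struct {
	key string
	ts  int64
}

// timerHeap is a min-heap of timers ordered by timestamp.
type timerHeap []timer

func (h timerHeap) Len() int            { return len(h) }
func (h timerHeap) Less(i, j int) bool  { return h[i].ts < h[j].ts }
func (h timerHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *timerHeap) Push(x interface{}) { *h = append(*h, x.(timer)) }
func (h *timerHeap) Pop() interface{} {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

// schedule registers a timer for the given key and timestamp.
func schedule(h *timerHeap, key string, ts int64) {
	heap.Push(h, timer{key: key, ts: ts})
}

// advance removes and returns, in timestamp order, every timer whose
// timestamp is at or below the new watermark.
func advance(h *timerHeap, watermark int64) []timer {
	var fired []timer
	for h.Len() > 0 && (*h)[0].ts <= watermark {
		fired = append(fired, heap.Pop(h).(timer))
	}
	return fired
}

func main() {
	h := &timerHeap{}
	schedule(h, "a", 3000)
	schedule(h, "b", 1000)
	schedule(h, "c", 5000)

	for _, t := range advance(h, 3000) {
		fmt.Println(t.key, t.ts) // prints "b 1000" then "a 3000"
	}
	fmt.Println("pending:", h.Len()) // pending: 1
}
```

Because timers fire only when the watermark advances, their latency is bounded by the watermark interval and the out-of-orderness allowance, not by wall-clock time.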
State is managed as:
- Keyed State: Partitioned by key-groups (hash(key) % maxParallelism), supporting ValueState, ListState, or MapState for per-key aggregates, stored in memory or RocksDB.
- Operator State: Evenly distributed across subtasks for non-keyed data (e.g., broadcast variables), accessed via CheckpointedFunction.
This model ensures fault-tolerant stream processing, scaling to millions of keys with precise temporal control.
Example: Detecting Anomalous Event Sequences (CEP Pattern in Flink)
CEP (Complex Event Processing) is an approach where a system analyzes not individual events, but sequences of events over time. When events are observed in isolation, their context is often unclear, but by combining them into a temporal pattern (for example: “A happened, then B, then C within five seconds”), it becomes possible to detect correlations, anomalies, or complex behavioral dependencies.
Apache Flink includes a built-in CEP module that allows such patterns to be described declaratively and processed in real time, even when events arrive late or out of order.
Detection Logic
- Events are analyzed by user key and event time.
- The system identifies a sequence of multiple “order creation” actions followed by the cancellation of most of those orders within a short interval.
- When the pattern is detected, Flink emits an event indicating anomalous behavior.
CEP (Complex Event Processing) in Flink lets you declare temporal patterns over event streams.
Key ideas:
- Works per key (e.g., per accountId) and in event time (watermarks handle out-of-order events).
- Under the hood, CEP builds an NFA (non-deterministic finite automaton) that keeps partial matches in state and advances them as new events arrive.
- Each active partial match is called a branch — a lightweight copy of the automaton that represents one possible continuation of the pattern.
- Several branches may exist simultaneously if multiple events can start or extend the pattern.
- Timeouts (within) automatically remove expired branches.
- When all thresholds are satisfied within their time constraints, CEP emits a match.
Quantitative thresholds
CEP transitions between states based on explicit numeric conditions that you define:
- .times(N) → exactly N events
- .timesOrMore(N) → at least N events (branch keeps extending)
- .oneOrMore → same as .timesOrMore(1)
- .optional → zero or one event allowed
Each threshold defines a transition condition in the automaton. Together, they form a chain of user-defined thresholds — a declarative sequence of “if this condition is met, move to the next state”.
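The idea of threshold-driven transitions can be shown with a deliberately reduced state machine. The Go sketch below tracks a single partial match for one key, for a pattern of the form "N CREATED, then M CANCELLED, all within a time bound"; it ignores unrelated events and does not model the multiple branches or contiguity rules of Flink’s NFA. The matcher type and its fields are hypothetical.

```go
package main

import "fmt"

// matcher tracks one partial match of "N CREATED then M CANCELLED within windowMs".
type matcher struct {
	needCreates, needCancels int
	windowMs                 int64
	startTS                  int64
	creates, cancels         int
}

// reset discards the current partial match.
func (m *matcher) reset() {
	m.creates, m.cancels, m.startTS = 0, 0, 0
}

// feed advances the automaton by one event and reports whether the pattern completed.
func (m *matcher) feed(eventType string, ts int64) bool {
	// Expire a partial match that has outlived the time constraint.
	if m.creates > 0 && ts-m.startTS > m.windowMs {
		m.reset()
	}
	switch {
	case eventType == "CREATED" && m.creates < m.needCreates:
		if m.creates == 0 {
			m.startTS = ts // the first event of the match starts the clock
		}
		m.creates++
	case eventType == "CANCELLED" && m.creates == m.needCreates:
		m.cancels++
		if m.cancels == m.needCancels {
			m.reset()
			return true
		}
	}
	return false
}

func main() {
	m := &matcher{needCreates: 3, needCancels: 2, windowMs: 1000}
	events := []struct {
		kind string
		ts   int64
	}{
		{"CREATED", 0}, {"CREATED", 100}, {"CREATED", 200},
		{"CANCELLED", 300}, {"CANCELLED", 400},
	}
	for _, e := range events {
		if m.feed(e.kind, e.ts) {
			fmt.Println("match at", e.ts) // match at 400
		}
	}
}
```

A real CEP engine keeps several such partial matches alive at once, because any CREATED event could be the start of a different match; that is the role of the branches described above.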
//Example 1: 10 CREATED + minimum 5 CANCELLED, close by time limit
Pattern
  .begin[OrderEvent]("creates")
  .where(_.eventType == "CREATED")
  .times(10) // exactly 10 CREATED to start the cancel stage
  .within(Time.seconds(2)) // intended bound for the CREATED burst; see the note on the last line
  .next("cancels") // strict contiguity: the first CANCELLED must directly follow the last CREATED
  .where(_.eventType == "CANCELLED")
  .timesOrMore(5) // at least 5 CANCELLED
  .greedy // keep collecting until the time limit closes the stage
  .within(Time.seconds(1)) // note: a pattern sequence has one time constraint; with two within() calls the smaller (1 s) bounds the whole match
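//Example 2: 10 CREATED + exactly 8 CANCELLED (80%), emit on completion
Pattern
  .begin[OrderEvent]("creates")
  .where(_.eventType == "CREATED")
  .times(10) // exactly 10 CREATED
  .within(Time.seconds(2)) // superseded by the smaller bound on the last line
  .next("cancels") // strict contiguity: the first CANCELLED must directly follow the last CREATED
  .where(_.eventType == "CANCELLED")
  .times(8) // exactly 8 CANCELLED
  .within(Time.seconds(1)) // the smallest within() applies to the whole sequence: all 18 events within ≤ 1 s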
Task Overview
The goal is to detect atypical sequences of actions in a stream of trading events.
Anomalous behavior in this context refers to situations where a market participant rapidly places a series of orders at different price levels and then quickly cancels most of them (over 80–90%).
Such activity can temporarily create the illusion of increased demand or liquidity without leading to actual trades, and may indicate system malfunctions or incorrect algorithmic behavior.
// AnomalousSequenceCepKafka.scala
// Scala example: Kafka -> CEP -> Kafka
// Pattern: burst of CREATED at multiple price levels -> quick mass CANCELLED (>=80%)
// Notes:
// - Uses event-time with bounded out-of-orderness watermarks
// - CEP pattern uses followedBy + skipPastLastEvent to tolerate noise and avoid duplicate matches
// - Keyed by (accountId, symbol) for business-accurate grouping
// - Kafka sink configured with exactly-once delivery guarantee (when checkpoints are enabled)
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.connector.base.DeliveryGuarantee
import java.time.Duration
// Jackson with the Scala module (requires the jackson-module-scala dependency;
// Flink's shaded Jackson does not bundle DefaultScalaModule)
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
// --- Domain model -----------------------------------------------------------
final case class OrderEvent(
accountId: String,
symbol: String,
eventType: String, // "CREATED" | "CANCELLED"
price: Double,
qty: Int,
eventTimeMs: Long
)
final case class AnomalyAlert(
account_id: String,
symbol: String,
window_start_ms: Long,
window_end_ms: Long,
created: Int,
canceled: Int,
cancel_ratio: Double,
unique_price_levels: Int,
created_price_levels: Seq[Long],
note: String
)
object AnomalousSequenceCepKafka {
// --- Tunable config (via -D... system properties) ------------------------
val MinCreated = sys.props.getOrElse("min.created", "10").toInt // >= 10 CREATED
val MinCancels = sys.props.getOrElse("min.cancels", "8").toInt // >= 8 CANCELLED (lower bound)
val CancelRatioThreshold = sys.props.getOrElse("cancel.ratio", "0.80").toDouble // post-filter: >= 80%
val UniquePricesMin = sys.props.getOrElse("min.price.levels", "3").toInt // >= 3 distinct price levels (on CREATED)
val T1CreatesSeconds = sys.props.getOrElse("t1.creates.sec", "2").toInt // burst window for creates
val T2CancelsSeconds = sys.props.getOrElse("t2.cancels.sec", "1").toInt // cancel window after burst
val OutOfOrderSeconds = sys.props.getOrElse("watermark.lateness.sec", "3").toInt
val TickSize = sys.props.getOrElse("tick.size", "0.01").toDouble // price normalization
// Normalize price to a tick level to avoid FP artifacts
def normPriceToLevel(p: Double): Long = Math.round(p / TickSize)
// Jackson mapper (register Scala module for case classes)
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
// Parse a JSON line into OrderEvent; return None for malformed inputs
private def parseEvent(json: String): Option[OrderEvent] =
try {
val n: JsonNode = mapper.readTree(json)
Some(OrderEvent(
accountId = n.get("accountId").asText(),
symbol = n.get("symbol").asText(),
eventType = n.get("eventType").asText(),
price = n.get("price").asDouble(),
qty = n.get("qty").asInt(),
eventTimeMs = n.get("eventTimeMs").asLong()
))
} catch { case scala.util.control.NonFatal(_) => None } // covers bad JSON and missing fields; fatal errors still propagate
def main(args: Array[String]): Unit = {
// --- Runtime params (Kafka, topics, group) -----------------------------
val BOOTSTRAP = sys.props.getOrElse("brokers", "localhost:9092")
val IN_TOPIC = sys.props.getOrElse("in.topic", "orders-events")
val OUT_TOPIC = sys.props.getOrElse("out.topic", "orders-anomalies")
val GROUP_ID = sys.props.getOrElse("group.id", "cep-demo")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// NOTE: In production, align parallelism with Kafka partitions and deployment sizing.
env.setParallelism(1)
// Checkpointing is required for the exactly-once Kafka sink: transactions commit when a checkpoint completes
env.enableCheckpointing(10000L, org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE)
// --- Kafka source: JSON lines with eventTimeMs -------------------------
val source = KafkaSource.builder[String]()
.setBootstrapServers(BOOTSTRAP)
.setTopics(IN_TOPIC)
.setGroupId(GROUP_ID)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
val raw: DataStream[String] =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-orders")
// --- Parse JSON -> OrderEvent; drop malformed --------------------------
val events: DataStream[OrderEvent] =
raw.flatMap(json => parseEvent(json).toList).name("parse-json")
// --- Event-time watermarks ---------------------------------------------
val wms = WatermarkStrategy
.forBoundedOutOfOrderness[OrderEvent](Duration.ofSeconds(OutOfOrderSeconds))
.withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {
override def extractTimestamp(e: OrderEvent, recordTs: Long): Long = e.eventTimeMs
})
// --- Business key: (accountId, symbol) ---------------------------------
val keyed: KeyedStream[OrderEvent, (String, String)] = events
.assignTimestampsAndWatermarks(wms)
.keyBy(e => (e.accountId, e.symbol))
// --- CEP pattern -------------------------------------------------------
// Stage 1 "creates": >= MinCreated CREATED (burst)
// Stage 2 "cancels": >= MinCancels CANCELLED (after creates)
// Flink CEP applies one time constraint per pattern sequence (the smallest within() wins),
// so the burst and cancel budgets are combined into a single window of T1 + T2 seconds.
// Use followedBy + skipPastLastEvent to allow noise and reduce duplicates
val skip = AfterMatchSkipStrategy.skipPastLastEvent()
val pattern: Pattern[OrderEvent, OrderEvent] =
Pattern
.begin[OrderEvent]("creates", skip).where(_.eventType == "CREATED")
.timesOrMore(MinCreated)
.followedBy("cancels").where(_.eventType == "CANCELLED")
.timesOrMore(MinCancels)
.greedy
.within(Time.seconds(T1CreatesSeconds + T2CancelsSeconds))
val matches = CEP.pattern(keyed, pattern)
// --- Post-filter & alert serialization ---------------------------------
val alerts: DataStream[String] = matches.select { m =>
val creates = m.getOrElse("creates", List.empty).toList
val cancels = m.getOrElse("cancels", List.empty).toList
if (creates.isEmpty) {
null // no alert
} else {
val created = creates.size
val canceled = cancels.size
// Ratio capped by created count to avoid >1.0 when more cancels arrive than creates
val ratio = if (created == 0) 0.0 else math.min(canceled, created).toDouble / created.toDouble
val createdLevels = creates.map(ev => normPriceToLevel(ev.price))
val uniquePriceLevels = createdLevels.distinct.size
val pass =
created >= MinCreated &&
canceled >= MinCancels &&
ratio >= CancelRatioThreshold &&
uniquePriceLevels >= UniquePricesMin
if (pass) {
val acc = creates.head.accountId
val sym = creates.head.symbol
val start = creates.map(_.eventTimeMs).min
val end = (creates ++ cancels).map(_.eventTimeMs).max
val alert = AnomalyAlert(
account_id = acc,
symbol = sym,
window_start_ms = start,
window_end_ms = end,
created = created,
canceled = canceled,
cancel_ratio = BigDecimal(ratio).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble,
unique_price_levels = uniquePriceLevels,
created_price_levels = createdLevels.distinct.sorted,
note = "burst creates at multiple prices -> mass cancel"
)
mapper.writeValueAsString(alert)
} else {
null
}
}
}.filter(_ != null).name("anomaly-select")
// --- Kafka sink for anomalies (exactly-once) ---------------------------
val sink = KafkaSink.builder[String]()
.setBootstrapServers(BOOTSTRAP)
.setRecordSerializer(
KafkaRecordSerializationSchema.builder[String]()
.setTopic(OUT_TOPIC)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("cep-anoms-") // EXACTLY_ONCE needs a transactional ID prefix (unique per job) and enabled checkpointing
.build()
alerts.sinkTo(sink).name("anomalies-to-kafka")
env.execute("CEP Anomalous Sequence Detection (Kafka, followedBy + ratio)")
}
}
RocksDB in Context of Flink
In Apache Flink, RocksDB serves as a key-value state backend that enables large-scale, fault-tolerant stream processing. It is not a distributed database: Flink embeds RocksDB via JNI (Java Native Interface) inside each TaskManager, with state kept on the local filesystem and durability provided by Flink’s checkpointing. This keeps state off the JVM heap, avoids GC pressure, and scales to very large keyed state, at the cost of serialization and possible disk access on each state read or write.
Internal Architecture: LSM Tree
RocksDB is built on a Log-Structured Merge Tree (LSM Tree), turning random writes into sequential I/O. Each update passes through:
- WAL (Write-Ahead Log): durability before apply. Flink disables RocksDB’s WAL and relies on its own checkpoints for durability.
- MemTable: in-memory buffer (e.g., skip-list).
- SSTables: immutable sorted files flushed from MemTables and merged by compaction to bound read amplification.
This structure allows Flink to persist keyed state efficiently with ordered inserts and predictable disk behavior.
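The write and read paths of an LSM tree can be sketched in a few lines of Scala. This is a toy model, not RocksDB: `ToyLsm` and `Store` are hypothetical names, the WAL is omitted, and "SSTables" are just immutable sorted maps held in memory.

```scala
import scala.collection.immutable.TreeMap

object ToyLsm {
  /** Minimal LSM model: a sorted memtable plus immutable sorted runs (newest first). */
  final class Store(memtableLimit: Int) {
    private var memtable = TreeMap.empty[String, String]
    private var sstables = List.empty[TreeMap[String, String]] // head = newest run

    def put(k: String, v: String): Unit = {
      memtable = memtable.updated(k, v)
      if (memtable.size >= memtableLimit) flush()
    }

    /** Freeze the memtable into an immutable sorted run. */
    def flush(): Unit =
      if (memtable.nonEmpty) {
        sstables = memtable :: sstables
        memtable = TreeMap.empty[String, String]
      }

    /** Read path: memtable first, then runs from newest to oldest. */
    def get(k: String): Option[String] =
      memtable.get(k).orElse(sstables.collectFirst { case run if run.contains(k) => run(k) })

    /** Compaction: merge all runs into one; values from newer runs win. */
    def compact(): Unit =
      if (sstables.size > 1) {
        val merged = sstables.reverse.foldLeft(TreeMap.empty[String, String]) { (acc, run) =>
          run.foldLeft(acc) { case (m, (k, v)) => m.updated(k, v) }
        }
        sstables = List(merged)
      }

    def runCount: Int = sstables.size
  }
}
```

Every additional run adds one more lookup to a read that misses the memtable, which is the read amplification that compaction bounds.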
State Representation in Flink
Each keyed state entry is serialized into bytes and stored as a key–value pair in RocksDB: the RocksDB key combines the key group, the user key, and the namespace (for example, a window), and the RocksDB value holds the serialized state. Compact binary layouts shrink disk I/O and checkpoint size; the key-group prefix keeps entries of one key group adjacent, which improves iteration locality and range scans; separate column families per logical state isolate workloads and increase checkpoint concurrency. RocksDB instances are local per subtask; scalability comes from key-group partitioning, not from RocksDB distribution.
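A composite key of this kind could be laid out as follows. This is a minimal sketch with assumed details: `StateKeyLayout` is a hypothetical helper, the length-prefixed encoding is illustrative rather than Flink's actual serializer format, and `hashCode` stands in for the murmur-based hash Flink uses for key-group assignment.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

object StateKeyLayout {
  /** Illustrative key-group assignment: a stable hash of the key modulo maxParallelism. */
  def keyGroup(key: String, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  /** Serialize (keyGroup, key, namespace) into one byte array:
    * a 2-byte key-group prefix, then the length-prefixed key and namespace. */
  def encode(key: String, namespace: String, maxParallelism: Int): Array[Byte] = {
    val k  = key.getBytes(UTF_8)
    val ns = namespace.getBytes(UTF_8)
    val buf = ByteBuffer.allocate(2 + 4 + k.length + 4 + ns.length)
    buf.putShort(keyGroup(key, maxParallelism).toShort)
    buf.putInt(k.length).put(k)
    buf.putInt(ns.length).put(ns)
    buf.array()
  }
}
```

Because the key group comes first, all entries of one key group share a byte prefix and sort next to each other, which is what makes rescaling by key-group range practical.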
Performance, Checkpointing, and Recovery
Compaction, caching, and checkpointing collectively define latency and recovery behavior:
- Compaction balance: overly frequent compaction inflates latency; insufficient compaction expands SST hierarchies and read cost.
- Tuning knobs: write_buffer_size, max_background_jobs, and target_file_size_base shape write/read balance.
- Block cache: keeps frequently accessed data blocks off disk, lowering read latency.
- Bloom filters: prevent unnecessary disk seeks for non-existent keys.
- Incremental checkpoints: upload only changed SST files, minimizing I/O and improving restore speed.
- Recovery logic: during restore, Flink reconstructs RocksDB from checkpoint or savepoint files, which include SST and metadata files. RocksDB’s own WAL plays no part in this: Flink disables it and recovers by restoring the checkpointed state and re-reading the sources from the checkpointed offsets.
Local RocksDB directories can be reused only if operator IDs and key-group assignments match the current job graph and parallelism. Otherwise, Flink discards local directories and restores state from external checkpoints or savepoints stored in durable storage such as S3 or HDFS.
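The idea behind incremental checkpoints reduces to a set difference over SST file names. The sketch below assumes a hypothetical `IncrementalCheckpoint` helper and treats files as plain strings; it only illustrates which files would be uploaded and which would be referenced again.

```scala
object IncrementalCheckpoint {
  final case class Plan(upload: Set[String], reuse: Set[String])

  /** previous: SST files already in durable storage from the last checkpoint.
    * current:  SST files that make up the local snapshot now. */
  def plan(previous: Set[String], current: Set[String]): Plan =
    Plan(upload = current.diff(previous), reuse = current.intersect(previous))
}
```

If compaction replaced `1.sst` with `4.sst` since the last checkpoint, only `4.sst` is uploaded, while `2.sst` and `3.sst` are referenced from the earlier checkpoint.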
When to Use RocksDB
RocksDB is the right choice when:
- State outgrows available memory per TaskManager.
- Durability and exact recovery outweigh microsecond-level access.
- Jobs must maintain state continuity across restarts or rescaling.
For lightweight, transient workloads, in-memory backends offer faster response and simpler operation. Once state exceeds tens of gigabytes with strict recovery guarantees, RocksDB is typically the practical choice among Flink’s built-in backends, because state size is bounded by local disk rather than by heap.
Spark as a Distributed In-Memory Engine
Apache Spark is a distributed computation framework optimized for large-scale analytical and streaming workloads executed primarily in memory. Intermediate data is cached across executors, avoiding repeated disk I/O and enabling iterative computations such as joins, aggregations, or machine learning to complete with high throughput.
Performance depends on how effectively the system handles memory, shuffles, and partitioning — the 5S factors that define Spark’s runtime behavior:
- Spill — data exceeding executor memory is written to disk, increasing latency by orders of magnitude.
- Skew — uneven partition sizes cause executor imbalance and stage slowdowns.
- Shuffle — network redistribution of data introduces serialization and transfer overhead.
- Storage (caching and persistence) — inefficient caching or checkpointing inflates heap usage and GC activity. For streaming state, Spark’s default State Store keeps state in executor memory and persists versioned changes to the checkpoint location; a RocksDB-based State Store provider is available since Spark 3.2 for state that does not fit comfortably on the heap.
- Serialization — encoding and decoding structures affect CPU utilization and memory layout.
Careful tuning of partition sizes, shuffle parameters, and memory fractions determines whether Spark maintains its in-memory advantage or degrades into disk-bound execution.
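Skew and one common mitigation, key salting, can be shown without a cluster. The sketch below is plain Scala, not Spark code: `SkewSalting` is a hypothetical helper, and hash partitioning is approximated with `hashCode` modulo the partition count.

```scala
object SkewSalting {
  /** Number of records per partition when keys are hash-partitioned. */
  def partitionSizes(keys: Seq[String], partitions: Int): Map[Int, Int] =
    keys.groupBy(k => Math.floorMod(k.hashCode, partitions)).map { case (p, ks) => p -> ks.size }

  /** Append a round-robin salt in [0, salts) so that one hot key spreads over several partitions. */
  def salt(keys: Seq[String], salts: Int): Seq[String] =
    keys.zipWithIndex.map { case (k, i) => s"$k#${i % salts}" }

  /** Size of the largest partition, i.e. the work of the slowest task. */
  def maxPartition(sizes: Map[Int, Int]): Int = sizes.values.max
}
```

A salted aggregation needs a second step that strips the salt and combines the partial results, so salting trades one extra shuffle for balanced tasks.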
Execution Model: Micro-Batch vs Continuous Streaming
Spark — Micro-Batch, Trigger-Driven
Spark processes data in discrete micro-batches rather than as a continuous stream.
- Time basis: wall-clock triggers; data accumulates until the trigger fires.
- Execution cadence: each trigger starts a new DAG, processes accumulated data, and commits results atomically.
- State & recovery: streaming state is maintained in the State Store across micro-batches, and fault tolerance is achieved through checkpointed progress combined with deterministic recomputation of lost partitions.
- Latency envelope: micro-batch latencies as low as ~100 ms are achievable, but triggers below ~500 ms are rarely stable in production. Typical end-to-end latency is ~1–30 s, governed by trigger interval, scheduling, and shuffle overhead.
- Consistency model: exactly-once via transactional or idempotent sinks synchronized with checkpoints; event-time is supported through watermarks and windowing, but execution remains micro-batch–driven rather than continuously event-time–driven like Flink.
Flink — Continuous, Event-Time Driven
Flink runs a continuously active pipeline where operators process events as soon as they arrive.
- Time basis: event time is derived from timestamps embedded in events; the system tracks progress using watermarks that indicate when all earlier events have been received.
- Execution cadence: operators are long-lived tasks; processing is continuous without restarts between windows.
- State & recovery: keyed and operator state are maintained persistently and saved through asynchronous checkpoints coordinated across the job.
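- Latency envelope: typically ~20–400 ms end-to-end, depending on window size and checkpoint interval.
- Consistency model: exactly-once state consistency through checkpoint barriers that produce a consistent snapshot across operators; end-to-end exactly-once additionally requires transactional or idempotent sinks.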
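The latency difference between the two models follows from simple arithmetic. The sketch below is a deliberately simplified model with hypothetical names (`LatencyModel` and its parameters); it ignores queuing, scheduling jitter, and checkpoint pauses.

```scala
object LatencyModel {
  /** Micro-batch: an event arriving at `arrivalMs` waits for the next trigger boundary,
    * then for the whole batch to be processed. */
  def microBatchLatency(arrivalMs: Long, triggerMs: Long, batchProcessingMs: Long): Long = {
    val nextTrigger = ((arrivalMs / triggerMs) + 1) * triggerMs
    (nextTrigger - arrivalMs) + batchProcessingMs
  }

  /** Continuous: the event is handled on arrival, so only the per-event cost applies. */
  def continuousLatency(perEventMs: Long): Long = perEventMs
}
```

With a 1 s trigger and 300 ms of batch processing, an event arriving at t = 1250 ms becomes visible after 1050 ms, while an event arriving at t = 1999 ms waits only 301 ms. The trigger interval therefore sets the upper bound of the waiting time, not a constant delay.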
Data Abstractions and Lazy Evaluation
Spark exposes three principal data abstractions that represent an evolution of usability and optimization:
RDD (Resilient Distributed Dataset): a low-level API offering fine-grained control over partitioning and persistence. Rarely used directly except for custom transformations or legacy code.
Dataset: a typed interface built on the Spark SQL engine, providing compile-time type safety; it is available only in Scala and Java.
DataFrame: the standard abstraction in production, defined as Dataset[Row]. It models data as a distributed table with a known schema and leverages the Catalyst Optimizer for query-plan generation, column pruning, and predicate pushdown, with whole-stage code generation fusing operators at runtime.
All transformations in Spark are lazy. Operations such as map, filter, or join build a logical plan but are not executed until an action (count, collect, write) triggers computation.
This enables Catalyst to analyze dependencies, collapse compatible stages, and minimize shuffle boundaries.
Transformations are categorized as:
- Narrow — depend only on local partitions (e.g., map, filter); executed without network shuffle.
- Wide — require data redistribution across executors (e.g., groupBy, join); define physical stage boundaries.
This combination of lazy evaluation, in-memory caching, and deterministic lineage defines Spark’s computational identity: a scalable, fault-tolerant engine optimized for analytical workloads and near-real-time streaming, but fundamentally distinct from Flink’s continuous, event-time model.
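Lazy evaluation can be demonstrated with the Scala standard library alone. The sketch below is an analogy, not Spark code: an `Iterator` chain plays the role of the logical plan, and `toList` plays the role of an action. `LazyPipeline` is a hypothetical name.

```scala
object LazyPipeline {
  /** Returns the result together with the number of elements the map step actually evaluated. */
  def run(input: Seq[Int]): (List[Int], Int) = {
    var evaluated = 0
    // "Transformations": nothing runs yet, the iterator only records what to do.
    val plan = input.iterator
      .map { x => evaluated += 1; x * 2 }
      .filter(_ % 3 == 0)
    val evaluatedBeforeAction = evaluated // still 0 at this point
    // "Action": forcing the result pulls elements through the whole chain.
    val result = plan.take(2).toList
    require(evaluatedBeforeAction == 0, "transformations must not run before the action")
    (result, evaluated)
  }
}
```

For the input 1 to 100 the result is `List(6, 12)`, and the map step touches only a small prefix of the input because the action asks for two elements. Spark applies the same principle at the level of whole query plans.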
Execution Architecture and Core Runtime Components
Apache Spark operates as a distributed computation engine built on a layered runtime architecture that separates control, execution, and storage responsibilities. Its design targets large-scale, fault-tolerant, and memory-optimized workloads by coordinating driver logic, worker processes, and cluster resource management through a unified execution model.
Core Components
Driver
The driver process orchestrates execution. It runs the main application logic, builds the logical plan for every action, and submits physical execution plans to the cluster. Internally, the driver hosts the SparkContext and the DAG Scheduler, which convert high-level transformations into stages of tasks separated by shuffle boundaries. It also maintains lineage metadata for fault recovery — recomputing lost partitions instead of relying solely on replication.
Executors
Executors are long-lived JVM processes deployed across cluster nodes. Each executor runs multiple task threads that process data partitions in parallel, keeping intermediate results in memory whenever possible. Executors maintain local caches, spill to disk when memory pressure occurs, and periodically send heartbeat and metric updates to the driver. When an executor fails, Spark reassigns its partitions using lineage reconstruction, restoring deterministic state without global checkpoints.
Cluster Manager
Spark abstracts resource allocation through pluggable cluster managers:
- Standalone — lightweight built-in scheduler for small to medium clusters.
- YARN — integrates with Hadoop environments for multi-tenant scheduling.
- Kubernetes — provisions executors as pods, enabling containerized elasticity and isolation.
The cluster manager negotiates CPU cores and memory per executor, controls container lifecycle, and ensures resource fairness across concurrent jobs.
SparkSession and Contexts
The SparkSession unifies APIs for SQL, streaming, and DataFrame operations, superseding the older SQLContext and HiveContext entry points and wrapping the underlying SparkContext. It gives user code access to catalog metadata and to logical plan generation through the Catalyst optimizer. Every Spark application starts by instantiating a session, defining configuration parameters such as master URL, shuffle partitions, and serialization mode.
Execution Flow
- Job Construction — Transformations form a logical DAG of operations.
- Planning and Optimization — Catalyst analyzes and rewrites the DAG into a physical plan, minimizing shuffle and stage boundaries.
- Task Scheduling — The driver divides stages into tasks mapped to partitions and sends them directly to executors; the cluster manager only allocates the resources those executors run on.
- Execution and Caching — Executors compute results, cache intermediate RDDs or DataFrames, and stream metrics back to the driver.
- Fault Recovery — If an executor or node fails, Spark recomputes only the lost partitions based on lineage, maintaining deterministic results without explicit replication.
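How shuffle boundaries cut a job into stages can be sketched for a linear chain of operations. This is an illustrative model with hypothetical names (`StagePlanner`, `Op`, `Narrow`, `Wide`), not Spark's DAG scheduler, which works on a graph of RDD dependencies.

```scala
object StagePlanner {
  sealed trait Dep
  case object Narrow extends Dep
  case object Wide extends Dep
  final case class Op(name: String, dep: Dep)

  /** Split a linear chain of operations into stages: every wide dependency starts a new stage. */
  def stages(ops: List[Op]): List[List[String]] =
    ops.foldLeft(List.empty[List[String]]) {
      case (Nil, op)               => List(List(op.name))
      case (acc, Op(name, Wide))   => acc :+ List(name)
      case (acc, Op(name, Narrow)) => acc.init :+ (acc.last :+ name)
    }
}
```

For the chain read, filter, groupBy, map, join this yields three stages: (read, filter), (groupBy, map), and (join). Tasks within a stage run without network exchange; data crosses the network only between stages.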
Data and Communication Path
Data flows between executors through shuffle operations managed by the BlockManager. Each block of data — serialized and optionally compressed — is stored in memory or on disk and fetched over Netty-based shuffle services. The design minimizes unnecessary serialization by co-locating dependent tasks and leveraging broadcast variables for static datasets. Shuffle dependencies form stage boundaries, defining the units of parallelism and fault isolation.
Performance Implications
- In-memory persistence eliminates redundant reads, achieving sub-second iterative computations when datasets fit in memory.
- Shuffle optimization through adaptive query execution (AQE) dynamically coalesces partitions and rebalances skewed data.
- Serialization choices (Kryo vs. Java) directly affect CPU efficiency and memory footprint.
- Executor sizing (cores × memory) governs concurrency and GC behavior; oversubscription leads to pauses, while underutilization limits throughput.
Spark’s architecture forms a cohesive runtime that unifies batch, streaming, and SQL workloads under a single execution engine. Its layered coordination between driver, executors, and cluster managers enables high scalability while preserving deterministic fault recovery and consistent performance across heterogeneous clusters.
Structured Streaming: Modes, Windows, and Watermarks
Structured Streaming in Spark executes continuous data processing as a sequence of deterministic state updates. Each micro-batch reads new records from the source, updates aggregation or join state, and writes results according to the defined output mode. The system maintains recovery consistency through checkpointed offsets and write-ahead logs. Every trigger is an atomic transaction that can be re-executed without data loss or duplication.
Output Modes
Spark defines three output modes that control how intermediate results are emitted and when state is cleared.
Append Mode: Emits only finalized results that will not change. Used for event-time windows or aggregations that close permanently after watermark expiration. Spark removes corresponding state immediately after emission to limit memory usage.
Update Mode: Emits only rows whose aggregations have changed since the previous trigger. Suitable for running totals or open windows where values evolve with every batch.
Complete Mode: Outputs the full result table after each trigger. Ensures deterministic global snapshots at higher I/O cost. Common for materialized analytical results or offline reconciliation.
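The difference between the three modes can be made concrete with a toy result table. The sketch below is not Spark code: `OutputModes.emit` is a hypothetical function that takes the result table before and after a trigger plus the keys finalized by the watermark, and returns the rows that mode would emit.

```scala
object OutputModes {
  type Table = Map[String, Long] // window or key -> aggregated count

  /** Rows emitted by one trigger for the given output mode. */
  def emit(mode: String, before: Table, after: Table, finalized: Set[String]): Table =
    mode match {
      case "complete" => after                                                       // full table
      case "update"   => after.filter { case (k, v) => !before.get(k).contains(v) }  // changed rows
      case "append"   => after.filter { case (k, _) => finalized(k) }                // closed rows only
      case other      => throw new IllegalArgumentException(s"unknown mode: $other")
    }
}
```

If a trigger changes `w2` from 1 to 2, creates `w3`, and the watermark closes `w1`, then update mode emits `w2` and `w3`, append mode emits only `w1`, and complete mode emits all three rows.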
Windowed Computation
Windows define how Spark groups events by event-time boundaries. Each window keeps independent state until the watermark (the maximum observed event time minus the configured delay) passes the window’s end time.
Tumbling Windows: Non-overlapping fixed intervals. Each event belongs to one window.
val tumblingCounts =
  events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window($"timestamp", "5 minutes"),
      $"userId"
    ).count()
Sliding Windows: Overlapping windows with a shorter slide step. Produce finer-grained results and maintain multiple active windows per key.
val slidingCounts =
  events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
      window($"timestamp", "10 minutes", "5 minutes"),
      $"userId"
    ).count()
Session Windows: Dynamic intervals that expand with activity and close after inactivity. Model user or device sessions.
val sessionizedCounts =
  events
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
      session_window($"timestamp", "5 minutes"),
      $"userId"
    ).count()
Spark tracks each active window in the state store and releases it once the watermark advances beyond its boundary. This mechanism guarantees bounded state size and predictable cleanup behavior.
Handling Late Data and Watermarking: Event-time order is rarely consistent across distributed sources. Watermarks establish a controlled notion of time progress so Spark can decide when it is safe to finalize results. A watermark at time T indicates that events with timestamps ≤ T are no longer expected; Spark computes it as the maximum event time observed so far minus the configured delay. When the watermark passes a window’s end, Spark commits its aggregates and removes its state. Spark tracks the maximum event time across all partitions and advances the watermark at the end of each micro-batch; when a query reads several streams, the global watermark defaults to the minimum of the per-stream watermarks. This ensures slower streams keep enough history without prematurely evicting their late data.
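The watermark rules reduce to a few lines of arithmetic. The sketch below uses hypothetical helper names (`Watermarks.watermark`, `canFinalize`, `global`) and millisecond timestamps; it simplifies Spark's behavior by ignoring the one-batch lag with which the watermark is applied.

```scala
object Watermarks {
  /** Watermark = maximum event time seen so far minus the allowed delay. */
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long = maxEventTimeMs - delayMs

  /** A window ending at `windowEndMs` can be finalized once the watermark has reached its end. */
  def canFinalize(windowEndMs: Long, watermarkMs: Long): Boolean = watermarkMs >= windowEndMs

  /** With several input streams, the slowest stream holds back the global watermark. */
  def global(perStreamWatermarksMs: Seq[Long]): Long = perStreamWatermarksMs.min
}
```

With a 10-minute delay and a newest event at minute 15, the watermark stands at minute 5: the window ending at minute 5 can be closed, while the window ending at minute 10 stays open for late events.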
Streaming Deduplication: Duplicate events appear when a source replays confirmed offsets or retries delivery. Spark maintains a hash index of processed keys in the streaming state store, persisted across checkpoints to preserve exactly-once guarantees.
val dedupQuery = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("eventId", "eventTime")
  .writeStream
  .format("console") // a sink must be specified; console is used here only for illustration
  .outputMode("append")
  .start()
Each micro-batch compares incoming keys against the stored index and removes entries older than the watermark.
This keeps deduplication state bounded while maintaining deterministic results.
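The interplay of the key index and watermark-based eviction can be modelled in plain Scala. This is a sketch with hypothetical names (`StreamingDedup`, `State`, `processBatch`), assuming non-negative millisecond timestamps; it is not how Spark's state store is implemented.

```scala
object StreamingDedup {
  final case class Ev(id: String, eventTimeMs: Long)
  /** seen: ids already emitted, with their event time so they can be evicted later. */
  final case class State(seen: Map[String, Long], maxEventTimeMs: Long)
  val empty: State = State(Map.empty, 0L)

  /** Process one micro-batch: emit first occurrences, then evict ids behind the new watermark. */
  def processBatch(state: State, batch: Seq[Ev], delayMs: Long): (State, Vector[Ev]) = {
    val watermark = state.maxEventTimeMs - delayMs
    var seen = state.seen
    var out  = Vector.empty[Ev]
    batch.foreach { e =>
      // Events behind the watermark are dropped as too late; known ids are duplicates.
      if (e.eventTimeMs >= watermark && !seen.contains(e.id)) {
        seen = seen.updated(e.id, e.eventTimeMs)
        out = out :+ e
      }
    }
    val maxTs = (state.maxEventTimeMs +: batch.map(_.eventTimeMs)).max
    val newWatermark = maxTs - delayMs
    (State(seen.filter { case (_, ts) => ts >= newWatermark }, maxTs), out)
  }
}
```

Eviction is safe because any event old enough to have had its id evicted is also old enough to be rejected by the watermark check, so an evicted id cannot be emitted a second time.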
Analytical Capabilities
Apache Spark extends beyond streaming to analytical computation over both real-time and historical data. Its design allows a single runtime to perform aggregations, joins, and machine-learning workloads using the same execution engine and unified APIs. This integration makes Spark suitable for hybrid pipelines where streaming data must be correlated with reference or historical datasets.
Example: Extracting Behavioral Patterns in Real Time (Structured Streaming in Spark)
Task Overview
The goal is to detect shifts in user engagement during active sessions to improve retention and content relevance.
Behavioral patterns are derived from user interactions such as views, scrolls, and clicks, forming short-term metrics like Average Session Duration (ASD) and content completion rate.
The system consumes events from Kafka, aggregates them in micro-batches, and maintains per-user state to track how engagement changes over time.
When a user’s ASD decreases by more than 30 percent compared to their recent average, the pipeline raises an alert that can trigger real-time content adjustments or user-level interventions.
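The alert rule can be isolated as a pure function before it is wired into a streaming job. The sketch below assumes the recent average is tracked as an exponential moving average (EMA); `AsdBaseline.step`, `alpha`, and `dropThreshold` are illustrative names rather than part of any Spark API.

```scala
object AsdBaseline {
  final case class Baseline(ema: Double, seen: Long)

  /** Feed one finished session into the baseline.
    * Returns the updated baseline and, if the drop reaches the threshold, the drop ratio. */
  def step(b: Baseline, asdMs: Double, alpha: Double, dropThreshold: Double): (Baseline, Option[Double]) =
    if (b.seen == 0) {
      (Baseline(asdMs, 1L), None) // first session only initializes the baseline
    } else {
      val drop = if (b.ema > 0.0) (b.ema - asdMs) / b.ema else 0.0
      val next = Baseline(alpha * asdMs + (1 - alpha) * b.ema, b.seen + 1)
      (next, if (drop >= dropThreshold) Some(drop) else None)
    }
}
```

With a baseline of 100 ms, a session with an ASD of 60 ms is a 40 percent drop and raises an alert at a 30 percent threshold; with `alpha = 0.5` the baseline then moves to 80 ms, so a following session of 78 ms does not.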
// SparkBehavioralPatterns.scala
// Kafka -> Structured Streaming -> Kafka
// Goal: per-user session metrics (ASD, completion, clicks) and alert when ASD drops > threshold.
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import java.sql.Timestamp
// ---- Domain ----
case class RawEvent(userId: String, contentId: String, eventType: String, eventTimeMs: Long, dwellMs: Long, completed: Boolean)
case class Event(userId: String, contentId: String, eventType: String, eventTime: Timestamp, dwellMs: Long, completed: Boolean)
case class SessionAgg(userId: String, sessionStart: Timestamp, sessionEnd: Timestamp, asdMs: Double,
completionRate: Double, clicks: Long)
case class Baseline(emaAsdMs: Double, seen: Long)
case class Alert(userId: String, sessionStart: Timestamp, sessionEnd: Timestamp, asdMs: Double,
baselineAsdMs: Double, dropRatio: Double, note: String)
object SparkBehavioralPatterns {
def main(args: Array[String]): Unit = {
// ---- Config (only essentials) ----
val BOOTSTRAP = sys.props.getOrElse("brokers", "localhost:9092")
val IN_TOPIC = sys.props.getOrElse("in.topic", "user-events")
val OUT_TOPIC = sys.props.getOrElse("out.topic", "engagement-alerts")
val CHECKPOINT = sys.props.getOrElse("checkpoint.dir", "/tmp/spk-checkpoints")
val LATE_SEC = sys.props.getOrElse("watermark.lateness.sec", "120")
val GAP_SEC = sys.props.getOrElse("session.gap.sec", "1800")
val EMA_ALPHA = sys.props.getOrElse("ema.alpha", "0.3").toDouble
val DROP_THRESH = sys.props.getOrElse("asd.drop.threshold", "0.30").toDouble
val spark = SparkSession.builder().appName("StructuredStreaming-BehavioralPatterns").getOrCreate()
import spark.implicits._
// ---- Source: Kafka JSON -> Event ----
val schema = spark.implicits.newProductEncoder[RawEvent].schema
val events: Dataset[Event] =
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP)
.option("subscribe", IN_TOPIC)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) AS json")
.select(from_json(col("json"), schema).as("j")).select("j.*")
.na.drop() // drop rows with missing fields (e.g. malformed JSON) before decoding to the case class
.as[RawEvent]
.map(r => Event(r.userId, r.contentId, r.eventType,
new Timestamp(r.eventTimeMs), math.max(r.dwellMs, 0L), r.completed))
// ---- Watermark + session aggregates ----
val dwellForAvg = when(col("eventType")==="view_end" && col("dwellMs")>0, col("dwellMs")).otherwise(null)
val viewEnd = when(col("eventType")==="view_end", 1).otherwise(0)
val completedView = when(col("completed")===true, 1).otherwise(0)
val isClick = when(col("eventType")==="click", 1).otherwise(0)
val sessAgg: Dataset[SessionAgg] = events
.withWatermark("eventTime", s"$LATE_SEC seconds")
.groupBy(col("userId"), session_window(col("eventTime"), s"$GAP_SEC seconds").as("sess"))
.agg(
avg(dwellForAvg).as("asdMs"),
(sum(completedView).cast("double") / greatest(sum(viewEnd).cast("double"), lit(1.0))).as("completionRate"),
sum(isClick).cast("long").as("clicks")
)
.select(
col("userId"),
col("sess.start").as("sessionStart"),
col("sess.end").as("sessionEnd"),
coalesce(col("asdMs"), lit(0.0)).as("asdMs"),
coalesce(col("completionRate"), lit(0.0)).as("completionRate"),
col("clicks")
)
.as[SessionAgg]
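// ---- Per-user EMA baseline + alerts ----
// NOTE: Spark's analyzer may reject flatMapGroupsWithState placed after a streaming
// aggregation in the same query. If that happens, split the job in two: write sessAgg
// to an intermediate topic, then run the baseline logic as a second streaming query.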
val alerts = sessAgg
.groupByKey(_.userId)
.flatMapGroupsWithState[Baseline, Alert](OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
case (userId, sessions, state) =>
var s = state.getOption.getOrElse(Baseline(0.0, 0L))
val out = scala.collection.mutable.ListBuffer.empty[Alert]
sessions.foreach { sess =>
val current = math.max(sess.asdMs, 0.0)
val base = if (s.seen == 0) current else s.emaAsdMs
val drop = if (base > 0.0) (base - current) / base else 0.0
if (s.seen > 0 && drop >= DROP_THRESH)
out += Alert(userId, sess.sessionStart, sess.sessionEnd, current, base, drop, "ASD drop exceeds threshold")
val newEma = if (s.seen == 0) current else EMA_ALPHA * current + (1 - EMA_ALPHA) * s.emaAsdMs
s = Baseline(newEma, s.seen + 1)
}
state.update(s)
out.iterator
}
.select(to_json(struct(
col("userId"), col("sessionStart"), col("sessionEnd"),
round(col("asdMs"),1).as("asdMs"),
round(col("baselineAsdMs"),1).as("baselineAsdMs"),
round(col("dropRatio"),3).as("dropRatio"),
col("note")
)).as("value"))
// ---- Sink: alerts -> Kafka ----
alerts.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", BOOTSTRAP)
.option("topic", OUT_TOPIC)
.option("checkpointLocation", s"$CHECKPOINT/alerts")
.outputMode("append")
.start()
spark.streams.awaitAnyTermination()
}
}
Hybrid Analytical System: Spark is used when multiple data streams, historical datasets, and derived models need to meet in one computational layer. It’s the point where operational data becomes analyzable: where aggregates are built, historical context is applied, and metrics are reconciled before they move further into ML or reporting systems.
Compatibility with the Lakehouse Ecosystem: Spark is a native computation engine for the lakehouse ecosystem, tightly integrated with table formats such as Delta Lake, Apache Iceberg, and Apache Hudi. These formats provide the transactional guarantees, schema evolution, and time-travel capabilities required for maintaining analytical accuracy over mutable datasets. This compatibility allows streaming and batch pipelines to operate directly on the same storage layer without duplication, and enables MERGE/UPSERT operations that keep slowly changing or late-arriving data consistent across both modes.
Advanced Analytics and Machine Learning Integration: Spark provides broad analytical and machine-learning capabilities. It supports integration with Python frameworks such as PyTorch and TensorFlow for distributed model training and inference, and works with MLflow for experiment tracking and model lifecycle management. This combination makes Spark suitable for pipelines that cover data processing, feature generation, model training, and evaluation in one execution environment.
Architectural Models
Golang, Flink, and Spark can be used individually or combined within one data flow, depending on the task context — the type of operations applied to data, their latency targets, time semantics, and the scale of state or analytical depth required.
Golang handles millisecond-level transformations and flow control at the ingestion tier. Flink sustains long-running, stateful computations with event-time semantics and coordinated recovery. Spark performs large-scale joins, reprocessing, and feature or metric computation across streaming and historical datasets.
Go
When: Used when maximum speed and minimal complexity are required — for ingestion, filtering, routing, lightweight enrichment, and maintaining only transient state.
Typical use cases
- Event gateways and collectors (HTTP, gRPC, WebSocket)
- Filtering, mapping, normalization, and routing between topics
- Quick enrichment from in-memory cache or key–value stores
- Real-time APIs and webhooks for immediate responses
Latency
~1–20 ms end-to-end, depending on network and serialization overhead.
Example flow
Producers (HTTP / gRPC / WS)
— Go microservices (filter / map / route)
— Kafka / Redis / downstream APIs
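The filter / map / route stages in this flow can be sketched in a few lines. The sketch below is written in Python purely for illustration; in a Go service the same stages would typically be goroutines connected by channels. The field names (`user_id`, `type`), the topic names in `ROUTES`, and the `dead-letter` fallback are assumptions made for the example, not part of any specific system.

```python
from collections import defaultdict
from typing import Iterable, Optional

# Hypothetical mapping from event type to output topic.
ROUTES = {"click": "clicks", "purchase": "purchases"}


def validate(event: dict) -> Optional[dict]:
    """Filter stage: return None to drop events missing required fields."""
    if "user_id" not in event or "type" not in event:
        return None
    return event


def normalize(event: dict) -> dict:
    """Map stage: lowercase the event type without mutating the input."""
    return {**event, "type": str(event["type"]).lower()}


def route(event: dict) -> str:
    """Route stage: pick a topic; unknown types go to a dead-letter topic."""
    return ROUTES.get(event["type"], "dead-letter")


def process(events: Iterable[dict]) -> dict:
    """Run every event through filter -> map -> route, grouped by topic."""
    out = defaultdict(list)
    for raw in events:
        event = validate(raw)
        if event is None:
            continue
        event = normalize(event)
        out[route(event)].append(event)
    return dict(out)
```

Each stage is stateless, which is what keeps this tier cheap to scale horizontally: any instance can process any event.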
Flink
When: Used when low latency and advanced streaming semantics are required — including event-time processing, windowing, joins, complex event patterns (CEP), and exactly-once guarantees.
Typical use cases
- Tumbling, sliding, or session windows
- Top-N metrics and stateful deduplication
- CEP patterns for fraud detection or anomaly tracking
- Alerting and threshold monitoring
Latency
~20–400 ms end-to-end, depending on window size and checkpoint interval.
Example flow
Kafka (raw events)
— Flink jobs (SQL / Table API / CEP)
— Kafka / Elastic / ClickHouse / Redis / database targets
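To make the event-time semantics concrete, here is a minimal single-process sketch of a tumbling window with a watermark and stateful deduplication. It only models the idea in plain Python and is not Flink's API; the class name `TumblingWindowCounter`, the watermark rule (maximum event time seen minus an allowed lateness), and deduplication scoped to one window are simplifying assumptions.

```python
from collections import defaultdict


class TumblingWindowCounter:
    """Counts unique events per key in fixed, non-overlapping event-time windows."""

    def __init__(self, window_ms, allowed_lateness_ms=0):
        self.window_ms = window_ms
        self.allowed_lateness_ms = allowed_lateness_ms
        self.max_event_time = 0
        self.seen_ids = defaultdict(set)  # window start -> event ids (dedup state)
        self.counts = defaultdict(lambda: defaultdict(int))  # window start -> key -> count

    def watermark(self):
        """Assumed rule: everything older than this is considered complete."""
        return self.max_event_time - self.allowed_lateness_ms

    def on_event(self, event_id, key, event_time_ms):
        """Ingest one event and return the windows that closed as a result."""
        start = event_time_ms - event_time_ms % self.window_ms
        if start + self.window_ms <= self.watermark():
            return []  # too late: this window has already fired
        if event_id not in self.seen_ids[start]:
            self.seen_ids[start].add(event_id)
            self.counts[start][key] += 1
        self.max_event_time = max(self.max_event_time, event_time_ms)
        return self._fire_closed_windows()

    def _fire_closed_windows(self):
        closed = []
        for start in sorted(self.counts):
            if start + self.window_ms <= self.watermark():
                closed.append((start, dict(self.counts.pop(start))))
                self.seen_ids.pop(start, None)  # release dedup state with the window
        return closed
```

A window fires only when the watermark passes its end, so results depend on event timestamps rather than arrival order. A larger allowed lateness admits more out-of-order events at the cost of later output.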
Spark (Structured Streaming)
When: Used when near–real-time pipelines are required — for large-scale joins, heavy ETL, data lake integration, or machine learning workflows.
Typical use cases
- Real-time data marts and materialized views
- Upserts and merges into Delta Lake or Iceberg tables, and append-only writes to plain Parquet
- Building feature datasets for ML training
- Near–real-time BI and reporting pipelines
Latency
~1–30 s end-to-end, depending on micro-batch interval and job complexity.
Example flow
Kafka
— Spark Structured Streaming (SQL / DataFrame)
— Delta / Iceberg / Parquet / ClickHouse
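The upsert step in such a pipeline can be illustrated without any Spark dependency. The sketch below models merge semantics for one micro-batch in plain Python; it is not the Spark or Delta Lake API, and the row fields `id` and `updated_at` are hypothetical. The rule it assumes is "newest version per key wins", which also makes replays of a batch harmless.

```python
def merge_batch(table: dict, batch: list) -> dict:
    """Upsert a micro-batch into a keyed table, keeping the newest row per key."""
    for row in batch:
        current = table.get(row["id"])
        # Insert new keys; update existing keys only if the row is not older.
        if current is None or row["updated_at"] >= current["updated_at"]:
            table[row["id"]] = row
    return table


table = {}
merge_batch(table, [
    {"id": 1, "updated_at": 10, "status": "new"},
    {"id": 2, "updated_at": 11, "status": "new"},
])
merge_batch(table, [{"id": 1, "updated_at": 20, "status": "paid"}])
# A late-arriving, older version of key 1 does not overwrite the newer row.
merge_batch(table, [{"id": 1, "updated_at": 15, "status": "pending"}])
```

Because the comparison is on the row's own timestamp rather than on arrival order, applying the same batch twice leaves the table unchanged.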
Go > Flink
When: Used when fast ingestion and validation at the edge must be combined with low-latency stream analytics and alerting.
Typical use cases
- Edge collection and validation in Go
- Flink for aggregation, enrichment, CEP, and windowing
- Go layer for API responses, alerts, or notifications
Latency
~50–300 ms end-to-end, depending on event rate and network overhead.
Example flow
Devices or applications
— Go (ingestion and validation)
— Kafka
— Flink (windowing / CEP / enrichment)
— Kafka / Redis / Elastic
— Go (APIs / alerts / webhooks)
Go > Flink > Spark
When: Used when both real-time and near real-time analytics are required — immediate reactions combined with long-term aggregations or model training.
Typical use cases
- Flink for real-time metrics, alerts, or online features
- Spark for historical aggregations, deep joins, data marts, and ML training
- Go for ingestion, validation, and serving endpoints
Latency
Flink (real-time): <500 ms
Spark (batch / near real-time): 5–60 s+
Example flow
Sources
— Go (ingestion)
— Kafka
— Flink (real-time metrics / CEP / feature computation)
— KV stores / Kafka / APIs
— Spark (historical marts / ML / lakehouse)
— Delta / Iceberg / BI / MLflow
Designing Real-Time Analytics Systems: From Requirements to Operations
The design of a real-time analytics system begins not with choosing Go, Flink, or Spark, but with decomposing tasks along the latency, correctness, and scale axes. The input is a set of business requirements: “fraud alert in <300 ms,” “dashboard refresh every 5 s,” “daily aggregate ready by 07:00.” Each translates into an SLA contract: latency budgets (p50/p95/p99), data freshness, processing semantics (exactly-once vs. at-least-once), and acceptable loss tolerance. Next comes source analysis: event rate (events per second), schema evolution, burstiness, out-of-order arrival, and duplication patterns. This mapping defines where Go owns ingestion and validation, where Flink takes over event-time logic and stateful processing, and where Spark enters for analytical materialization.
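One way to make such a contract checkable is to encode the latency budget as data and test measured latencies against it. The sketch below is illustrative only: the `Sla` type, the function names, and the tier thresholds (under 20 ms for Go, under 1 s for Flink, otherwise Spark) are assumptions loosely derived from the indicative ranges quoted earlier in this article, not hard rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Sla:
    name: str
    p95_budget_ms: float


def percentile(samples, q: int):
    """Nearest-rank percentile for an integer q in 1..100."""
    ordered = sorted(samples)
    rank = max(1, (q * len(ordered) + 99) // 100)  # ceil(q * n / 100)
    return ordered[rank - 1]


def meets_budget(sla: Sla, latencies_ms) -> bool:
    """True if the measured p95 latency fits the SLA budget."""
    return percentile(latencies_ms, 95) <= sla.p95_budget_ms


def suggest_tier(sla: Sla) -> str:
    """Assumed thresholds: a starting point for discussion, not a decision rule."""
    if sla.p95_budget_ms < 20:
        return "go"
    if sla.p95_budget_ms < 1000:
        return "flink"
    return "spark"


fraud = Sla("fraud alert", p95_budget_ms=300)
dashboard = Sla("dashboard refresh", p95_budget_ms=5000)
```

With these assumptions, the fraud-alert requirement lands on the Flink tier and the dashboard refresh on Spark, which matches the division of responsibilities described above.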
The critical mistake is attempting to solve everything in one layer. Responsibility boundaries must be explicit: Go handles raw sockets and first-mile validation (sequence gaps, schema drift), Flink owns business logic with event-time and state (CEP, windowing, deduplication), and Spark manages analytical views (joins with reference data, session reconstruction, feature stores). Between layers lie data contracts: Kafka compacted topics as the source of truth, Schema Registry enforcement, Protobuf/Avro with backward/forward compatibility. This separation enables independent scaling: add Go shards under load spikes, increase Flink parallelism for key cardinality, or retune Spark with AQE under skew — without cross-layer regression.
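The compatibility rules behind such data contracts can be sketched as a simple check. This is a deliberately simplified model of Avro-style schema resolution, not the Schema Registry API: schemas are plain dicts of field name to `{"type": ..., "default": ...}`, type promotion is ignored, and all function names are hypothetical.

```python
def can_read(reader: dict, writer: dict) -> bool:
    """True if data written with `writer` can be decoded using `reader`."""
    for name, spec in reader.items():
        if name in writer:
            if writer[name]["type"] != spec["type"]:
                return False  # same field, incompatible type
        elif "default" not in spec:
            return False  # reader expects a field the writer never produced
    return True


def backward_compatible(new: dict, old: dict) -> bool:
    """Consumers on the new schema can still read data written with the old one."""
    return can_read(reader=new, writer=old)


def forward_compatible(new: dict, old: dict) -> bool:
    """Consumers on the old schema can read data written with the new one."""
    return can_read(reader=old, writer=new)


v1 = {"id": {"type": "string"}, "amount": {"type": "double"}}
v2 = {**v1, "currency": {"type": "string", "default": "USD"}}  # added with default
v3 = {**v1, "currency": {"type": "string"}}  # added without default
```

Here `v2` is both backward and forward compatible with `v1`, while `v3` breaks backward compatibility because new consumers have no value to fill in for old records.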
Data operations and observability are not an afterthought; they are built into the design. Every layer must expose streaming golden signals: input/output rates, processing latency, backlog depth, watermark lag, checkpoint duration, GC pauses, and RocksDB compaction queue. Go exports per-shard Prometheus histograms, Flink surfaces checkpoint barrier alignment time and backpressure through its metrics system, and Spark provides stage-level insights through the UI and event logs. These signals drive closed-loop control: Go applies channel backpressure and shard-level circuit breaking, an autoscaler adds Flink TaskManagers when watermark lag exceeds configured thresholds, and Spark's AQE coalesces small shuffle partitions and splits skewed ones at runtime based on shuffle statistics. The end-to-end pipeline, from ingestion to analytics, then runs as a self-regulating system in which each tier is observable and automatically adaptive.
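A threshold-driven control loop of this kind can be sketched as a small controller with hysteresis. The example is a toy model rather than any real autoscaler's API: the `LagScaler` name, the doubling and halving policy, and the `patience` counter (consecutive samples required before acting) are all assumptions chosen to show why a single noisy sample should not trigger scaling.

```python
from dataclasses import dataclass


@dataclass
class LagScaler:
    high_ms: int  # scale up when lag stays above this
    low_ms: int  # scale down when lag stays below this
    patience: int  # consecutive samples required before acting
    parallelism: int = 1
    max_parallelism: int = 32
    _above: int = 0
    _below: int = 0

    def observe(self, watermark_lag_ms: int) -> int:
        """Feed one lag sample; return the (possibly updated) parallelism."""
        if watermark_lag_ms > self.high_ms:
            self._above += 1
            self._below = 0
        elif watermark_lag_ms < self.low_ms:
            self._below += 1
            self._above = 0
        else:
            self._above = self._below = 0  # inside the dead band: reset both

        if self._above >= self.patience and self.parallelism < self.max_parallelism:
            self.parallelism = min(self.parallelism * 2, self.max_parallelism)
            self._above = 0
        elif self._below >= self.patience and self.parallelism > 1:
            self.parallelism = max(self.parallelism // 2, 1)
            self._below = 0
        return self.parallelism
```

The gap between `low_ms` and `high_ms` acts as a dead band, so the controller does not oscillate when lag hovers near a single threshold.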






