💡 Real-Time Streaming Algorithms: Processing Infinite Data
Part 7: When Data Never Stops Flowing
"In a cache, you store data. In a stream, data flows through you, and it never stops."
After mastering time-space trade-offs, algorithm design, graphs, production systems, database internals, and caching, you're ready for the ultimate challenge: algorithms that process infinite data in real time.
📊 The Streaming Reality
The Fundamental Shift:
Traditional (Batch Processing):
1. Collect all data
2. Store in database
3. Run query
4. Get result
Time: Hours to days
Streaming (Real-Time Processing):
1. Data arrives continuously
2. Process immediately
3. Results update live
Time: Milliseconds
Example - Twitter Trending Topics:
├─ 500M tweets per day
├─ 6,000 tweets per second
├─ Question: "What's trending RIGHT NOW?"
├─ Can't wait hours to analyze
└─ Must update every second!
Ask yourself: "What's the hidden cost?"
Batch processing:
├─ Time: O(n) to process all data
├─ Space: O(n) to store all data
├─ Memory: Store everything, query once
└─ Cost: Storage is cheap, but slow
Streaming:
├─ Time: O(1) per event (must be fast!)
├─ Space: O(?) - can we store infinite data? NO!
├─ Memory: Must fit in RAM (limited)
└─ Hidden cost: Approximate algorithms needed!
The core trade-off:
Exact answers need infinite memory
Fast answers need bounded memory
→ We choose: Fast + Approximate over Slow + Exact
🎯 Problem 1: Counting Distinct Elements (HyperLogLog)
The Challenge
Twitter scenario:
"How many unique users viewed this tweet?"
Naive approach:
Set<UserID> viewers;
for each view:
viewers.add(userId);
return viewers.size();
Problem with 1 billion viewers:
├─ Each UserID: 8 bytes
├─ Total memory: 1B × 8 bytes = 8 GB
├─ For ONE tweet!
└─ Twitter has millions of tweets 😱
Reality: Can't store 8GB per tweet in RAM
Need: Approximate count using ~1 KB memory!
The Bridge: From Exact to Approximate
1960s: Count exactly, store everything
├─ Memory was precious, so batch process
└─ Wait days for results
1980s: Probabilistic data structures
├─ Bloom filters (1970)
├─ Trade accuracy for space
└─ Still batch processing
2007: HyperLogLog algorithm
├─ Count billions with 1KB memory
├─ 2% error rate (good enough!)
├─ Real-time capable
└─ Powers Redis, BigQuery, Presto
2026: Distributed HyperLogLog
├─ Merge counts across datacenters
├─ Global real-time analytics
└─ Powers: Twitter, Facebook, Google Analytics
Understanding HyperLogLog
The Core Insight:
Magic trick: Leading zeros in hash values
If you hash 1 random number:
hash(user1) = 0110110101... (1 leading zero)
If you hash 2 random numbers:
hash(user1) = 0110110101... (1 leading zero)
hash(user2) = 0010101010... (2 leading zeros)
Max leading zeros: 2
If you hash 4 random numbers:
Expected max leading zeros ≈ 2
If you hash 8 random numbers:
Expected max leading zeros ≈ 3
If you hash 16 random numbers:
Expected max leading zeros ≈ 4
Pattern:
If max_zeros = k, then approximately 2^k unique items!
Example:
See max_zeros = 10
Estimate: 2^10 = 1024 unique users!
Implementation
#include <iostream>
#include <vector>
#include <cmath>
#include <functional>
#include <random>
using namespace std;
class HyperLogLog {
private:
static const int NUM_REGISTERS = 2048; // m = 2^11
vector<uint8_t> registers;
hash<string> hasher;
// Count leading zeros in binary representation
int countLeadingZeros(size_t hash) {
if (hash == 0) return 64;
int zeros = 0;
size_t mask = 1ULL << 63; // Start from leftmost bit
while ((hash & mask) == 0) {
zeros++;
mask >>= 1;
}
return zeros;
}
// Alpha constant for bias correction
double getAlpha() {
switch(NUM_REGISTERS) {
case 16: return 0.673;
case 32: return 0.697;
case 64: return 0.709;
default: return 0.7213 / (1 + 1.079 / NUM_REGISTERS);
}
}
public:
HyperLogLog() : registers(NUM_REGISTERS, 0) {}
// Add element to the set
void add(const string& item) {
// Hash the item
size_t hash = hasher(item);
// Use low 11 bits for register index (2^11 = 2048)
int registerIndex = hash & (NUM_REGISTERS - 1);
// Count leading zeros of the remaining bits. Shift left so the 11
// index bits don't sit at the top as permanent zeros and inflate the count.
size_t remainingBits = hash << 11;
int zeros = countLeadingZeros(remainingBits) + 1;
// Keep maximum zeros seen for this register
if (zeros > registers[registerIndex]) {
registers[registerIndex] = zeros;
}
}
// Estimate cardinality (number of unique elements)
size_t estimate() {
double sum = 0.0;
int zeroRegisters = 0;
// Harmonic mean of 2^register values
for (int i = 0; i < NUM_REGISTERS; i++) {
sum += 1.0 / (1ULL << registers[i]);
if (registers[i] == 0) zeroRegisters++;
}
// Raw estimate
double alpha = getAlpha();
double estimate = alpha * NUM_REGISTERS * NUM_REGISTERS / sum;
// Small range correction
if (estimate <= 2.5 * NUM_REGISTERS && zeroRegisters > 0) {
return NUM_REGISTERS * log((double)NUM_REGISTERS / zeroRegisters);
}
return (size_t)estimate;
}
// Merge another HyperLogLog (for distributed counting)
void merge(const HyperLogLog& other) {
for (int i = 0; i < NUM_REGISTERS; i++) {
registers[i] = max(registers[i], other.registers[i]);
}
}
size_t getMemoryUsage() {
return NUM_REGISTERS * sizeof(uint8_t); // Bytes
}
};
int main() {
cout << "\n🚀 HYPERLOGLOG: COUNTING BILLIONS IN KILOBYTES\n";
cout << "───────────────────────────────────────────────────────────\n\n";
HyperLogLog hll;
cout << "Memory used: " << hll.getMemoryUsage() << " bytes (~2 KB)\n\n";
// Simulate Twitter views
cout << "Simulating tweet views...\n\n";
vector<int> testSizes = {100, 1000, 10000, 100000, 1000000};
for (int actualSize : testSizes) {
HyperLogLog counter;
// Add unique users
for (int i = 0; i < actualSize; i++) {
counter.add("user_" + to_string(i));
}
size_t estimated = counter.estimate();
double error = fabs((double)estimated - actualSize) / actualSize * 100;
cout << "Actual unique users: " << actualSize << "\n";
cout << "Estimated: " << estimated << "\n";
cout << "Error: " << error << "%\n";
cout << "Memory: " << counter.getMemoryUsage() << " bytes\n\n";
}
cout << "\n💡 HIDDEN COST LESSON\n";
cout << "───────────────────────────────────────────────────────────\n\n";
cout << "Exact counting (Set):\n";
cout << "├─ Space: O(n) where n = unique elements\n";
cout << "├─ For 1M users: 1M × 8 bytes = 8 MB\n";
cout << "├─ Accuracy: 100% ✓\n";
cout << "└─ Scalability: Poor (linear growth)\n\n";
cout << "HyperLogLog:\n";
cout << "├─ Space: O(1) - fixed 2KB!\n";
cout << "├─ For 1M users: 2 KB (constant!)\n";
cout << "├─ Accuracy: ~98% (2% error)\n";
cout << "└─ Scalability: Excellent (counts billions)\n\n";
cout << "The hidden cost:\n";
cout << "├─ Exact: Guaranteed correct, but needs O(n) memory\n";
cout << "├─ Approximate: Small error, but O(1) memory\n";
cout << "└─ Trade-off: Accuracy vs Memory (Part 1 callback!)\n\n";
cout << "Why this matters (2026):\n";
cout << "├─ Twitter: Track 500M daily users with <1MB memory\n";
cout << "├─ Google Analytics: Unique visitors across billions of pages\n";
cout << "├─ Redis: Built-in PFCOUNT command uses HyperLogLog\n";
cout << "└─ Facebook: Count reach of posts in real-time\n";
return 0;
}
Output:
🚀 HYPERLOGLOG: COUNTING BILLIONS IN KILOBYTES
───────────────────────────────────────────────────────────
Memory used: 2048 bytes (~2 KB)
Simulating tweet views...
Actual unique users: 100
Estimated: 101
Error: 1%
Memory: 2048 bytes
Actual unique users: 1000
Estimated: 1019
Error: 1.9%
Memory: 2048 bytes
Actual unique users: 10000
Estimated: 10234
Error: 2.34%
Memory: 2048 bytes
Actual unique users: 100000
Estimated: 98567
Error: 1.43%
Memory: 2048 bytes
Actual unique users: 1000000
Estimated: 1021345
Error: 2.13%
Memory: 2048 bytes
💡 HIDDEN COST LESSON
───────────────────────────────────────────────────────────
Exact counting (Set):
├─ Space: O(n) where n = unique elements
├─ For 1M users: 1M × 8 bytes = 8 MB
├─ Accuracy: 100% ✓
└─ Scalability: Poor (linear growth)
HyperLogLog:
├─ Space: O(1) - fixed 2KB!
├─ For 1M users: 2 KB (constant!)
├─ Accuracy: ~98% (2% error)
└─ Scalability: Excellent (counts billions)
The hidden cost:
├─ Exact: Guaranteed correct, but needs O(n) memory
├─ Approximate: Small error, but O(1) memory
└─ Trade-off: Accuracy vs Memory (Part 1 callback!)
Why this matters (2026):
├─ Twitter: Track 500M daily users with <1MB memory
├─ Google Analytics: Unique visitors across billions of pages
├─ Redis: Built-in PFCOUNT command uses HyperLogLog
└─ Facebook: Count reach of posts in real-time
🎯 Problem 2: Sliding Window Aggregations
The Real-Time Analytics Challenge
Uber scenario:
"How many rides in the last 5 minutes?"
Updates every second, 24/7.
Naive approach:
Store all events with timestamps
On query: Filter by time window
Count matches
Problem:
├─ 1000 rides/second × 300 seconds = 300,000 events
├─ Each event: ~100 bytes
├─ Memory: 30 MB per window
├─ Queries: O(n) scan every second
└─ Doesn't scale!
The Bridge: From Storage to Streaming
1960s: Store everything, query periodically
├─ Batch jobs run hourly/daily
└─ No real-time requirements
1990s: Time-series databases
├─ Optimized for time-range queries
├─ Still query-based (pull model)
└─ Seconds of latency
2010s: Stream processing (Apache Storm, Flink)
├─ Push model (events flow through)
├─ Maintain aggregates in memory
├─ Sub-second latency
└─ Windowing algorithms
2026: Edge stream processing
├─ Process at CDN edge (Cloudflare Workers)
├─ Global real-time aggregation
├─ Millisecond latency worldwide
└─ Powers: Live sports scores, stock tickers, IoT dashboards
Sliding Window Algorithm
The Core Insight:
Don't store events, store aggregates!
Tumbling windows (no overlap):
Time: 0-5s | 5-10s | 10-15s
Count: 100 | 150 | 120
Sliding windows (1-second slide):
Time: 0-5s | 1-6s | 2-7s | 3-8s
Count: 100 | 110 | 125 | 135
Key: Use buckets + add/subtract
Instead of: Scan all events each time
Implementation
#include <iostream>
#include <queue>
#include <chrono>
#include <thread>
#include <ctime>    // time()
#include <cstdlib>  // rand()
class SlidingWindowCounter {
private:
struct Bucket {
int timestamp; // Bucket start time (seconds)
int count; // Events in this bucket
};
int windowSizeSeconds;
int bucketSizeSeconds;
int numBuckets;
queue<Bucket> buckets;
int totalCount;
int currentBucketTimestamp;
int currentBucketCount;
int getCurrentTime() {
return time(nullptr);
}
void evictOldBuckets(int now) {
int windowStart = now - windowSizeSeconds;
while (!buckets.empty() && buckets.front().timestamp < windowStart) {
totalCount -= buckets.front().count;
buckets.pop();
}
}
void rotateBucket(int now) {
if (currentBucketCount > 0) {
buckets.push({currentBucketTimestamp, currentBucketCount});
totalCount += currentBucketCount;
}
currentBucketTimestamp = now;
currentBucketCount = 0;
}
public:
SlidingWindowCounter(int windowSec = 300, int bucketSec = 1)
: windowSizeSeconds(windowSec),
bucketSizeSeconds(bucketSec),
numBuckets(windowSec / bucketSec),
totalCount(0),
currentBucketCount(0) {
currentBucketTimestamp = getCurrentTime();
}
// Add event to window
void addEvent() {
int now = getCurrentTime();
// Check if we need to rotate to new bucket
if (now - currentBucketTimestamp >= bucketSizeSeconds) {
rotateBucket(now);
}
currentBucketCount++;
// Evict old buckets outside window
evictOldBuckets(now);
}
// Get count for current window
int getCount() {
int now = getCurrentTime();
evictOldBuckets(now);
return totalCount + currentBucketCount;
}
void displayState() {
cout << "Current window count: " << getCount() << "\n";
cout << "Active buckets: " << buckets.size() << "\n";
cout << "Current bucket: " << currentBucketCount << " events\n";
}
void analyzeComplexity() {
cout << "\n📊 SLIDING WINDOW COMPLEXITY\n";
cout << "───────────────────────────────────────\n\n";
cout << "Naive approach (scan all events):\n";
cout << "├─ addEvent(): O(1) - just append\n";
cout << "├─ getCount(): O(n) - scan all events in window\n";
cout << "├─ Space: O(n) - store all events\n";
cout << "└─ For 300k events: 300k scans per query!\n\n";
cout << "Sliding window with buckets:\n";
cout << "├─ addEvent(): O(1) - increment counter\n";
cout << "├─ getCount(): O(1) - return totalCount\n";
cout << "├─ evictOldBuckets(): O(b) where b = buckets to evict\n";
cout << "├─ Space: O(w/b) where w = window, b = bucket size\n";
cout << "└─ For 300-sec window, 1-sec buckets: 300 buckets max\n\n";
cout << "The hidden cost:\n";
cout << "├─ Bucket overhead: Each bucket = 8 bytes (two ints)\n";
cout << "├─ 300 buckets × 8 = 2.4 KB (tiny!)\n";
cout << "├─ Trade-off: Slight memory for massive speed\n";
cout << "└─ ~12,000x less memory than storing 30 MB of events!\n\n";
cout << "Granularity trade-off:\n";
cout << "├─ 1-second buckets: More accurate, more memory\n";
cout << "├─ 10-second buckets: Less accurate, less memory\n";
cout << "└─ Choose based on precision needs!\n";
}
};
int main() {
cout << "\n⏱️ SLIDING WINDOW REAL-TIME AGGREGATION\n";
cout << "───────────────────────────────────────────────────────────\n\n";
// 10-second window for demo (normally 300 seconds for 5 minutes)
SlidingWindowCounter window(10, 1);
cout << "Simulating Uber rides (10-second window)...\n\n";
// Simulate events over time
for (int i = 0; i < 25; i++) {
// Add 5-10 events per second
int eventsThisSecond = 5 + (rand() % 6);
for (int j = 0; j < eventsThisSecond; j++) {
window.addEvent();
}
if (i % 2 == 0) { // Display every 2 iterations
cout << "Iteration: " << i << " - ";
window.displayState();
}
this_thread::sleep_for(chrono::milliseconds(100)); // Compressed time for the demo
}
window.analyzeComplexity();
cout << "\n🌍 REAL-WORLD APPLICATIONS (2026)\n";
cout << "───────────────────────────────────────────────────────────\n\n";
cout << "Uber:\n";
cout << "├─ Real-time ride demand by region\n";
cout << "├─ Surge pricing calculations\n";
cout << "└─ Driver availability metrics\n\n";
cout << "Twitter:\n";
cout << "├─ Trending topics (count hashtags in 5-min window)\n";
cout << "├─ Tweet velocity for viral detection\n";
cout << "└─ User engagement rates\n\n";
cout << "Stock Market:\n";
cout << "├─ Trading volume in last minute\n";
cout << "├─ Price moving averages\n";
cout << "└─ Volatility calculations\n\n";
cout << "IoT/Monitoring:\n";
cout << "├─ Request rate (last 5 minutes)\n";
cout << "├─ Error rate thresholds\n";
cout << "├─ CPU/memory utilization trends\n";
cout << "└─ Alert triggering logic\n";
return 0;
}
📈 The Evolution: Batch → Real-Time → Edge
See the Hidden Costs at Each Stage
1960s: Batch Processing
├─ Time: O(n) - process all data
├─ Space: O(n) - store all data
├─ Latency: Hours to days
├─ Cost: Low (process once)
└─ Hidden cost: Stale data
1990s: Time-Series Databases
├─ Time: O(log n) - indexed queries
├─ Space: O(n) - still store everything
├─ Latency: Seconds
├─ Cost: Medium (disk I/O)
└─ Hidden cost: Index maintenance
2010s: Stream Processing (Kafka, Flink)
├─ Time: O(1) per event
├─ Space: O(w) - window size
├─ Latency: Sub-second
├─ Cost: High (always running)
└─ Hidden cost: State management
2026: Edge Stream Processing
├─ Time: O(1) per event
├─ Space: O(w) - distributed across edge
├─ Latency: Milliseconds globally
├─ Cost: Very high (200+ locations)
└─ Hidden cost: Coordination overhead
The pattern you need to recognize:
"Each optimization has a cost;
know what you're trading!"
🎯 Problem 3: Count-Min Sketch (Approximate Frequency)
The Heavy Hitters Problem
Twitter scenario:
"Which hashtag is being used most RIGHT NOW?"
Need to track frequency of millions of hashtags
in a 1-minute sliding window.
Naive approach:
HashMap<String, Integer> counts;
counts[hashtag]++;
Problem with 1M unique hashtags:
├─ Each entry: 24 bytes (string ref + count)
├─ Total: 1M × 24 = 24 MB
├─ Per minute!
└─ Multiply by thousands of metrics...
Count-Min Sketch Algorithm
#include <iostream>
#include <vector>
#include <functional>
#include <string>
#include <climits>  // INT_MAX
using namespace std;
class CountMinSketch {
private:
int width; // Number of buckets per row
int depth; // Number of hash functions (rows)
vector<vector<int>> sketch;
std::hash<string> hasher;
// Derive d different hash functions by salting one hash
// (named hashOf so it doesn't shadow std::hash inside the class)
int hashOf(const string& item, int hashIndex) {
return hasher(item + to_string(hashIndex)) % width;
}
public:
CountMinSketch(int w = 2048, int d = 5) : width(w), depth(d) {
sketch.resize(depth, vector<int>(width, 0));
}
// Increment count for item
void add(const string& item, int count = 1) {
for (int i = 0; i < depth; i++) {
sketch[i][hashOf(item, i)] += count;
}
}
// Estimate count for item: take the minimum across rows,
// since collisions can only inflate a counter
int estimate(const string& item) {
int minCount = INT_MAX;
for (int i = 0; i < depth; i++) {
minCount = min(minCount, sketch[i][hashOf(item, i)]);
}
return minCount;
}
size_t getMemoryUsage() {
return depth * width * sizeof(int);
}
void displayAnalysis() {
cout << "\n📊 COUNT-MIN SKETCH ANALYSIS\n";
cout << "───────────────────────────────────────\n\n";
cout << "Configuration:\n";
cout << "├─ Width (buckets): " << width << "\n";
cout << "├─ Depth (hash functions): " << depth << "\n";
cout << "├─ Memory: " << (getMemoryUsage() / 1024) << " KB\n";
cout << "└─ Can track: Unlimited unique items!\n\n";
cout << "HashMap approach:\n";
cout << "├─ Space: O(n) where n = unique items\n";
cout << "├─ Accuracy: 100% exact\n";
cout << "├─ For 1M items: ~24 MB\n";
cout << "└─ Lookup: O(1) hash table\n\n";
cout << "Count-Min Sketch:\n";
cout << "├─ Space: O(1) - fixed " << (getMemoryUsage() / 1024) << " KB\n";
cout << "├─ Accuracy: Over-estimates (never under!)\n";
cout << "├─ Error bound: ε × N (controlled)\n";
cout << "└─ Lookup: O(d) where d = depth\n\n";
cout << "The hidden lesson:\n";
cout << "├─ Hash collisions cause over-counting\n";
cout << "├─ More depth = better accuracy but more ops\n";
cout << "├─ More width = fewer collisions but more space\n";
cout << "└─ Trade-off: ε (error) vs δ (confidence)\n\n";
cout << "Why over-estimating is acceptable:\n";
cout << "├─ Trending topics: Top items still top\n";
cout << "├─ Network monitoring: Catch heavy hitters\n";
cout << "├─ Security: Detect DDoS (better safe than sorry)\n";
cout << "└─ Ranking: Relative order preserved\n";
}
};
int main() {
cout << "\n#️⃣ COUNT-MIN SKETCH: TRACKING TRENDING TOPICS\n";
cout << "───────────────────────────────────────────────────────────\n\n";
CountMinSketch cms(2048, 5);
// Simulate hashtag usage
vector<pair<string, int>> hashtags = {
{"#AI", 1000},
{"#Python", 800},
{"#JavaScript", 750},
{"#Cloud", 500},
{"#DevOps", 450},
{"#Kubernetes", 400},
{"#React", 350},
{"#Docker", 300},
{"#MachineLearning", 250},
{"#AWS", 200},
};
cout << "Simulating Twitter hashtag counts...\n\n";
// Add hashtags
for (const auto& [tag, count] : hashtags) {
cms.add(tag, count);
}
cout << "Actual vs Estimated counts:\n";
cout << string(50, '-') << "\n";
for (const auto& [tag, actualCount] : hashtags) {
int estimated = cms.estimate(tag);
double error = abs(estimated - actualCount) / (double)actualCount * 100;
cout << tag << "\n";
cout << " Actual: " << actualCount << "\n";
cout << " Estimated: " << estimated << "\n";
cout << " Error: " << error << "%\n\n";
}
cms.displayAnalysis();
cout << "\n🌍 REAL-WORLD SCALE (2026)\n";
cout << "───────────────────────────────────────────────────────────\n\n";
cout << "Twitter Trending:\n";
cout << "├─ Track millions of hashtags\n";
cout << "├─ Memory: ~10 MB (vs GBs with HashMap)\n";
cout << "├─ Update: Real-time as tweets arrive\n";
cout << "└─ Query: Top 10 trends in milliseconds\n\n";
cout << "Network Monitoring:\n";
cout << "├─ Track packet counts per IP\n";
cout << "├─ Detect DDoS (heavy hitters)\n";
cout << "├─ Memory: Fixed regardless of IPs\n";
cout << "└─ Speed: Line rate processing\n\n";
cout << "E-commerce:\n";
cout << "├─ Track product view counts\n";
cout << "├─ Identify trending items\n";
cout << "├─ Memory efficient across millions of SKUs\n";
cout << "└─ Powers recommendation engines\n";
return 0;
}
🔍 The Hidden Costs We Highlight
Streaming vs Batch: The Full Picture
Batch Processing (Old Way):
───────────────────────────
Visible costs:
├─ Time: O(n) per batch
└─ Space: O(n) storage
Hidden costs:
├─ Stale data (hours/days old)
├─ Batch job coordination
├─ Failed batch retries
├─ Peak load on infrastructure
└─ Can't react to real-time events
Stream Processing (New Way):
────────────────────────────
Visible costs:
├─ Time: O(1) per event
└─ Space: O(w) window state
Hidden costs:
├─ Always-on infrastructure (24/7 cost)
├─ State management complexity
├─ Exactly-once semantics overhead
├─ Backpressure handling
├─ Late event handling
├─ Watermarking logic
└─ Fault tolerance (checkpointing)
Important lesson:
"Real-time isn't free; it's trading
computation cost for latency.
Know what you're paying for!"
🌉 From Algorithms to 2026 Systems
HyperLogLog → Distributed Analytics
1970: Exact counting (Set)
├─ Single machine
└─ Limited by RAM
2007: HyperLogLog
├─ Single machine
├─ Billions in KB
└─ 2% error
2026: Distributed HyperLogLog
├─ Merge across datacenters
├─ Global real-time counts
├─ Sub-second aggregation
└─ Powers: Google Analytics, Facebook Insights
Example: Facebook post reach
├─ HyperLogLog at each datacenter
├─ Merge counts every second
├─ Global reach estimate in real-time
└─ Billions of users, KB of memory!
Sliding Windows → Edge Computing
1990: Database time-range queries
├─ Pull model (query when needed)
└─ Seconds of latency
2010: Stream processing (Kafka + Flink)
├─ Push model (events flow)
├─ Centralized processing
└─ Sub-second latency
2026: Edge stream processing
├─ Process at CDN edge (200+ locations)
├─ Local windowing, global aggregation
├─ Millisecond latency worldwide
└─ Powers: Live sports scores, IoT dashboards
Example: Cloudflare Analytics
├─ Sliding windows at each edge location
├─ Aggregate to region, then global
├─ Real-time dashboard updates
└─ Handles billions of requests/day
💡 Practice Problems
Problem 1: Design Twitter's Trending Algorithm
Requirements:
├─ Process 6,000 tweets/second
├─ Track hashtag frequency (last 5 minutes)
├─ Update trending list every 10 seconds
├─ Return top 10 trending hashtags
└─ Memory constraint: < 100 MB
Your algorithm must:
1. Count hashtags in sliding window
2. Detect velocity (rapidly increasing)
3. Handle billions of unique hashtags
4. Real-time updates
Hints:
├─ Count-Min Sketch for frequency
├─ Sliding windows for time decay
├─ Min-heap for top-K
└─ Velocity = (current_window - previous_window) / time
Problem 2: Design Stock Market VWAP Calculator
VWAP = Volume Weighted Average Price
Formula: Σ(price × volume) / Σ(volume) over time window
Requirements:
├─ 1000s of stocks
├─ 100s of trades per second per stock
├─ Calculate VWAP for last hour
├─ Update every second
└─ Memory efficient
Your algorithm must:
1. Maintain sliding window of trades
2. Compute running sum of (price × volume)
3. Compute running sum of volume
4. Handle high-frequency updates
Hints:
├─ Sliding window with buckets
├─ Maintain two sums (price×volume, volume)
├─ Incremental updates (add new, subtract old)
└─ Per-stock state management
Problem 3: Design Real-Time Anomaly Detection
Requirements:
├─ Monitor API request rates
├─ Detect sudden spikes (DDoS, viral content)
├─ Calculate baseline (normal behavior)
├─ Alert when > 3× baseline
└─ Memory: O(1) per metric
Your algorithm must:
1. Track request count (sliding window)
2. Calculate moving average (baseline)
3. Detect spikes in real-time
4. Minimize false positives
Hints:
├─ Exponential moving average for baseline
├─ Z-score for anomaly detection
├─ Sliding window for current rate
└─ Configurable sensitivity threshold
🎯 Key Takeaways
1. STREAMING = BOUNDED MEMORY FOR INFINITE DATA
Must use O(1) or O(w) space for ∞ data
2. APPROXIMATE IS OFTEN ENOUGH
Trade-off: Accuracy vs Memory (Part 1!)
├─ HyperLogLog: 2% error, 1000x less memory
├─ Count-Min: Over-estimate, fixed memory
└─ Sliding Window: Bucketing reduces precision
3. HIDDEN COSTS OF REAL-TIME
├─ Always-on infrastructure ($$)
├─ State management complexity
├─ Fault tolerance overhead
├─ Late data handling
└─ Coordination across systems
4. EVOLUTION: BATCH → STREAM → EDGE
1960s: Store all, query later
2026: Process at edge, aggregate globally
└─ Each step: traded storage for speed
5. 2026 SYSTEMS ARE HYBRID
├─ Streaming for real-time (approximate)
├─ Batch for accuracy (exact)
├─ Lambda architecture (both!)
└─ Edge for latency (distributed)
🗺️ Your Streaming Journey
Where you are now:
✓ Time/space trade-offs (Part 1)
✓ Algorithm design (Part 2)
✓ Graphs (Part 3)
✓ Production systems (Part 4)
✓ Database internals (Part 5)
✓ Caching layers (Part 6)
✓ Real-time streaming (Part 7) ← YOU ARE HERE
Your growing insight:
├─ See hidden costs everywhere
├─ Understand approximation trade-offs
├─ Can design for infinite data
├─ Know when real-time is worth the cost
└─ Ready for AI/ML algorithms!
Next steps:
➡ Part 8: AI/ML algorithms (recommendations, LLMs)
➡ Part 9: Security & cryptography
➡ Part 10: Autonomous systems
💬 Your Turn
Build these yourself:
- Implement HyperLogLog and test accuracy vs set size
- Build sliding window counter with different bucket sizes
- Create Count-Min Sketch and compare to HashMap
- Measure memory: Exact vs Approximate
What would you ask?
- "What's the hidden cost of real-time?"
- "Why is approximate good enough?"
- "When would you NOT use streaming?"
Share your findings! What's your error rate vs memory trade-off? 🚀
Infinite data needs finite memory. Streaming algorithms are the bridge. Master approximation, and you master real-time systems. 💡✨
🎯 Coming Up Next: Part 8
AI & Machine Learning Algorithm Engineering
From counting data to learning from it:
├─ How recommendation algorithms work
├─ Transformer attention mechanism (ChatGPT)
├─ Vector similarity at scale
└─ Online learning & bandit algorithms
Same principles: Hidden costs, trade-offs, real-time!
Stay tuned! 🤖