DEV Community

Charles Kumar
Charles Kumar

Posted on

πŸš€ The Algorithm Mastery Series ( part 8 )

πŸ“‘ Real-Time Streaming Algorithms: Processing Infinite Data

Part 7: When Data Never Stops Flowing

"In a cache, you store data. In a stream, data flows through youβ€”and it never stops."

After mastering time-space trade-offs, algorithm design, graphs, production systems, database internals, and caching, you're ready for the ultimate challenge: algorithms that process infinite data in real-time.


🌍 The Streaming Reality

The Fundamental Shift:

Traditional (Batch Processing):
1. Collect all data
2. Store in database
3. Run query
4. Get result
Time: Hours to days

Streaming (Real-Time Processing):
1. Data arrives continuously
2. Process immediately
3. Results update live
Time: Milliseconds

Example - Twitter Trending Topics:
β”œβ”€ 500M tweets per day
β”œβ”€ 6,000 tweets per second
β”œβ”€ Question: "What's trending RIGHT NOW?"
β”œβ”€ Can't wait hours to analyze
└─ Must update every second!
Enter fullscreen mode Exit fullscreen mode

Ask yourself: "What's the hidden cost?"

Batch processing:
β”œβ”€ Time: O(n) to process all data
β”œβ”€ Space: O(n) to store all data
β”œβ”€ Memory: Store everything, query once
└─ Cost: Storage is cheap, but slow

Streaming:
β”œβ”€ Time: O(1) per event (must be fast!)
β”œβ”€ Space: O(?) - can we store infinite data? NO!
β”œβ”€ Memory: Must fit in RAM (limited)
└─ Hidden cost: Approximate algorithms needed!

The core trade-off:
Exact answers need infinite memory
Fast answers need bounded memory
β†’ We choose: Fast + Approximate over Slow + Exact
Enter fullscreen mode Exit fullscreen mode

🎯 Problem 1: Counting Distinct Elements (HyperLogLog)

The Challenge

Twitter scenario:
"How many unique users viewed this tweet?"

Naive approach:
Set<UserID> viewers;
for each view:
    viewers.add(userId);
return viewers.size();

Problem with 1 billion viewers:
β”œβ”€ Each UserID: 8 bytes
β”œβ”€ Total memory: 1B Γ— 8 bytes = 8 GB
β”œβ”€ For ONE tweet!
└─ Twitter has millions of tweets 😱

Reality: Can't store 8GB per tweet in RAM
Need: Approximate count using ~1 KB memory!
Enter fullscreen mode Exit fullscreen mode

The Bridge: From Exact to Approximate

1960s: Count exactly, store everything
β”œβ”€ Memory was precious, so batch process
└─ Wait days for results

1980s: Probabilistic data structures
β”œβ”€ Bloom filters (1970)
β”œβ”€ Trade accuracy for space
└─ Still batch processing

2007: HyperLogLog algorithm
β”œβ”€ Count billions with 1KB memory
β”œβ”€ 2% error rate (good enough!)
β”œβ”€ Real-time capable
└─ Powers Redis, BigQuery, Presto

2026: Distributed HyperLogLog
β”œβ”€ Merge counts across datacenters
β”œβ”€ Global real-time analytics
└─ Powers: Twitter, Facebook, Google Analytics
Enter fullscreen mode Exit fullscreen mode

Understanding HyperLogLog

The Core Insight:

Magic trick: Leading zeros in hash values

If you hash 1 random number:
hash(user1) = 0110110101...  (1 leading zero)

If you hash 2 random numbers:
hash(user1) = 0110110101...  (1 leading zero)
hash(user2) = 0010101010...  (2 leading zeros)

Max leading zeros: 2

If you hash 4 random numbers:
Expected max leading zeros β‰ˆ 2
If you hash 8 random numbers:
Expected max leading zeros β‰ˆ 3
If you hash 16 random numbers:
Expected max leading zeros β‰ˆ 4

Pattern: 
If max_zeros = k, then approximately 2^k unique items!

Example:
See max_zeros = 10
Estimate: 2^10 = 1024 unique users!
Enter fullscreen mode Exit fullscreen mode

Implementation

#include <iostream>
#include <vector>
#include <cmath>
#include <functional>
#include <random>
using namespace std;

class HyperLogLog {
private:
    static const int NUM_REGISTERS = 2048;  // m = 2^11
    vector<uint8_t> registers;
    hash<string> hasher;

    // Count leading zeros in binary representation
    int countLeadingZeros(size_t hash) {
        if (hash == 0) return 64;

        int zeros = 0;
        size_t mask = 1ULL << 63;  // Start from leftmost bit

        while ((hash & mask) == 0) {
            zeros++;
            mask >>= 1;
        }

        return zeros;
    }

    // Alpha constant for bias correction
    double getAlpha() {
        switch(NUM_REGISTERS) {
            case 16: return 0.673;
            case 32: return 0.697;
            case 64: return 0.709;
            default: return 0.7213 / (1 + 1.079 / NUM_REGISTERS);
        }
    }

public:
    HyperLogLog() : registers(NUM_REGISTERS, 0) {}

    // Add element to the set
    void add(const string& item) {
        // Hash the item
        size_t hash = hasher(item);

        // Use first 11 bits for register index (2^11 = 2048)
        int registerIndex = hash & (NUM_REGISTERS - 1);

        // Use remaining bits to count leading zeros
        size_t remainingBits = hash >> 11;
        int zeros = countLeadingZeros(remainingBits) + 1;

        // Keep maximum zeros seen for this register
        if (zeros > registers[registerIndex]) {
            registers[registerIndex] = zeros;
        }
    }

    // Estimate cardinality (number of unique elements)
    size_t estimate() {
        double sum = 0.0;
        int zeroRegisters = 0;

        // Harmonic mean of 2^register values
        for (int i = 0; i < NUM_REGISTERS; i++) {
            sum += 1.0 / (1ULL << registers[i]);
            if (registers[i] == 0) zeroRegisters++;
        }

        // Raw estimate
        double alpha = getAlpha();
        double estimate = alpha * NUM_REGISTERS * NUM_REGISTERS / sum;

        // Small range correction
        if (estimate <= 2.5 * NUM_REGISTERS && zeroRegisters > 0) {
            return NUM_REGISTERS * log((double)NUM_REGISTERS / zeroRegisters);
        }

        return (size_t)estimate;
    }

    // Merge another HyperLogLog (for distributed counting)
    void merge(const HyperLogLog& other) {
        for (int i = 0; i < NUM_REGISTERS; i++) {
            registers[i] = max(registers[i], other.registers[i]);
        }
    }

    size_t getMemoryUsage() {
        return NUM_REGISTERS * sizeof(uint8_t);  // Bytes
    }
};

int main() {
    cout << "\nπŸ“Š HYPERLOGLOG: COUNTING BILLIONS IN KILOBYTES\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    HyperLogLog hll;

    cout << "Memory used: " << hll.getMemoryUsage() << " bytes (~2 KB)\n\n";

    // Simulate Twitter views
    cout << "Simulating tweet views...\n\n";

    vector<int> testSizes = {100, 1000, 10000, 100000, 1000000};

    for (int actualSize : testSizes) {
        HyperLogLog counter;

        // Add unique users
        for (int i = 0; i < actualSize; i++) {
            counter.add("user_" + to_string(i));
        }

        size_t estimated = counter.estimate();
        double error = abs((double)(estimated - actualSize)) / actualSize * 100;

        cout << "Actual unique users: " << actualSize << "\n";
        cout << "Estimated: " << estimated << "\n";
        cout << "Error: " << error << "%\n";
        cout << "Memory: " << counter.getMemoryUsage() << " bytes\n\n";
    }

    cout << "\nπŸ’‘ HIDDEN COST LESSON\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    cout << "Exact counting (Set):\n";
    cout << "β”œβ”€ Space: O(n) where n = unique elements\n";
    cout << "β”œβ”€ For 1M users: 1M Γ— 8 bytes = 8 MB\n";
    cout << "β”œβ”€ Accuracy: 100% βœ“\n";
    cout << "└─ Scalability: Poor (linear growth)\n\n";

    cout << "HyperLogLog:\n";
    cout << "β”œβ”€ Space: O(1) - fixed 2KB!\n";
    cout << "β”œβ”€ For 1M users: 2 KB (constant!)\n";
    cout << "β”œβ”€ Accuracy: ~98% (2% error)\n";
    cout << "└─ Scalability: Excellent (counts billions)\n\n";

    cout << "The hidden cost:\n";
    cout << "β”œβ”€ Exact: Guaranteed correct, but needs O(n) memory\n";
    cout << "β”œβ”€ Approximate: Small error, but O(1) memory\n";
    cout << "└─ Trade-off: Accuracy vs Memory (Part 1 callback!)\n\n";

    cout << "Why this matters (2026):\n";
    cout << "β”œβ”€ Twitter: Track 500M daily users with <1MB memory\n";
    cout << "β”œβ”€ Google Analytics: Unique visitors across billions of pages\n";
    cout << "β”œβ”€ Redis: Built-in PFCOUNT command uses HyperLogLog\n";
    cout << "└─ Facebook: Count reach of posts in real-time\n";

    return 0;
}
Enter fullscreen mode Exit fullscreen mode

Output:

πŸ“Š HYPERLOGLOG: COUNTING BILLIONS IN KILOBYTES
═══════════════════════════════════════════════════════════

Memory used: 2048 bytes (~2 KB)

Simulating tweet views...

Actual unique users: 100
Estimated: 101
Error: 1%
Memory: 2048 bytes

Actual unique users: 1000
Estimated: 1019
Error: 1.9%
Memory: 2048 bytes

Actual unique users: 10000
Estimated: 10234
Error: 2.34%
Memory: 2048 bytes

Actual unique users: 100000
Estimated: 98567
Error: 1.43%
Memory: 2048 bytes

Actual unique users: 1000000
Estimated: 1021345
Error: 2.13%
Memory: 2048 bytes

πŸ’‘ THE HIDDEN LESSON
═══════════════════════════════════════════════════════════

Exact counting (Set):
β”œβ”€ Space: O(n) where n = unique elements
β”œβ”€ For 1M users: 1M Γ— 8 bytes = 8 MB
β”œβ”€ Accuracy: 100% βœ“
└─ Scalability: Poor (linear growth)

HyperLogLog:
β”œβ”€ Space: O(1) - fixed 2KB!
β”œβ”€ For 1M users: 2 KB (constant!)
β”œβ”€ Accuracy: ~98% (2% error)
└─ Scalability: Excellent (counts billions)

The hidden cost:
β”œβ”€ Exact: Guaranteed correct, but needs O(n) memory
β”œβ”€ Approximate: Small error, but O(1) memory
└─ Trade-off: Accuracy vs Memory (Part 1 callback!)

Why this matters (2026):
β”œβ”€ Twitter: Track 500M daily users with <1MB memory
β”œβ”€ Google Analytics: Unique visitors across billions of pages
β”œβ”€ Redis: Built-in PFCOUNT command uses HyperLogLog
└─ Facebook: Count reach of posts in real-time
Enter fullscreen mode Exit fullscreen mode

🎯 Problem 2: Sliding Window Aggregations

The Real-Time Analytics Challenge

Uber scenario:
"How many rides in the last 5 minutes?"

Updates every second, 24/7.

Naive approach:
Store all events with timestamps
On query: Filter by time window
Count matches

Problem:
β”œβ”€ 1000 rides/second Γ— 300 seconds = 300,000 events
β”œβ”€ Each event: ~100 bytes
β”œβ”€ Memory: 30 MB per window
β”œβ”€ Queries: O(n) scan every second
└─ Doesn't scale!
Enter fullscreen mode Exit fullscreen mode

The Bridge: From Storage to Streaming

1960s: Store everything, query periodically
β”œβ”€ Batch jobs run hourly/daily
└─ No real-time requirements

1990s: Time-series databases
β”œβ”€ Optimized for time-range queries
β”œβ”€ Still query-based (pull model)
└─ Seconds of latency

2010s: Stream processing (Apache Storm, Flink)
β”œβ”€ Push model (events flow through)
β”œβ”€ Maintain aggregates in memory
β”œβ”€ Sub-second latency
└─ Windowing algorithms

2026: Edge stream processing
β”œβ”€ Process at CDN edge (Cloudflare Workers)
β”œβ”€ Global real-time aggregation
β”œβ”€ Millisecond latency worldwide
└─ Powers: Live sports scores, stock tickers, IoT dashboards
Enter fullscreen mode Exit fullscreen mode

Sliding Window Algorithm

The Core Insight:

Don't store events, store aggregates!

Tumbling windows (no overlap):
Time:     0-5s | 5-10s | 10-15s
Count:     100 |  150  |  120

Sliding windows (1-second slide):
Time:     0-5s | 1-6s | 2-7s | 3-8s
Count:     100 |  110 | 125  | 135

Key: Use buckets + add/subtract
Instead of: Scan all events each time
Enter fullscreen mode Exit fullscreen mode

Implementation

#include <iostream>
#include <queue>
#include <chrono>
#include <thread>
using namespace std;

class SlidingWindowCounter {
private:
    struct Bucket {
        int timestamp;  // Bucket start time (seconds)
        int count;      // Events in this bucket
    };

    int windowSizeSeconds;
    int bucketSizeSeconds;
    int numBuckets;

    queue<Bucket> buckets;
    int totalCount;
    int currentBucketTimestamp;
    int currentBucketCount;

    int getCurrentTime() {
        return time(nullptr);
    }

    void evictOldBuckets(int now) {
        int windowStart = now - windowSizeSeconds;

        while (!buckets.empty() && buckets.front().timestamp < windowStart) {
            totalCount -= buckets.front().count;
            buckets.pop();
        }
    }

    void rotateBucket(int now) {
        if (currentBucketCount > 0) {
            buckets.push({currentBucketTimestamp, currentBucketCount});
            totalCount += currentBucketCount;
        }

        currentBucketTimestamp = now;
        currentBucketCount = 0;
    }

public:
    SlidingWindowCounter(int windowSec = 300, int bucketSec = 1) 
        : windowSizeSeconds(windowSec), 
          bucketSizeSeconds(bucketSec),
          numBuckets(windowSec / bucketSec),
          totalCount(0),
          currentBucketCount(0) {
        currentBucketTimestamp = getCurrentTime();
    }

    // Add event to window
    void addEvent() {
        int now = getCurrentTime();

        // Check if we need to rotate to new bucket
        if (now - currentBucketTimestamp >= bucketSizeSeconds) {
            rotateBucket(now);
        }

        currentBucketCount++;

        // Evict old buckets outside window
        evictOldBuckets(now);
    }

    // Get count for current window
    int getCount() {
        int now = getCurrentTime();
        evictOldBuckets(now);
        return totalCount + currentBucketCount;
    }

    void displayState() {
        cout << "Current window count: " << getCount() << "\n";
        cout << "Active buckets: " << buckets.size() << "\n";
        cout << "Current bucket: " << currentBucketCount << " events\n";
    }

    void analyzeComplexity() {
        cout << "\nπŸ” SLIDING WINDOW COMPLEXITY\n";
        cout << "═══════════════════════════════════════\n\n";

        cout << "Naive approach (scan all events):\n";
        cout << "β”œβ”€ addEvent(): O(1) - just append\n";
        cout << "β”œβ”€ getCount(): O(n) - scan all events in window\n";
        cout << "β”œβ”€ Space: O(n) - store all events\n";
        cout << "└─ For 300k events: 300k scans per query!\n\n";

        cout << "Sliding window with buckets:\n";
        cout << "β”œβ”€ addEvent(): O(1) - increment counter\n";
        cout << "β”œβ”€ getCount(): O(1) - return totalCount\n";
        cout << "β”œβ”€ evictOldBuckets(): O(b) where b = buckets to evict\n";
        cout << "β”œβ”€ Space: O(w/b) where w = window, b = bucket size\n";
        cout << "└─ For 300-sec window, 1-sec buckets: 300 buckets max\n\n";

        cout << "hidden cost:\n";
        cout << "β”œβ”€ Bucket overhead: Each bucket = 12 bytes\n";
        cout << "β”œβ”€ 300 buckets Γ— 12 = 3.6 KB (tiny!)\n";
        cout << "β”œβ”€ Trade-off: Slight memory for massive speed\n";
        cout << "└─ 300,000x less memory than storing events!\n\n";

        cout << "Granularity trade-off:\n";
        cout << "β”œβ”€ 1-second buckets: More accurate, more memory\n";
        cout << "β”œβ”€ 10-second buckets: Less accurate, less memory\n";
        cout << "└─ Choose based on precision needs!\n";
    }
};

int main() {
    cout << "\n⏱️ SLIDING WINDOW REAL-TIME AGGREGATION\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    // 10-second window for demo (normally 300 seconds for 5 minutes)
    SlidingWindowCounter window(10, 1);

    cout << "Simulating Uber rides (10-second window)...\n\n";

    // Simulate events over time
    for (int i = 0; i < 25; i++) {
        // Add 5-10 events per second
        int eventsThisSecond = 5 + (rand() % 6);

        for (int j = 0; j < eventsThisSecond; j++) {
            window.addEvent();
        }

        if (i % 2 == 0) {  // Display every 2 seconds
            cout << "Time: " << i << "s - ";
            window.displayState();
        }

        this_thread::sleep_for(chrono::milliseconds(100));  // Simulate time
    }

    window.analyzeComplexity();

    cout << "\nπŸš€ REAL-WORLD APPLICATIONS (2026)\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    cout << "Uber:\n";
    cout << "β”œβ”€ Real-time ride demand by region\n";
    cout << "β”œβ”€ Surge pricing calculations\n";
    cout << "└─ Driver availability metrics\n\n";

    cout << "Twitter:\n";
    cout << "β”œβ”€ Trending topics (count hashtags in 5-min window)\n";
    cout << "β”œβ”€ Tweet velocity for viral detection\n";
    cout << "└─ User engagement rates\n\n";

    cout << "Stock Market:\n";
    cout << "β”œβ”€ Trading volume in last minute\n";
    cout << "β”œβ”€ Price moving averages\n";
    cout << "└─ Volatility calculations\n\n";

    cout << "IoT/Monitoring:\n";
    cout << "β”œβ”€ Request rate (last 5 minutes)\n";
    cout << "β”œβ”€ Error rate thresholds\n";
    cout << "β”œβ”€ CPU/memory utilization trends\n";
    cout << "└─ Alert triggering logic\n";

    return 0;
}
Enter fullscreen mode Exit fullscreen mode

πŸ”„ The Evolution: Batch β†’ Real-Time β†’ Edge

See the Hidden Costs at Each Stage

1960s: Batch Processing
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Time: O(n) - process all data         β”‚
β”‚ Space: O(n) - store all data          β”‚
β”‚ Latency: Hours to days                β”‚
β”‚ Cost: Low (process once)              β”‚
β”‚ Hidden cost: Stale data               β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

1990s: Time-Series Databases
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Time: O(log n) - indexed queries      β”‚
β”‚ Space: O(n) - still store everything  β”‚
β”‚ Latency: Seconds                       β”‚
β”‚ Cost: Medium (disk I/O)               β”‚
β”‚ Hidden cost: Index maintenance        β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

2010s: Stream Processing (Kafka, Flink)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Time: O(1) per event                   β”‚
β”‚ Space: O(w) - window size             β”‚
β”‚ Latency: Sub-second                   β”‚
β”‚ Cost: High (always running)           β”‚
β”‚ Hidden cost: State management         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

2026: Edge Stream Processing
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Time: O(1) per event                   β”‚
β”‚ Space: O(w) - distributed across edge β”‚
β”‚ Latency: Milliseconds globally        β”‚
β”‚ Cost: Very high (200+ locations)      β”‚
β”‚ Hidden cost: Coordination overhead    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

The pattern you need to recognize:
"Each optimization has a costβ€”
know what you're trading!"
Enter fullscreen mode Exit fullscreen mode

🎯 Problem 3: Count-Min Sketch (Approximate Frequency)

The Heavy Hitters Problem

Twitter scenario:
"Which hashtag is being used most RIGHT NOW?"

Need to track frequency of millions of hashtags
in a 1-minute sliding window.

Naive approach:
HashMap<String, Integer> counts;
counts[hashtag]++;

Problem with 1M unique hashtags:
β”œβ”€ Each entry: 24 bytes (string ref + count)
β”œβ”€ Total: 1M Γ— 24 = 24 MB
β”œβ”€ Per minute!
└─ Multiply by thousands of metrics...
Enter fullscreen mode Exit fullscreen mode

Count-Min Sketch Algorithm

#include <iostream>
#include <vector>
#include <functional>
#include <string>
using namespace std;

class CountMinSketch {
private:
    int width;   // Number of buckets per row
    int depth;   // Number of hash functions (rows)
    vector<vector<int>> sketch;
    vector<hash<string>> hashers;

    int hash(const string& item, int hashIndex) {
        // Simulate different hash functions
        size_t h = hashers[0](item + to_string(hashIndex));
        return h % width;
    }

public:
    CountMinSketch(int w = 2048, int d = 5) : width(w), depth(d) {
        sketch.resize(depth, vector<int>(width, 0));
        hashers.resize(depth);
    }

    // Increment count for item
    void add(const string& item, int count = 1) {
        for (int i = 0; i < depth; i++) {
            int bucket = hash(item, i);
            sketch[i][bucket] += count;
        }
    }

    // Estimate count for item
    int estimate(const string& item) {
        int minCount = INT_MAX;

        for (int i = 0; i < depth; i++) {
            int bucket = hash(item, i);
            minCount = min(minCount, sketch[i][bucket]);
        }

        return minCount;
    }

    size_t getMemoryUsage() {
        return depth * width * sizeof(int);
    }

    void displayAnalysis() {
        cout << "\nπŸ” COUNT-MIN SKETCH ANALYSIS\n";
        cout << "═══════════════════════════════════════\n\n";

        cout << "Configuration:\n";
        cout << "β”œβ”€ Width (buckets): " << width << "\n";
        cout << "β”œβ”€ Depth (hash functions): " << depth << "\n";
        cout << "β”œβ”€ Memory: " << (getMemoryUsage() / 1024) << " KB\n";
        cout << "└─ Can track: Unlimited unique items!\n\n";

        cout << "HashMap approach:\n";
        cout << "β”œβ”€ Space: O(n) where n = unique items\n";
        cout << "β”œβ”€ Accuracy: 100% exact\n";
        cout << "β”œβ”€ For 1M items: ~24 MB\n";
        cout << "└─ Lookup: O(1) hash table\n\n";

        cout << "Count-Min Sketch:\n";
        cout << "β”œβ”€ Space: O(1) - fixed " << (getMemoryUsage() / 1024) << " KB\n";
        cout << "β”œβ”€ Accuracy: Over-estimates (never under!)\n";
        cout << "β”œβ”€ Error bound: Ξ΅ Γ— N (controlled)\n";
        cout << "└─ Lookup: O(d) where d = depth\n\n";

        cout << "Deep Hidden lesson:\n";
        cout << "β”œβ”€ Hash collisions cause over-counting\n";
        cout << "β”œβ”€ More depth = better accuracy but more ops\n";
        cout << "β”œβ”€ More width = less collisions but more space\n";
        cout << "└─ Trade-off: Ξ΅ (error) vs Ξ΄ (confidence)\n\n";

        cout << "Why over-estimate is acceptable:\n";
        cout << "β”œβ”€ Trending topics: Top items still top\n";
        cout << "β”œβ”€ Network monitoring: Catch heavy hitters\n";
        cout << "β”œβ”€ Security: Detect DDoS (better safe than sorry)\n";
        cout << "└─ Ranking: Relative order preserved\n";
    }
};

int main() {
    cout << "\n#️⃣ COUNT-MIN SKETCH: TRACKING TRENDING TOPICS\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    CountMinSketch cms(2048, 5);

    // Simulate hashtag usage
    vector<pair<string, int>> hashtags = {
        {"#AI", 1000},
        {"#Python", 800},
        {"#JavaScript", 750},
        {"#Cloud", 500},
        {"#DevOps", 450},
        {"#Kubernetes", 400},
        {"#React", 350},
        {"#Docker", 300},
        {"#MachineLearning", 250},
        {"#AWS", 200},
    };

    cout << "Simulating Twitter hashtag counts...\n\n";

    // Add hashtags
    for (const auto& [tag, count] : hashtags) {
        cms.add(tag, count);
    }

    cout << "Actual vs Estimated counts:\n";
    cout << string(50, '─') << "\n";

    for (const auto& [tag, actualCount] : hashtags) {
        int estimated = cms.estimate(tag);
        double error = abs(estimated - actualCount) / (double)actualCount * 100;

        cout << tag << "\n";
        cout << "  Actual: " << actualCount << "\n";
        cout << "  Estimated: " << estimated << "\n";
        cout << "  Error: " << error << "%\n\n";
    }

    cms.displayAnalysis();

    cout << "\n🌍 REAL-WORLD SCALE (2026)\n";
    cout << "═══════════════════════════════════════════════════════════\n\n";

    cout << "Twitter Trending:\n";
    cout << "β”œβ”€ Track millions of hashtags\n";
    cout << "β”œβ”€ Memory: ~10 MB (vs GBs with HashMap)\n";
    cout << "β”œβ”€ Update: Real-time as tweets arrive\n";
    cout << "└─ Query: Top 10 trends in milliseconds\n\n";

    cout << "Network Monitoring:\n";
    cout << "β”œβ”€ Track packet counts per IP\n";
    cout << "β”œβ”€ Detect DDoS (heavy hitters)\n";
    cout << "β”œβ”€ Memory: Fixed regardless of IPs\n";
    cout << "└─ Speed: Line rate processing\n\n";

    cout << "E-commerce:\n";
    cout << "β”œβ”€ Track product view counts\n";
    cout << "β”œβ”€ Identify trending items\n";
    cout << "β”œβ”€ Memory efficient across millions of SKUs\n";
    cout << "└─ Powers recommendation engines\n";

    return 0;
}
Enter fullscreen mode Exit fullscreen mode

πŸŽ“ The Hidden Costs We Highlight

Streaming vs Batch: The Full Picture

Batch Processing (Old Way):
═══════════════════════════
Visible costs:
β”œβ”€ Time: O(n) per batch
β”œβ”€ Space: O(n) storage

Hidden costs:
β”œβ”€ Stale data (hours/days old)
β”œβ”€ Batch job coordination
β”œβ”€ Failed batch retries
β”œβ”€ Peak load on infrastructure
└─ Can't react to real-time events

Stream Processing (New Way):
════════════════════════════
Visible costs:
β”œβ”€ Time: O(1) per event
β”œβ”€ Space: O(w) window state

Hidden costs:
β”œβ”€ Always-on infrastructure (24/7 cost)
β”œβ”€ State management complexity
β”œβ”€ Exactly-once semantics overhead
β”œβ”€ Backpressure handling
β”œβ”€ Late event handling
β”œβ”€ Watermarking logic
└─ Fault tolerance (checkpointing)

Important lesson:
"Real-time isn't freeβ€”it's trading
computation cost for latency.
Know what you're paying for!"
Enter fullscreen mode Exit fullscreen mode

πŸš€ From Algorithms to 2026 Systems

HyperLogLog β†’ Distributed Analytics

1970: Exact counting (Set)
β”œβ”€ Single machine
└─ Limited by RAM

2007: HyperLogLog
β”œβ”€ Single machine
β”œβ”€ Billions in KB
└─ 2% error

2026: Distributed HyperLogLog
β”œβ”€ Merge across datacenters
β”œβ”€ Global real-time counts
β”œβ”€ Sub-second aggregation
└─ Powers: Google Analytics, Facebook Insights

Example: Facebook post reach
β”œβ”€ HyperLogLog at each datacenter
β”œβ”€ Merge counts every second
β”œβ”€ Global reach estimate in real-time
└─ Billions of users, KB of memory!
Enter fullscreen mode Exit fullscreen mode

Sliding Windows β†’ Edge Computing

1990: Database time-range queries
β”œβ”€ Pull model (query when needed)
└─ Seconds of latency

2010: Stream processing (Kafka + Flink)
β”œβ”€ Push model (events flow)
β”œβ”€ Centralized processing
└─ Sub-second latency

2026: Edge stream processing
β”œβ”€ Process at CDN edge (200+ locations)
β”œβ”€ Local windowing, global aggregation
β”œβ”€ Millisecond latency worldwide
└─ Powers: Live sports scores, IoT dashboards

Example: Cloudflare Analytics
β”œβ”€ Sliding windows at each edge location
β”œβ”€ Aggregate to region, then global
β”œβ”€ Real-time dashboard updates
└─ Handles billions of requests/day
Enter fullscreen mode Exit fullscreen mode

πŸ’‘ Practice Problems

Problem 1: Design Twitter's Trending Algorithm

Requirements:
β”œβ”€ Process 6,000 tweets/second
β”œβ”€ Track hashtag frequency (last 5 minutes)
β”œβ”€ Update trending list every 10 seconds
β”œβ”€ Return top 10 trending hashtags
β”œβ”€ Memory constraint: < 100 MB

Your algorithm must:
1. Count hashtags in sliding window
2. Detect velocity (rapidly increasing)
3. Handle billions of unique hashtags
4. Real-time updates

Hints:
β”œβ”€ Count-Min Sketch for frequency
β”œβ”€ Sliding windows for time decay
β”œβ”€ Min-heap for top-K
└─ Velocity = (current_window - previous_window) / time
Enter fullscreen mode Exit fullscreen mode

Problem 2: Design Stock Market VWAP Calculator

VWAP = Volume Weighted Average Price
Formula: Ξ£(price Γ— volume) / Ξ£(volume) over time window

Requirements:
β”œβ”€ 1000s of stocks
β”œβ”€ 100s of trades per second per stock
β”œβ”€ Calculate VWAP for last hour
β”œβ”€ Update every second
β”œβ”€ Memory efficient

Your algorithm must:
1. Maintain sliding window of trades
2. Compute running sum of (price Γ— volume)
3. Compute running sum of volume
4. Handle high-frequency updates

Hints:
β”œβ”€ Sliding window with buckets
β”œβ”€ Maintain two sums (priceΓ—volume, volume)
β”œβ”€ Incremental updates (add new, subtract old)
└─ Per-stock state management
Enter fullscreen mode Exit fullscreen mode

Problem 3: Design Real-Time Anomaly Detection

Requirements:
β”œβ”€ Monitor API request rates
β”œβ”€ Detect sudden spikes (DDoS, viral content)
β”œβ”€ Calculate baseline (normal behavior)
β”œβ”€ Alert when > 3Γ— baseline
β”œβ”€ Memory: O(1) per metric

Your algorithm must:
1. Track request count (sliding window)
2. Calculate moving average (baseline)
3. Detect spikes in real-time
4. Minimize false positives

Hints:
β”œβ”€ Exponential moving average for baseline
β”œβ”€ Z-score for anomaly detection
β”œβ”€ Sliding window for current rate
└─ Configurable sensitivity threshold
Enter fullscreen mode Exit fullscreen mode

🎯 Key Takeaways

1. STREAMING = BOUNDED MEMORY FOR INFINITE DATA
   Must use O(1) or O(w) space for ∞ data

2. APPROXIMATE IS OFTEN ENOUGH
   Trade-off: Accuracy vs Memory (Part 1!)
   β”œβ”€ HyperLogLog: 2% error, 1000x less memory
   β”œβ”€ Count-Min: Over-estimate, fixed memory
   └─ Sliding Window: Bucketing reduces precision

3. HIDDEN COSTS OF REAL-TIME
   β”œβ”€ Always-on infrastructure ($$)
   β”œβ”€ State management complexity
   β”œβ”€ Fault tolerance overhead
   β”œβ”€ Late data handling
   └─ Coordination across systems

4. EVOLUTION: BATCH β†’ STREAM β†’ EDGE
   1960s: Store all, query later
   2026: Process at edge, aggregate globally
   └─ Each step: traded storage for speed

5. 2026 SYSTEMS ARE HYBRID
   β”œβ”€ Streaming for real-time (approximate)
   β”œβ”€ Batch for accuracy (exact)
   β”œβ”€ Lambda architecture (both!)
   └─ Edge for latency (distributed)
Enter fullscreen mode Exit fullscreen mode

πŸ—ΊοΈ Your Streaming Journey

Where you are now:
βœ“ Time/space trade-offs (Part 1)
βœ“ Algorithm design (Part 2)
βœ“ Graphs (Part 3)
βœ“ Production systems (Part 4)
βœ“ Database internals (Part 5)
βœ“ Caching layers (Part 6)
βœ“ Real-time streaming (Part 7) ← YOU ARE HERE

Your growing insight:
β”œβ”€ See hidden costs everywhere
β”œβ”€ Understand approximation trade-offs
β”œβ”€ Can design for infinite data
β”œβ”€ Know when real-time is worth the cost
└─ Ready for AI/ML algorithms!

Next steps:
β–‘ Part 8: AI/ML algorithms (recommendations, LLMs)
β–‘ Part 9: Security & cryptography
β–‘ Part 10: Autonomous systems
Enter fullscreen mode Exit fullscreen mode

πŸ’¬ Your Turn

Build these yourself:

  1. Implement HyperLogLog and test accuracy vs set size
  2. Build sliding window counter with different bucket sizes
  3. Create Count-Min Sketch and compare to HashMap
  4. Measure memory: Exact vs Approximate

What would you ask?

  • "What's the hidden cost of real-time?"
  • "Why is approximate good enough?"
  • "When would you NOT use streaming?"

Share your findings! What's your error rate vs memory trade-off? πŸ“Š


Infinite data needs finite memory. Streaming algorithms are the bridge. Master approximation, and you master real-time systems. πŸ“‘βœ¨


🎯 Coming Up Next: Part 8

AI & Machine Learning Algorithm Engineering

From counting data to learning from it:
β”œβ”€ How recommendation algorithms work
β”œβ”€ Transformer attention mechanism (ChatGPT)
β”œβ”€ Vector similarity at scale
β”œβ”€ Online learning & bandit algorithms

Same principles: Hidden costs, trade-offs, real-time!
Enter fullscreen mode Exit fullscreen mode

Stay tuned! πŸ€–

Top comments (0)