ZeeshanAli-0704
System Design: Global Top‑K Heavy Hitters System

Designing a Global Top K Heavy Hitters System with Kafka, Stream Processing, and Redis Sorted Sets


System Overview

We need a globally scalable system to identify Top K “Heavy Hitters” (most‑viewed or most‑played posts/songs) from a continuous event stream. The system must:

  • Efficiently aggregate and rank by views in near real time
  • Support regional and global Top K queries
  • Deliver high availability, low latency, and elastic scale
  • Maintain predictable performance as data volume and users grow

Key idea: Move ranking work from read time (expensive ORDER BY in databases) to write time using Redis Sorted Sets. Stream processors incrementally update scores; Redis keeps the set sorted so reads are fast and predictable.



Key Components

  • Event Streaming
    • Apache Kafka: durable, partitioned log for view/play events.
  • Stream Processing
    • Flink (or a similar framework): incremental aggregation and windowing.
  • Fast Rank Index + Cache
    • Redis Sorted Sets (ZSETs): millisecond Top K reads and rank queries.
  • Operational Database (recommended for history/analytics)
    • Cassandra/MongoDB: durable storage for time‑bucketed aggregates.
  • API Layer
    • REST/GraphQL service exposing Top K endpoints (regional and global).
  • Global Aggregator
    • Periodically merges regional Top K into a “global Top K” (min‑heap or Redis ZSET).
  • Observability
    • Metrics (Prometheus/Grafana), logs (ELK/Cloud), tracing (OpenTelemetry).



High Level Architecture

  • Clients → Kafka (views/play events)
  • Stream processors → Incremental aggregates → Redis (Top K per region/window)
  • API → Reads Redis (falls back to DB if needed)
  • Global aggregator → Merges regional Top K → Global Top K in Redis



Detailed Flow

1. Data Ingestion

  • Producers: Web/mobile clients emit events { post_id, region_id, timestamp, (user_id optional) }.
  • Kafka:
    • Topic partitioned by region_id for parallelism/locality.
    • Use Avro/Protobuf with Schema Registry for safe schema evolution.

2. Stream Processing

  • Goal: Incrementally compute per‑region counts and maintain Top K without scanning databases.
  • Optional windowing: compute “trending” (e.g., 5‑minute tumbling or sliding windows).
  • Update Redis:
    • Exact: ZINCRBY per post, maintain a full leaderboard.
    • Bounded Top K: keep only the top K in a compact ZSET; ignore long tail.

3. Database Storage

Recommended for durability, analytics, and recovery:

  • Persist time‑bucketed aggregates (e.g., end‑of‑window or periodic checkpoints) to Cassandra/MongoDB.

4. Caching and Rank Index (Redis)

  • Per‑region ZSET: region:<region_id>:top_k
  • Per‑window ZSET: region:<region_id>:top_k:<window_id>
  • Global ZSETs: global:top_k, global:top_k:<window_id>

5. Regional Aggregation

  • Preferred: Write‑through from stream processors to Redis, keeping Redis as the live rank index.
  • Reconcile occasionally from DB if needed (e.g., post‑incident).

6. Global Aggregation

  • Collect regional Top K entries (R regions → ≤ K × R items).
  • Compute global Top K with a min‑heap of size K or a Redis ZSET (trim with ZREMRANGEBYRANK).
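The merge step can be sketched in a few lines (an illustrative in‑memory version; the function name and input shape are assumptions, with each regional list materialized as `(post_id, count)` pairs):

```python
import heapq

def merge_regional_topk(regional_lists, k):
    """Merge per-region Top K lists into a global Top K of size k.

    Counts for a post_id that appears in several regional lists are summed,
    then a size-k selection keeps only the k largest totals.
    """
    totals = {}
    for entries in regional_lists:
        for post_id, count in entries:
            totals[post_id] = totals.get(post_id, 0) + count
    # heapq.nlargest maintains a size-k min-heap internally: O(M log k) for M candidates
    return heapq.nlargest(k, totals.items(), key=lambda kv: kv[1])
```

Note one caveat of this design: a post that sits just below K in every region never reaches the merge, so the global result is exact only for posts that surface in at least one regional Top K.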

7. API Layer

  • GET /top-posts?region=<region>&timeframe=<window>&k=<N>
  • Fast path: read from Redis; optional DB fallback for cache misses.
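The read path with fallback can be sketched as follows (a hedged, in‑memory stand‑in: `cache` is any dict‑like store playing the role of Redis, and `fetch_from_db` is an injected callable playing the role of the DB query; both names are invented for this example):

```python
def get_top_posts(cache, fetch_from_db, region, k):
    """Serve Top K from the cache; on a miss, recompute and repopulate."""
    key = f"region:{region}:top_k"
    entries = cache.get(key)
    if entries is None:                  # cache miss -> slow path
        entries = fetch_from_db(region, k)
        cache[key] = entries             # repopulate so subsequent reads are fast
    return entries[:k]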



Flow Diagram

```
1) Client Event
   -> Kafka (topic: views; partitioned by region_id)

2) Stream Processor
   -> Aggregates (optionally windowed)
   -> Redis (ZSET per region/window) for Top K
   -> Cassandra/MongoDB for historical aggregates

3) API Service (GET /top-posts)
   -> Redis read:
        Hit: return Top K
        Miss: optional DB query + repopulate Redis

4) Global Aggregator (periodic)
   -> Fetch regional Top K
   -> Merge to global Top K (min-heap or Redis ZSET)
   -> Write to Redis (global:top_k)
```



Redis Top K Strategies

A) Exact Leaderboards (Full ZSET)

  • Keep every post in a ZSET per region with total_views as score.
  • Pros: Exact rank for every item; easy “neighbors by rank.”
  • Cons: Memory‑heavy at very high cardinality; a single ZSET key lives on one Redis shard.

B) Bounded Top K (Recommended for Massive Scale)

  • Maintain only the top K in Redis—memory and latency remain constant regardless of total population.
  • Update logic:
    1. If ZCARD(key) < K: ZADD key new_score post_id
    2. Else fetch the current minimum (the Kth entry): ZRANGE key 0 0 WITHSCORES
    3. If new_score > min_score, add and evict atomically:
      • MULTI, then ZADD key new_score post_id, then ZPOPMIN key 1, then EXEC
    4. Else skip (the item does not enter the Top K)
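The same flow can be simulated in application code with a size‑K min‑heap (an illustrative in‑memory sketch, not a Redis client; the class and method names are invented for this example):

```python
import heapq

class BoundedTopK:
    """Keep only the K highest-scoring items, mirroring the ZADD/ZPOPMIN flow."""
    def __init__(self, k):
        self.k = k
        self.heap = []     # min-heap of (score, item); the root is the Kth entry
        self.scores = {}   # item -> score, for membership checks

    def offer(self, item, score):
        if item in self.scores:
            # Known item: take the higher score and rebuild (kept simple for clarity)
            self.scores[item] = max(self.scores[item], score)
            self.heap = [(s, i) for i, s in self.scores.items()]
            heapq.heapify(self.heap)
        elif len(self.heap) < self.k:              # step 1: room left -> add
            heapq.heappush(self.heap, (score, item))
            self.scores[item] = score
        elif score > self.heap[0][0]:              # step 3: beats current minimum
            _, evicted = heapq.heapreplace(self.heap, (score, item))
            del self.scores[evicted]
            self.scores[item] = score
        # step 4: otherwise skip -- the long tail never enters the structure

    def top(self):
        return sorted(self.scores.items(), key=lambda kv: -kv[1])
```

The long tail never touches the structure, which is the property that keeps memory and latency flat regardless of total population.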

C) Hybrid Approach

  • Redis Top K for UI speed
  • Cassandra/MongoDB or Redis KV for per‑post totals and analytics
  • Global Top K by merging regional Top Ks

Handling Ties

  • Deterministic order via:
    • score = primary_score + tiny epsilon (e.g., fractional timestamp), or
    • secondary tie‑break in app (e.g., latest update wins)
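One hedged way to encode the epsilon trick: fold a bounded fraction of the timestamp into the score, so equal counts order by recency without ever reordering unequal counts (the epoch and scaling constant below are illustrative choices, sized so the epsilon stays strictly under 1):

```python
def composite_score(view_count, event_ts_ms, epoch_ms=1_700_000_000_000):
    """Primary ranking by integer view_count; ties broken by recency.

    Milliseconds since a fixed epoch, divided by 1e12 (~31 years of ms),
    lands in [0, 1), so it can never lift a lower count above a higher one.
    """
    epsilon = (event_ts_ms - epoch_ms) / 1e12
    return view_count + epsilon
```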



Windowing Deep Dive

Event Time, Watermarks, Allowed Lateness

  • Event time: Use event timestamps (not processing time) for correctness.
  • Watermarks: Indicate event‑time progress and when windows can be closed.
  • Allowed lateness: How long late events can still adjust a closed window.

Window Types

  • Tumbling: Fixed, non‑overlapping (e.g., 5m, 1h). Simple and cache‑friendly.
  • Sliding: Overlapping windows (size=5m, slide=1m). Smoother, more compute.
  • Session: Gap‑based, dynamic. Good for bursty behavior.

Windowed Keys and TTLs

  • Key pattern:
    • region:<r>:top_k:<window_id>
    • global:top_k:<window_id>
  • Apply TTLs to auto‑expire old windows.
  • For late events within allowed lateness: update window aggregates and adjust Redis ZSETs; beyond lateness: route to DLQ/audit.
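Deriving the window_id and a matching TTL for the key pattern above can be sketched as (window size and retention below are illustrative, not prescribed values):

```python
def window_key(region_id, event_ts_ms, window_ms=300_000):
    """Tumbling-window ZSET key for an event (5-minute windows by default)."""
    window_id = event_ts_ms // window_ms          # integer bucket index
    return f"region:{region_id}:top_k:{window_id}"

def window_ttl_seconds(window_ms=300_000, retained_windows=12):
    """TTL long enough to keep the last N windows queryable before auto-expiry."""
    return retained_windows * window_ms // 1000
```

Because the bucket index is a pure function of the event timestamp, late events within allowed lateness map back to the same key and simply adjust its ZSET.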



Optimizations

Rate Limiting

  • Fixed/tumbling window (per minute):
    • INCR rate:<user>:<minute>; EXPIRE rate:<user>:<minute> 60
  • Sliding window:
    • ZADD rate:<user> <now_ms> <now_ms>
    • ZREMRANGEBYSCORE rate:<user> 0 <now_ms-60000>
    • ZCARD rate:<user> then compare to limit
  • Token bucket: Maintain tokens with server time; refill based on elapsed duration.
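The token‑bucket variant amounts to a few lines of refill arithmetic (in production the state would live in Redis behind an atomic script; this in‑memory sketch just shows the mechanics, with the clock injected for determinism):

```python
class TokenBucket:
    """Refill tokens proportionally to elapsed time; spend one per request."""
    def __init__(self, capacity, refill_per_sec):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = float(capacity)
        self.last = 0.0   # timestamp (seconds) of the previous call

    def allow(self, now):
        # Top up based on how long it has been since the last request
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```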

Caching Keys

  • Regional live: region:<r>:top_k
  • Regional windowed: region:<r>:top_k:<window>
  • Global live: global:top_k
  • Global windowed: global:top_k:<window>

Partitioning

  • Kafka: partition by region_id (and optionally hash of post_id for balance).
  • Cassandra: partition by (region_id, window_start); avoid hot partitions.

Approximate Heavy Hitters

  • For extreme cardinality: use a Count‑Min Sketch plus a min‑heap of candidates; confirm candidates via exact counts before writing to Redis.
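A minimal Count‑Min Sketch looks like this (width and depth below are illustrative; real deployments size them from target error and confidence bounds):

```python
import hashlib

class CountMinSketch:
    """Fixed-size approximate counter: never undercounts, may overcount on collisions."""
    def __init__(self, width=1024, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, item):
        # Derive an independent hash per row by salting with the row number
        digest = hashlib.sha256(f"{row}:{item}".encode()).digest()
        return int.from_bytes(digest[:8], "big") % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.table[row][self._index(row, item)] += count

    def estimate(self, item):
        # The minimum across rows bounds the overcount from hash collisions
        return min(self.table[row][self._index(row, item)] for row in range(self.depth))
```

Items whose estimate clears a candidate threshold are then verified against exact counts before they are allowed to touch the Redis leaderboard.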

Write Minimization and Concurrency

  • Cache the current Kth threshold to avoid unnecessary writes.
  • Batch Redis operations; use pipelining.
  • Use MULTI/EXEC for “add + evict” to avoid races.



Scaling for Global Use

  • Per‑region leaderboards: one ZSET per region/window.
  • Global aggregation: merge regional Top Ks (≤ K × R items) via min‑heap or Redis ZSET.
  • Redis Cluster: One key lives on one shard—keep Top K keys small; avoid single global ZSETs with massive cardinality.
  • Multi‑region Kafka: Use MirrorMaker or cluster linking for DR and aggregation.
  • Database: Region‑specific Cassandra clusters; tune replication for availability vs. latency.



Data Modeling

Cassandra Example

  • views_by_region_and_window
    • PRIMARY KEY ((region_id, window_start), post_id)
    • Columns: count
  • posts
    • PRIMARY KEY (post_id)
    • Columns: content, author, created_at, metadata…

Redis Key Design

  • region:<region_id>:top_k → ZSET(post_id => score)
  • region:<region_id>:top_k:<window_id> → ZSET
  • global:top_k and global:top_k:<window_id> → ZSET



Capacity Planning and Performance

  • Full (exact) ZSET: Memory grows with unique posts; a single ZSET must fit one Redis shard.
  • Bounded Top K: Memory bounded by K; independent of total population size—few MBs at K=10k with compact IDs.
  • Throughput: ZINCRBY/ZADD ~ O(log N). With bounded Top K, N=K → effectively constant time.
  • Persistence/HA: Replicas double memory; snapshots/AOF add overhead. Size clusters accordingly and monitor fragmentation/latency.
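The "few MBs" claim above can be sanity‑checked with back‑of‑envelope arithmetic (the ~80 bytes/entry figure is a rough assumption covering a compact member string, a score, and skiplist/dict overhead, not a measured number):

```python
def bounded_topk_memory_mb(k=10_000, bytes_per_entry=80):
    """Rough footprint of one bounded Top K ZSET, in megabytes."""
    return k * bytes_per_entry / 1_000_000

# One regional leaderboard: ~0.8 MB; even 100 regions stay well under 100 MB
# (before replicas, which double the total as noted above).
```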



Fault Tolerance and Recovery

  • Kafka: Retention for reprocessing; at‑least‑once or exactly‑once semantics depending on framework.
  • Stream processors: Checkpointing and state backends (e.g., Flink RocksDB) for recovery.
  • Redis: Replicas and automatic failover; reconcile from Cassandra/MongoDB after incidents if needed.
  • Idempotency: Deduplicate via event IDs or rely on EOS processing to prevent over‑counting.



Security, Privacy, and Compliance

  • Encrypt in transit (TLS) and at rest where supported.
  • Place data stores in private networks (VPC) and use strict ACLs/roles.
  • Apply least privilege for service accounts and Redis ACLs.
  • Enable audit logs for API and data access.
  • Verify any third‑party services/tools align with your organization’s security and compliance guidelines before adoption.



Example API Request and Responses

Request

```
GET /top-posts?region=NA&timeframe=24h&k=10
```

Response

```json
{
  "region": "NA",
  "timeframe": "24h",
  "top_posts": [
    {
      "post_id": "123",
      "author": "Artist1",
      "view_count": 120000,
      "content": "Post Content 1"
    },
    {
      "post_id": "456",
      "author": "Artist2",
      "view_count": 110000,
      "content": "Post Content 2"
    }
  ]
}
```



Code Snippets (Appendix)

Bounded Top K Update (K = 10,000)

```
# Pseudocode flow using Redis commands
# Assume key = region:<r>:top_k, K = 10000, new_score and post_id are known

# If size < K -> add
ZCARD region:<r>:top_k
# If result < 10000:
ZADD region:<r>:top_k <new_score> <post_id>

# Else get current minimum (Kth)
ZRANGE region:<r>:top_k 0 0 WITHSCORES
# If new_score > min_score:
MULTI
  ZADD region:<r>:top_k <new_score> <post_id>
  ZPOPMIN region:<r>:top_k 1
EXEC
```

Read Top K

```
ZREVRANGE region:<r>:top_k 0 9 WITHSCORES
```

Sliding‑Window Rate Limiting (per user, last 60s)

```
# now_ms is the current timestamp in milliseconds
ZADD rate:<user_id> <now_ms> <now_ms>
ZREMRANGEBYSCORE rate:<user_id> 0 <now_ms-60000>
ZCARD rate:<user_id>   # Compare with limit
```


