Why I did this to myself and why distributed systems are hard
You can read the Google MapReduce paper in an afternoon. It's 13 pages. I've read it a few times now, and every time I thought I understood it—until I tried to build it.
The scenario that got me started: you have a 1TB log file. You need word counts. What do you do?
Option 1: You load it all into memory and your laptop catches fire.
Option 2: Process line by line on one machine, but you graduate before it finishes.
Option 3: Split the work across machines and combine their results at the end.
I went with option 3. Google figured this out in 2004. Twenty years later, I'm in my room late at night wondering why worker 2 keeps dying.
The formal problem is this: given input too large for one machine's memory or too slow for one machine's CPU, partition it into M splits, process each in parallel, then combine results. MapReduce formalizes this as two functions (map and reduce) with the framework handling distribution, fault tolerance, and the shuffle phase where data moves across the network. Reading the paper took an hour. Making it work ate multiple weekends.
Clone and run it yourself (5 minutes)
Before we dive into the theory, here's how to actually run this. The whole point of building it was to make distributed systems tangible.
# Get the code
git clone https://github.com/Ajodo-Godson/MapReduce
cd MapReduce
# Start the cluster with web dashboard
docker-compose -f docker-compose.benchmark.yml up --build
Open your browser to http://localhost:5000 to see the real-time dashboard showing:
- Worker status (idle, busy, or failed)
- Job progress through map and reduce phases
- Live event log as tasks are assigned and completed
You should see output like:
[2025-12-05 10:30:15] Master server started on port 50051
[2025-12-05 10:30:17] Worker worker1 registered
[2025-12-05 10:30:17] Worker worker2 registered
[2025-12-05 10:30:18] Worker worker3 registered
[2025-12-05 10:30:18] Created 179 map tasks from input
[2025-12-05 10:30:18] Assigned map_0 to worker1
[2025-12-05 10:30:19] Task map_0 completed by worker1
...
Now break it (in a new terminal):
# Kill a worker mid-job
docker stop benchmark_worker2
# Watch recovery in the dashboard and logs
# Worker2 turns "failed", its tasks get reassigned to surviving workers
Clean up:
docker-compose -f docker-compose.benchmark.yml down
Other ways to run it
# Run automated benchmarks (1 worker vs 3 workers, failure recovery)
python3 examples/benchmark.py
# Run DAG scheduler tests (unit tests, no Docker needed)
python3 examples/dag_integration_test.py --test unit
# Run full end-to-end pipeline with real data chaining
python3 examples/dag_integration_test.py --test pipeline
If you want to run without the dashboard (basic docker-compose):
pip install -r requirements.txt
./scripts/generate_proto.sh
docker-compose up -d
docker-compose logs -f master
Map, Shuffle, Reduce: the part that actually makes sense
The core loop is almost embarrassingly simple. I kept telling myself "this is easy" until things started breaking.
Three steps:
- Map: Take input key-value pairs, emit intermediate key-value pairs
- Shuffle: Group all values by key across all mappers (this is where your network cries)
- Reduce: For each unique key, combine all its values into a final result
The paper writes it like this:
Map : (k1, v1) → list(k2, v2)
Reduce : (k2, list(v2)) → list(v2)
Notice the types change. Your input pairs (k1, v1) come from a different domain than your intermediate pairs (k2, v2). That tripped me up initially because I kept trying to make them match. But this is intentional. The map function transforms your data into a new shape that makes aggregation possible. Your input might be (filename, file_contents), but your output is (word, count). Completely different domains.
The partitioning matters too. After map emits pairs, they're bucketed using hash(key) mod R where R is the number of reducers. This guarantees all occurrences of the same key end up at the same reducer, which is critical for correctness.
Word count, the "hello world" of MapReduce:
MAP PHASE (parallel across workers):
Input: "the cat sat on the mat"
Mapper emits: ("the", 1), ("cat", 1), ("sat", 1), ("on", 1), ("the", 1), ("mat", 1)
SHUFFLE PHASE (group by key):
"the" -> [1, 1]
"cat" -> [1]
"sat" -> [1]
"on" -> [1]
"mat" -> [1]
REDUCE PHASE (aggregate):
"the" -> 2
"cat" -> 1
...
Here's the actual implementation. This is what runs in each worker:
# src/framework/mapper.py
def word_count_map(filename, contents):
    """Map function: (filename, contents) -> list(word, 1)"""
    words = contents.lower().split()
    for word in words:
        word = ''.join(c for c in word if c.isalnum())
        if word:
            yield (word, 1)

# src/framework/reducer.py
def word_count_reduce(word, counts):
    """Reduce function: (word, [1,1,1,...]) -> total_count"""
    return sum(int(c) for c in counts)
The map phase applies the map function and partitions the output:
# src/framework/mapper.py
from collections import defaultdict

class MapPhase:
    def __init__(self, map_function, partitioner):
        self.map_function = map_function
        self.partitioner = partitioner

    def execute(self, input_key, input_value):
        """Execute map function and partition results"""
        # Execute user's map function
        intermediate_pairs = self.map_function(input_key, input_value)
        # Partition the intermediate key-value pairs
        partitions = [defaultdict(list) for _ in range(self.partitioner.num_partitions)]
        for key, value in intermediate_pairs:
            partition_id = self.partitioner.get_partition(key)
            partitions[partition_id][key].append(value)
        return [dict(p) for p in partitions]
Notice how the partitioner decides which reducer will handle each key. All occurrences of "the" from all mappers end up in the same partition, so one reducer can sum them all.
But word count is almost too simple to be interesting. The paper lists other examples that helped me understand the generality of this model:
Distributed Grep
You want to find all lines in a massive log file that match a regex pattern. The map function checks each line against the pattern and emits the line if it matches. The reduce function is just the identity function: it passes through the intermediate data unchanged. No aggregation needed because you're not combining anything, just filtering.
Map: (filename, line) → if regex.match(line): emit(line, "")
Reduce: (line, [""]) → line
This is the simplest possible MapReduce job. The "reduce" does nothing. But you still get parallelism across all your input files.
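Here's a minimal in-memory sketch of the grep job, just to make the shape concrete. The `run_job` driver and the `ERROR` pattern are assumptions for illustration, not part of the repo, and it uses `search` rather than `match` so the pattern can appear anywhere in the line:

```python
import re
from collections import defaultdict

PATTERN = re.compile(r"ERROR")  # assumed pattern, purely for illustration

def grep_map(filename, line):
    """Emit the line itself as the key when it matches the pattern."""
    if PATTERN.search(line):
        yield (line, "")

def grep_reduce(line, _values):
    """Identity reduce: pass the matching line through unchanged."""
    return line

def run_job(records, map_fn, reduce_fn):
    """Hypothetical single-process driver: map, group by key, reduce."""
    grouped = defaultdict(list)
    for key, value in records:
        for out_key, out_value in map_fn(key, value):
            grouped[out_key].append(out_value)
    return [reduce_fn(k, vs) for k, vs in sorted(grouped.items())]

records = [
    ("app.log", "INFO boot ok"),
    ("app.log", "ERROR disk full"),
    ("db.log", "ERROR timeout"),
]
matches = run_job(records, grep_map, grep_reduce)
print(matches)
```

One side effect of keying on the line itself: identical matching lines collapse into a single output entry in this sketch.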
URL Access Frequency
You have terabytes of web server logs and want to know which URLs get the most traffic. Each log entry contains a URL. Map emits (URL, 1) for each request. Reduce sums all the 1s for each URL.
Map: (log_file, entry) → emit(entry.url, 1)
Reduce: (url, [1, 1, 1, ...]) → (url, sum)
This is exactly word count, but for URLs instead of words. Same pattern, different domain.
Reverse Web-Link Graph
Given a collection of web pages, you want to find all pages that link TO a given page. For each page, map scans for outgoing links and emits (target_url, source_url). Reduce collects all source URLs for each target.
Map: (source_url, page_html) → for each link in page: emit(link.target, source_url)
Reduce: (target_url, [source1, source2, ...]) → (target_url, list_of_sources)
The output tells you: "These 47 pages link to google.com." This is useful for PageRank-style algorithms where you need to know the incoming link structure.
Inverted Index
This is how search engines work. Given a collection of documents, build an index that maps each word to the list of documents containing it. Map emits (word, document_id) for each word in each document. Reduce collects all document IDs per word.
Map: (doc_id, contents) → for each word in contents: emit(word, doc_id)
Reduce: (word, [doc1, doc2, doc3, ...]) → (word, sorted_list_of_docs)
Now when someone searches for "distributed systems," you look up "distributed" and "systems" in your index, intersect the document lists, and return the results. Google processes petabytes of web pages this way.
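A small sketch of the index build plus the lookup-and-intersect step described above. Everything runs in one process, and `build_index` and `search` are hypothetical helper names standing in for the framework and the query side:

```python
from collections import defaultdict

def index_map(doc_id, contents):
    """Emit (word, doc_id) once per distinct word in the document."""
    for word in set(contents.lower().split()):
        yield (word, doc_id)

def index_reduce(word, doc_ids):
    """Collect the sorted, de-duplicated list of documents for a word."""
    return (word, sorted(set(doc_ids)))

def build_index(docs):
    """Hypothetical driver: run map over every doc, group, then reduce."""
    grouped = defaultdict(list)
    for doc_id, contents in docs.items():
        for word, d in index_map(doc_id, contents):
            grouped[word].append(d)
    return dict(index_reduce(w, ds) for w, ds in grouped.items())

def search(index, query):
    """Intersect the posting lists of every word in the query."""
    postings = [set(index.get(w, [])) for w in query.lower().split()]
    return sorted(set.intersection(*postings)) if postings else []

docs = {
    "doc1": "distributed systems are hard",
    "doc2": "distributed databases",
    "doc3": "operating systems",
}
index = build_index(docs)
hits = search(index, "distributed systems")
print(hits)
```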
The key insight: all these problems follow the same computational pattern. You write a map function and a reduce function. The framework handles distribution, fault tolerance, and shuffling. That separation of concerns is what made MapReduce revolutionary in 2004. Not the algorithm itself, but the programming model that let ordinary engineers write distributed programs without thinking about distributed systems.
Okay cool. Now do that across three machines where any of them might die mid-sentence.
How I wired the boxes together
Classic master-worker setup. One boss node tells everyone what to do. Workers do the actual math.
              +------------------+
              |      Master      |
              |    (the boss)    |
              +--------+---------+
                       |
     +-----------------+-----------------+
     |                 |                 |
     v                 v                 v
+----------+      +----------+      +----------+
| Worker 1 |      | Worker 2 |      | Worker 3 |
+----------+      +----------+      +----------+
This is a centralized architecture. The master maintains all state: which tasks exist, which workers are alive, what's pending, what's running, what's done. Workers don't coordinate with each other. They only talk to the master. Ask for work, do it, report back, repeat. This keeps the worker logic simple: no consensus protocols, no peer-to-peer negotiation, no complex state to maintain between tasks.
The upside is simplicity. There's one source of truth. No consensus needed between workers. No Paxos, no Raft, no complex protocols for agreeing on who does what. The master decides, and that's that.
The downside is obvious: if the master dies, everything stops. This is a single point of failure. The paper basically shrugs at this: with only one master, failure is "unlikely," so the job just aborts and clients can retry. A real fix would be something like Raft, where a small group of master replicas keeps the state in sync and elects a new leader if the current one disappears. The master could also checkpoint its state to persistent storage, and a standby could take over.
I thought about adding it. Then I thought about my deadline and put it on the "later" list.
Moving on...
gRPC: How the boxes actually talk
I hadn't touched protobuf before this project. Think JSON, but binary and way grumpier about types.
The original MapReduce paper uses custom RPC mechanisms. Google in 2004 had their own infrastructure for everything. For my implementation, gRPC made sense: it handles serialization, connection management, and generates type-safe client/server stubs from a .proto file.
Here's the deal: gRPC lets workers call functions on the master like they're local. No manual socket code. No parsing HTTP responses. You define a contract and the tooling generates the boilerplate.
// protos/mapreduce.proto
service MasterService {
rpc RegisterWorker(WorkerInfo) returns (RegisterResponse);
rpc SendHeartbeat(Heartbeat) returns (HeartbeatAck);
rpc RequestTask(TaskRequest) returns (TaskAssignment);
rpc ReportTaskComplete(TaskResult) returns (TaskAck);
rpc ReportIntermediateFiles(IntermediateReport) returns (TaskAck);
rpc GetIntermediateLocations(PartitionRequest) returns (IntermediateLocations);
}
Six RPCs total. The first four handle core worker-master communication. The last two handle the shuffle phase (map outputs → reduce inputs).
Here are the key message structures:
message TaskAssignment {
string task_id = 1;
string task_type = 2; // "map" or "reduce"
string input_data = 3; // Text chunk for map, file locations for reduce
int32 partition_id = 4; // Which partition this task handles
int32 num_reduce_tasks = 5; // R value for partitioning
bool has_task = 6; // False if no work available
int32 task_sequence_number = 7; // For idempotency
}
message TaskResult {
string worker_id = 1;
string task_id = 2;
bool success = 3;
string output_data = 4; // JSON-encoded reduce output
string error_message = 5;
int32 task_sequence_number = 6;
}
The task_sequence_number field is interesting. It's my defense against the "zombie worker" problem. Each task reassignment increments this number. If worker A receives task map_0 (seq=0), fails, and worker B receives the same task (seq=1), a late completion report from A (seq=0) is recognized as stale and ignored. This prevents duplicate work from corrupting results. More on this later when I talk about failure handling.
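One way a sequence check like this could look, as a sketch. `TaskTable` is a hypothetical stand-in, not the repo's master state, and it applies a strict policy: only a report carrying the sequence number of the latest assignment is accepted.

```python
class TaskTable:
    """Hypothetical master-side table used only for this illustration."""

    def __init__(self):
        self.current_seq = {}    # task_id -> sequence number of the live attempt
        self.completed = set()   # task_ids that already have an accepted result

    def assign(self, task_id):
        """Hand out the task and return the sequence number for this attempt."""
        seq = self.current_seq.get(task_id, 0) + 1
        self.current_seq[task_id] = seq
        return seq

    def report_complete(self, task_id, seq):
        """Return 'accepted', 'stale', or 'duplicate'."""
        if seq != self.current_seq.get(task_id):
            return "stale"        # report from an attempt that was superseded
        if task_id in self.completed:
            return "duplicate"    # same attempt reported twice
        self.completed.add(task_id)
        return "accepted"

table = TaskTable()
seq_a = table.assign("map_0")   # worker A's attempt
seq_b = table.assign("map_0")   # A presumed dead, task reassigned to worker B
late = table.report_complete("map_0", seq_a)
fresh = table.report_complete("map_0", seq_b)
print(late, fresh)
```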
For the shuffle phase:
message IntermediateReport {
string task_id = 1;
string worker_id = 2;
repeated FileLocation locations = 3; // One file per partition
}
message FileLocation {
int32 partition_id = 1;
string file_path = 2;
int64 file_size = 3;
}
message IntermediateLocations {
int32 partition_id = 1;
repeated WorkerFileLocation locations = 2; // All files for this partition
}
When a map task completes, it reports locations for R intermediate files. When a reduce task starts, it queries for all locations for its partition.
Worker calls stub.RequestTask(). Master responds with a task. Magic? No. But it felt like it after years of writing REST endpoints and manually serializing JSON.
Protocol Buffers (and the generated gRPC stubs) buy you more than a stricter type system. The proto compiler emits strongly-typed classes and client/server stubs so you no longer write repetitive serialization and parsing code by hand — you call methods and pass objects. That reduces boring bugs and speeds development. The protobuf wire format is compact and faster to parse than JSON, which matters when you are doing M × R intermediate-file lookups and many small RPCs during the shuffle.
Protobufs also make schema evolution and rolling upgrades easier: unknown fields are ignored by older clients, you can add optional fields safely, and you can reserve field numbers to avoid accidental reuse. Those properties let you evolve message shapes without bringing the whole cluster down. On the runtime side, pairing protobufs with gRPC (HTTP/2) gives useful features for this project — streaming RPCs for continuous heartbeats or long file-location fetches, built-in support for deadlines and cancellation, and multiplexed connections that reduce connection churn.
There are trade-offs: the binary format is less human readable than JSON and you must manage .proto changes carefully (don’t change or reuse field numbers). You also need the toolchain (protoc) in your dev flow. Even so, for a small distributed system where correctness and clear contracts between processes matter, the schema-driven, generated-stub model removed a huge class of serialization bugs and made the master/worker surface area easy to reason about and test.
A few more practical gRPC notes that mattered while building this project:
Unary vs streaming RPCs. gRPC supports standard (unary) calls and three streaming modes (server-, client-, and bi-directional streaming). For this project I used simple unary RPCs for the core interactions (register, request task, report completion, report intermediate locations) because the semantics are easy to reason about and they matched the request/response nature of the protocol. Streaming is handy for other patterns though. For example, a server-stream could push status updates to a long-lived client connection, or a bidirectional stream could carry a heartbeat and task-control messages over a single multiplexed channel.
HTTP/2 benefits. gRPC runs over HTTP/2, which gives connection multiplexing and lower per-call overhead compared to opening lots of short HTTP/1.1 connections. That matters during the shuffle where many small RPCs and file-location lookups happen: fewer TCP/TLS handshakes, flow control, and header compression reduce latency and CPU overhead on busy nodes.
Operational trade-offs. gRPC is a dependency and its binary format is less human-friendly than JSON logs, so you often pair it with good logging and health-check endpoints. Also, some network middleboxes and load balancers are not gRPC-aware and can interfere with streaming or HTTP/2 features.
All told, gRPC didn't just save lines of code, it also gave clear runtime primitives (streams, deadlines, status codes, and multiplexing) that matched the needs of a master/worker system and made testing and evolution far less annoying.
The full worker loop (this is the good stuff)
Here's what actually runs in each container. This is the complete worker lifecycle:
# src/worker/client.py (simplified for clarity)
class Worker:
    def run(self):
        # 1. Register with master
        if not self.register():
            print("Failed to register. Exiting.")
            return

        # 2. Start heartbeat thread (runs in background)
        heartbeat_thread = threading.Thread(target=self.send_heartbeat, daemon=True)
        heartbeat_thread.start()

        # 3. Main task loop
        while self.running:
            # Request work from master
            task = self.request_task()
            if task and task.task_type == 'map':
                # Execute map, partition results into R files
                partitions = self.executor.execute_map(
                    task_id=task.task_id,
                    input_data=task.input_data
                )
                # Write partitioned output to local disk
                file_paths = self.intermediate_manager.write_partitioned_output(
                    task.task_id, partitions
                )
                # Tell master where intermediate files are
                self.report_intermediate_files(task.task_id, file_paths)
                self.report_result(task.task_id, success=True)
            elif task and task.task_type == 'reduce':
                # SHUFFLE: Fetch intermediate data from ALL mappers for this partition
                intermediate_locations = json.loads(task.input_data)
                # Merge all intermediate files for this partition
                grouped_data = self.executor._fetch_and_group_intermediate(
                    intermediate_locations
                )
                # Sort by key, then reduce
                sorted_data = sorted(grouped_data.items())
                result = self.executor.reduce_phase.execute(sorted_data)
                # Report final result to master
                self.report_result(task.task_id, success=True, output=json.dumps(result))
            else:
                time.sleep(2)  # No tasks available, wait and retry
What makes this interesting:
- Map tasks create R intermediate files (one per reduce partition)
- Reduce tasks fetch files from ALL map workers (M × R transfers total)
- Workers run independently. Each one pulls its next task as soon as it's free instead of waiting on the others
- The heartbeat runs in a separate thread so work doesn't block the "still alive" signal
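A minimal sketch of the background-heartbeat pattern from the last bullet. `HeartbeatSender` is a hypothetical helper, and the `send` callback stands in for the real SendHeartbeat RPC; the interval is shortened so the demo finishes quickly.

```python
import threading
import time

class HeartbeatSender:
    """Hypothetical helper; `send` stands in for the SendHeartbeat RPC."""

    def __init__(self, send, interval):
        self.send = send
        self.interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)

    def _loop(self):
        # Event.wait doubles as an interruptible sleep: it returns True
        # as soon as stop() is called, so shutdown is not delayed.
        while not self._stop.wait(self.interval):
            self.send()

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

beats = []
sender = HeartbeatSender(send=lambda: beats.append(time.time()), interval=0.01)
sender.start()
time.sleep(0.2)   # the main task loop would be doing real work here
sender.stop()
print(f"sent {len(beats)} heartbeats while the main thread was busy")
```

Using an `Event` instead of `time.sleep` inside the loop means the thread can be stopped cleanly rather than relying only on `daemon=True`.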
The master's brain
What it keeps track of
For every task (map or reduce), the master stores:
- Current state: Pending, Running, Completed, or Failed
- Which worker has it
- Where the intermediate files ended up
- Timestamps. Attempt counts. Stuff for debugging at 1 a.m.
Here's the actual data structure:
# src/master/state.py
class MasterState:
    def __init__(self):
        self.workers = {}                # worker_id -> worker_info
        self.tasks = {}                  # task_id -> task_info
        self.completed_task_ids = set()  # For idempotency checking
        self.intermediate_files = {}     # task_id -> {partition_id -> location}
        self.lock = threading.Lock()     # Thread safety for concurrent access
        self.events = []                 # Recent events for visualization

    # Worker tracking
    def add_worker(self, worker_id, info):
        with self.lock:
            self.workers[worker_id] = {
                'host': info.get('host'),
                'port': info.get('port'),
                'status': 'idle',
                'last_heartbeat': time.time(),
                'current_tasks': [],
                'tasks_completed': 0
            }

    # Task management
    def add_task(self, task_id, task_info):
        with self.lock:
            self.tasks[task_id] = {
                **task_info,
                'status': 'pending',
                'created_at': time.time(),
                'assigned_worker': None,
                'assigned_at': None,
                'completed_at': None,
                'attempts': 0,
                'sequence_number': 0,         # Incremented on each assignment
                'duplicate_completions': []   # Track late arrivals
            }
Each task tracks its full lifecycle. The sequence_number is particularly important: it's incremented every time a task is reassigned. This lets the master distinguish between a legitimate completion and a stale completion from a worker that was presumed dead but actually just slow.
Thread safety matters
The master handles concurrent requests from multiple workers. Worker 1 might be reporting a task complete while worker 2 is requesting a new task and worker 3 is sending a heartbeat. Without proper locking, you get race conditions:
# BAD: Race condition (check-then-act without a lock)
def assign_task(self, task_id, worker_id):
    task = self.tasks[task_id]
    if task['status'] != 'pending':       # Worker 1's request sees 'pending' here...
        return False
    # ...and Worker 2's request can see 'pending' too before the writes below land
    task['status'] = 'running'
    task['assigned_worker'] = worker_id   # Both workers now think they own the task
    return True

# GOOD: Lock protects the critical section
def assign_task(self, task_id, worker_id):
    with self.lock:
        task = self.tasks[task_id]
        if task['status'] != 'pending':
            return False  # Task already assigned
        task['status'] = 'running'
        task['assigned_worker'] = worker_id
        task['sequence_number'] += 1
        return True
I use a single coarse-grained lock for simplicity. A production system might use finer-grained locking or lock-free data structures for better concurrency, but for this scale, a single lock works fine.
How a task lives and dies
PENDING --> RUNNING --> COMPLETED
               |
               +--> FAILED --> PENDING (try again, buddy)
Worker fails? Its tasks go back to pending. Some other worker picks them up. This is the entire fault tolerance story. Everything else is details.
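The lifecycle above can be written down as an explicit transition table. This is a sketch with hypothetical names (`TaskState`, `ALLOWED`, `transition`), not the repo's code; the point is that illegal jumps such as COMPLETED back to RUNNING get rejected instead of silently applied.

```python
from enum import Enum

class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# Legal moves, matching the diagram: COMPLETED is terminal,
# FAILED can only go back to PENDING for a retry.
ALLOWED = {
    TaskState.PENDING: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.FAILED: {TaskState.PENDING},
    TaskState.COMPLETED: set(),
}

def transition(current, new):
    """Return the new state, or raise if the move is not in the table."""
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {new.name}")
    return new

# A task that fails once, is retried, and then completes.
state = TaskState.PENDING
for step in (TaskState.RUNNING, TaskState.FAILED, TaskState.PENDING,
             TaskState.RUNNING, TaskState.COMPLETED):
    state = transition(state, step)
print(state.name)
```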
When workers disappear (and they will)
Your cluster will eat dirt. Usually when you're not looking. The question isn't "will something fail" but "how badly will I handle it when it does."
In distributed systems, we talk about failure models. The simplest is crash-stop: a node either works correctly or stops entirely. No weird half-states, no corrupted messages. MapReduce assumes crash-stop failures for workers. If a worker dies, it just disappears. The master detects this absence and reacts.
The harder failure model is Byzantine: nodes might behave arbitrarily, including maliciously. MapReduce doesn't handle this. If a worker starts returning garbage data, you've got problems. The assumption is that you control your cluster and your machines aren't actively lying to you.
Heartbeats: Are You Still Alive?
Workers ping the master every few seconds. "Hey, still here." If the master stops hearing from a worker, it assumes the worst.
def check_worker_health(self):
    current_time = time.time()
    dead_workers = []
    for worker_id, last_heartbeat in self.worker_heartbeats.items():
        if current_time - last_heartbeat > self.heartbeat_timeout:
            dead_workers.append(worker_id)
    for worker_id in dead_workers:
        self.handle_worker_failure(worker_id)
The timeout is a tuning parameter. Too short and you get false positives. A worker pauses for GC and suddenly it's "dead." Too long and actual failures take forever to detect, leaving tasks stuck.
I set mine to 10 seconds with heartbeats every 3 seconds. That gives a worker three chances to check in before the master gives up on it. In production, you'd tune this based on your network latency and acceptable detection time.
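To play with those numbers without waiting on a real clock, the check can be written as a pure function that takes the current time as an argument. This is a sketch: `find_dead_workers` and the sample timestamps are made up for illustration, while the 3-second interval and 10-second timeout come from the text.

```python
HEARTBEAT_INTERVAL = 3.0   # seconds between heartbeats (from the text)
HEARTBEAT_TIMEOUT = 10.0   # seconds of silence before a worker is presumed dead

def find_dead_workers(last_heartbeat, now, timeout=HEARTBEAT_TIMEOUT):
    """Return the workers whose last heartbeat is older than `timeout`."""
    return sorted(w for w, seen in last_heartbeat.items() if now - seen > timeout)

# Illustrative timestamps: worker3 has been silent for 10.5 seconds.
last_heartbeat = {"worker1": 100.0, "worker2": 91.0, "worker3": 89.5}
dead = find_dead_workers(last_heartbeat, now=100.0)

# How many whole heartbeat intervals fit inside the timeout window.
chances = int(HEARTBEAT_TIMEOUT // HEARTBEAT_INTERVAL)
print(dead, chances)
```

Passing `now` in explicitly also makes the timeout logic easy to unit test.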
This part nearly broke me. Threading and concurrency in Python is... not fun. I had this bug where the code would correctly identify a dead worker but then just... not reassign its tasks. The failure detection worked. The recovery didn't.
The deadlock bug (with before/after code):
Here's what the broken code looked like:
# BEFORE (broken) - causes deadlock
def handle_worker_failure(self, worker_id):
    with self.lock:
        self.workers[worker_id]['status'] = 'failed'
        self.reassign_worker_tasks(worker_id)  # Tries to acquire same lock!

def reassign_worker_tasks(self, worker_id):
    with self.lock:  # DEADLOCK: already holding this lock from above
        for task_id, task in self.tasks.items():
            if task['assigned_worker'] == worker_id:
                task['status'] = 'pending'
The monitor thread acquired the lock, detected the dead worker, then tried to reassign tasks, but reassign_worker_tasks also tried to acquire the same lock. Classic deadlock. The code just... hung. No error, no progress, nothing.
# AFTER (fixed) - call reassignment outside the lock
def handle_worker_failure(self, worker_id):
    with self.lock:
        self.workers[worker_id]['status'] = 'failed'
    # Call OUTSIDE the lock to avoid deadlock
    self.reassign_worker_tasks(worker_id)
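It took me a while to see why Python's threading module didn't just throw an error. A plain threading.Lock isn't reentrant: a second acquire from the thread that already holds it doesn't raise, it just blocks forever. Reentrant locks (RLock) are the thing that handles this, and the master uses a plain Lock. Moving the call outside the lock worked, so I'm not going to touch it :)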
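A tiny demo of the difference between the two lock types. It uses a non-blocking acquire so that it reports the result instead of hanging; `can_reacquire` is a hypothetical helper written for this illustration.

```python
import threading

def can_reacquire(lock):
    """Return True if the thread already holding `lock` can take it again."""
    with lock:
        # Non-blocking attempt so the demo never hangs.
        got_it = lock.acquire(blocking=False)
        if got_it:
            lock.release()
        return got_it

plain = can_reacquire(threading.Lock())       # nested acquire is refused
reentrant = can_reacquire(threading.RLock())  # nested acquire succeeds
print(plain, reentrant)
```

With a blocking acquire, the plain `Lock` case is exactly the silent hang described above: no exception, just a thread waiting on itself.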
The slow worker problem
Here's a fun scenario. Worker A grabs a task. Worker A gets slow (bad network, garbage collection, who knows). Worker A misses the heartbeat timeout. Master marks A as dead and gives the task to Worker B. Now both workers might finish the same task.
t=0: Master assigns map_0 to Worker A
t=3: Worker A starts (but it's being slow about it)
t=10: Worker A misses heartbeat deadline
t=11: Master: "A is dead." Assigns map_0 to Worker B
t=12: Worker B starts map_0
t=15: Worker A finishes, sends completion
t=16: Worker B finishes, sends completion <-- uh oh
This is the classic problem with failure detection in distributed systems. You cannot distinguish between "dead" and "slow." The network could be partitioned. The worker could be swapping to disk. It might recover and finish its work after you've already reassigned it.
I thought this would break everything. Turns out it's fine, as long as you track completions properly. Here's the actual implementation:
# src/master/state.py
class MasterState:
    """Manages the state of workers and tasks with idempotency support.

    The scenario: Worker A takes task, goes slow, misses heartbeat, gets marked failed.
    Task is reassigned to Worker B. Now both A and B might complete the same task.

    Solution: Track task completion by task_id. First completion wins,
    later completions are logged but ignored.
    """
    def __init__(self):
        self.workers = {}
        self.tasks = {}
        self.completed_task_ids = set()  # The key to idempotency
        self.lock = threading.Lock()

    def complete_task(self, task_id, sequence_number, output_data, worker_id=None):
        """Mark task as completed (idempotent).

        Returns: True if this was a duplicate completion, False otherwise
        """
        with self.lock:
            # Check if task already completed (primary idempotency check)
            if task_id in self.completed_task_ids:
                # This is a duplicate completion - log it but don't fail
                self.tasks[task_id]['duplicate_completions'].append({
                    'worker_id': worker_id,
                    'timestamp': time.time(),
                    'sequence_number': sequence_number
                })
                print(f"[{datetime.now()}] DUPLICATE completion for task {task_id} "
                      f"from worker {worker_id} (already completed by "
                      f"{self.tasks[task_id].get('completion_worker')})")
                return True  # Is duplicate

            # First completion - record it
            self.tasks[task_id]['status'] = 'completed'
            self.tasks[task_id]['completed_at'] = time.time()
            self.tasks[task_id]['output_data'] = output_data
            self.tasks[task_id]['completion_worker'] = worker_id
            # Mark task as completed (idempotency tracking)
            self.completed_task_ids.add(task_id)
            return False  # Not a duplicate
First one to finish wins. Later arrivals get logged and ignored. This is called idempotent handling: applying the same operation multiple times has the same effect as applying it once.
This works because map and reduce are deterministic. Given the same input, they produce the same output. So if Worker A and Worker B both complete map_0, they write the same intermediate files. Doesn't matter who "wins." The result is identical. Running a task twice doesn't corrupt anything. It just wastes some CPU.
Something from the paper that finally clicked
The original paper says:
"Any map tasks completed by the worker are reset back to their initial idle state."
The reasoning: outputs live on the failed worker's local disk, so they're gone.
This confused me for a while. When a task completes, doesn't the worker report back to master with the file locations? If it already reported, why re-run?
After implementing it myself, I finally understood: "completed" does not mean "durable."
Here's the distinction. In Google's implementation, map outputs live on the worker's local disk. The worker finishes, reports the file location to the master, and the master records it. But that data isn't replicated anywhere. It's sitting on one machine's disk. If that machine dies before reducers pull the data, the data is gone. The master knows where the data was, but it can't get to it anymore.
So "completed" means the computation finished. But the result isn't durable until it's been consumed by the reduce phase or copied to stable storage. This is the difference between:
- Computed: The work is done
- Durable: The result will survive failures
In my implementation, I track intermediate file locations centrally and assume shared storage (or that the files were copied). If a worker dies after reporting completion, the files are still accessible. So I decided: if the master already knows about the output and can still access it, don't re-run. If it doesn't, re-run.
This might be a simplification compared to Google's implementation, but it taught me something important: in distributed systems, you have to think carefully about when a write "counts." Completing a computation is not the same as persisting the result.
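The two policies can be put side by side in a few lines. This is a sketch under stated assumptions: `tasks_to_rerun`, the `shared_storage` flag, and the task dictionaries are all hypothetical, and the paper-style branch models map output that lives only on the dead worker's local disk.

```python
def tasks_to_rerun(tasks, dead_worker, shared_storage):
    """Pick the tasks that must be rescheduled after `dead_worker` disappears."""
    rerun = []
    for task_id, task in tasks.items():
        if task["worker"] != dead_worker:
            continue
        if task["status"] == "running":
            rerun.append(task_id)      # in-flight work is always lost
        elif task["status"] == "completed" and task["type"] == "map":
            if not shared_storage:
                rerun.append(task_id)  # computed, but the output died with the disk
        # Completed reduce tasks are left alone in both policies: this sketch
        # assumes reduce output was written somewhere that survives the worker.
    return sorted(rerun)

tasks = {
    "map_0": {"type": "map", "status": "completed", "worker": "w2"},
    "map_1": {"type": "map", "status": "running", "worker": "w2"},
    "map_2": {"type": "map", "status": "completed", "worker": "w1"},
    "reduce_0": {"type": "reduce", "status": "completed", "worker": "w2"},
}
paper_style = tasks_to_rerun(tasks, "w2", shared_storage=False)
shared_style = tasks_to_rerun(tasks, "w2", shared_storage=True)
print(paper_style, shared_style)
```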
The shuffle: where your network bill comes from
After map finishes, you've got key-value pairs scattered across workers. The shuffle gets all values for a given key to the same reducer. This is the most expensive phase of MapReduce, and understanding why helps you write better jobs.
The all-to-all problem
Consider what happens with M mappers and R reducers. Each mapper produces R output files (one per reducer partition). Each reducer needs to fetch data from all M mappers. That's M * R file transfers.
Mappers Reducers
M1 ----+----+ R1
| |
M2 ----+----+ R2
| |
M3 ----+----+ R3
(M * R connections)
If you have 1000 mappers and 100 reducers, that's 100,000 network transfers. This is why the paper talks about bandwidth being a scarce resource. In a typical data center, machines have much more disk bandwidth than network bandwidth. The shuffle can easily become the bottleneck.
Google's original implementation tried to mitigate this with data locality. The master tries to schedule map tasks on machines that already have the input data (or on machines in the same network rack). This way, map reads are local or nearly-local. The shuffle still requires network transfer, but at least you're not paying network costs twice.
In my Docker-based setup, locality doesn't matter much since everything's on the same machine. But in a real cluster, this optimization is critical.
Partitioning (aka "Which Reducer Gets This Key?")
Here's the actual implementation:
# src/utils/partitioner.py
import hashlib

class Partitioner:
    """Hash-based partitioning for intermediate keys"""
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions

    def get_partition(self, key):
        """Get partition ID for a key using hash function"""
        key_str = str(key)
        # Use MD5 so every Python process computes the same hash for the same key
        hash_value = int(hashlib.md5(key_str.encode()).hexdigest(), 16)
        return hash_value % self.num_partitions
Same key, same partition, every time. Doesn't matter which mapper produced it.
The bug that cost me 4 hours:
My first version looked like this:
# BROKEN - Don't do this
def get_partition(self, key):
    return hash(key) % self.num_partitions
Python's built-in hash() is randomized per process for str and bytes by default (a security measure against hash-collision attacks, controlled by PYTHONHASHSEED). Worker 1 would send "hello" to partition 0. Worker 2 would send "hello" to partition 2. Word counts were wrong and I couldn't figure out why.
The symptom: "the" was being counted three different times instead of once. The fix: use hashlib.md5() which is deterministic across all processes.
Get this wrong and "the" might get counted separately on three different reducers. Ask me how I know.
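If you want to see the difference yourself, a sketch like this launches a few fresh interpreters and compares what each one computes for the same key. The helper names and the choice of 3 partitions are assumptions for the demo. The built-in hash column will usually vary between runs unless PYTHONHASHSEED is pinned; the MD5 column should not.

```python
import hashlib
import subprocess
import sys

def md5_partition(key, num_partitions):
    """Deterministic partition: same key, same answer in every process."""
    digest = hashlib.md5(str(key).encode()).hexdigest()
    return int(digest, 16) % num_partitions

# One-liner executed in a brand-new interpreter each time.
CHILD = (
    "import hashlib;"
    "k='hello';"
    "print(int(hashlib.md5(k.encode()).hexdigest(), 16) % 3, hash(k) % 3)"
)

def partitions_in_fresh_process():
    """Return (md5_partition, builtin_hash_partition) from a child process."""
    out = subprocess.run([sys.executable, "-c", CHILD],
                         capture_output=True, text=True, check=True).stdout
    md5_part, builtin_part = out.split()
    return int(md5_part), int(builtin_part)

runs = [partitions_in_fresh_process() for _ in range(5)]
md5_parts = {md5 for md5, _ in runs}
builtin_parts = {b for _, b in runs}
print("md5 partitions seen:", md5_parts)
print("builtin hash partitions seen:", builtin_parts)
```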
Why fine-grained tasks matter
Google recommends making M (map tasks) and R (reduce tasks) much larger than your worker count. The paper reports often running with M = 200,000 and R = 5,000 on 2,000 worker machines. That's 100 map tasks per worker on average.
Two reasons:
First, load balancing. Tasks don't all take the same time. Some input splits are bigger. Some keys are more common (the "hot key" problem). If you have one task per worker, a slow task holds everything up. With 100 tasks per worker, a fast worker finishes its share and steals more work from the pending queue. The load naturally balances.
Second, fault tolerance. Worker dies? You lose one tiny task, not 30% of your data. Reassigning a small task is cheap. Reassigning a massive task wastes all the work that was already done.
I went with M = number of input chunks and R = 3 (configurable). Works fine for demo purposes. Production would want way more. The paper notes that having too many tasks also has overhead (the master has to track state for each one), so there's a sweet spot.
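The load-balancing argument is easy to check with a toy simulation. This is an idealized sketch (the function name and the durations are invented, and real tasks don't split this cleanly): workers pull tasks from a shared queue, and we compare one big task per worker against many small ones with the same total work.

```python
import heapq


def makespan(task_durations, num_workers):
    """Simulate workers pulling tasks from a shared FIFO queue.

    Returns the time at which the last worker finishes.
    """
    # Heap of times at which each worker next becomes free.
    free_at = [0.0] * num_workers
    heapq.heapify(free_at)
    for duration in task_durations:
        start = heapq.heappop(free_at)  # earliest-free worker takes the next task
        heapq.heappush(free_at, start + duration)
    return max(free_at)


# Same 18 units of total work, split two ways across 3 workers.
coarse = [4, 4, 10]  # one task per worker; one split is a straggler
fine = [1] * 18      # many small tasks

print(makespan(coarse, 3))  # 10.0: everyone waits on the straggler
print(makespan(fine, 3))    # 6.0: the work spreads evenly
```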
Chaining jobs together (the DAG scheduler)
After getting basic MapReduce working, I ran into a real problem: my word count worked, but what if I wanted to do something with the results?
Say I'm analyzing server logs. First job: count error types. Second job: find the top 10 errors. Third job: correlate with timestamps. Each job depends on the previous one's output. I could run them manually, one at a time, checking completion and copying outputs. That's tedious and error-prone.
Real pipelines aren't one job. You've got:
- Preprocess logs
- Preprocess events
- Combine both outputs (has to wait for first two)
- Generate report (has to wait for the combine)
preprocess_logs ----+
+--> combine_data --> final_report
preprocess_events --+
That's a DAG. Directed Acyclic Graph. Directed because arrows have direction (A must finish before C). Acyclic because no cycles allowed.
This is where my implementation goes beyond the original MapReduce paper. Google's paper describes single jobs. But real systems like Apache Airflow, Apache Oozie, and Google's own internal tools handle multi-job workflows. I wanted to understand how that orchestration layer works, and building it taught me a lot about dependency management and deadlock prevention.
The DAG scheduler added roughly 400 lines of code on top of the core MapReduce implementation. Not trivial, but manageable once the foundation was solid.
Why DAGs?
A DAG represents dependencies. Node = job. Edge = "must complete before." The structure guarantees that if you execute jobs in the right order, every job's inputs are ready when it starts.
The key property is that a DAG has at least one topological ordering: a sequence where every job appears after all its dependencies. This is what lets us execute without deadlock.
class Job:
    def __init__(self, job_id, input_data, num_reduce_tasks):
        self.job_id = job_id
        self.status = JobStatus.PENDING
        self.dependencies = []  # Jobs that must complete before this one
        self.dependents = []    # Jobs waiting for this one
        self.input_data = input_data
        self.output_data = None
No cycles. Seriously.
If Job A depends on B, B depends on C, and C depends on A, you've got a deadlock. Everyone's waiting for everyone else. Nothing runs. Ever.
This is a fundamental property: cycles in a dependency graph create circular wait, one of the four conditions for deadlock (along with mutual exclusion, hold-and-wait, and no preemption). If you prevent any one of these conditions, you prevent deadlock. DAGs prevent circular wait by definition.
The scheduler rejects cycles at submission time. Before adding a job with dependencies, I run a cycle detection check:
def _would_create_cycle(self, new_job_id: str, dependencies: List[str]) -> bool:
    """Check if adding these dependencies would create a cycle."""
    visited = set()
    def dfs(job_id):
        if job_id == new_job_id:
            return True  # Found a path back to the new job
        if job_id in visited:
            return False
        visited.add(job_id)
        job = self.jobs.get(job_id)
        if job:
            # Walk upstream: a cycle exists only if a dependency
            # already depends (transitively) on the new job
            for dep_id in job.dependencies:
                if dfs(dep_id):
                    return True
        return False
    for dep_id in dependencies:
        if dfs(dep_id):
            return True
    return False
If you try to create a cycle, the scheduler says no and throws an error. Better to fail fast at job submission than to discover a deadlock at runtime.
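Here's a standalone, iterative sketch of the same kind of check, using a plain dict instead of the scheduler's Job objects (the function and variable names are illustrative, not the repo's). It walks each proposed dependency's own dependencies, because a cycle appears exactly when one of them already depends on the job being added.

```python
def would_create_cycle(dependencies_of, job_id, new_dependencies):
    """Return True if making job_id depend on new_dependencies closes a loop.

    dependencies_of maps each job ID to the list of job IDs it depends on.
    """
    visited = set()
    stack = list(new_dependencies)
    while stack:
        current = stack.pop()
        if current == job_id:
            return True  # a dependency already (transitively) depends on job_id
        if current in visited:
            continue
        visited.add(current)
        stack.extend(dependencies_of.get(current, []))
    return False


# Hypothetical graph: b depends on a, c depends on b.
deps = {"a": [], "b": ["a"], "c": ["b"]}

print(would_create_cycle(deps, "a", ["c"]))       # True: a would wait on c, which waits on a
print(would_create_cycle(deps, "d", ["a", "c"]))  # False: d is new, nothing depends on it
print(would_create_cycle(deps, "c", ["a"]))       # False: a redundant edge is not a cycle
```

The explicit stack avoids Python's recursion limit on long dependency chains, which is a small design choice rather than a requirement.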
Running jobs in order
Topological sort. Fancy name, simple idea: order jobs so every job comes after its dependencies. I used Kahn's algorithm because it naturally handles parallel jobs at the same "level."
def get_topological_order(self) -> List[str]:
    in_degree = {job_id: len(job.dependencies) for job_id, job in self.jobs.items()}
    queue = deque([job_id for job_id, deg in in_degree.items() if deg == 0])
    order = []
    while queue:
        job_id = queue.popleft()
        order.append(job_id)
        for dependent_id in self.jobs[job_id].dependents:
            in_degree[dependent_id] -= 1
            if in_degree[dependent_id] == 0:
                queue.append(dependent_id)
    return order
The algorithm starts with jobs that have no dependencies (in-degree zero). As each job "completes" (gets added to the order), it decrements the in-degree of its dependents. When a dependent's in-degree hits zero, all its dependencies are satisfied and it can run.
If the output order is shorter than the number of jobs, you have a cycle. (This is another way to detect cycles, but I check earlier at submission time.)
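To make the "levels" idea concrete, here's a small variant of Kahn's algorithm that groups jobs by level and raises if the ordering comes up short. It's a sketch over a plain dict, not the scheduler's actual method; topological_levels is a made-up name, and it assumes every dependency also appears as a key.

```python
def topological_levels(dependencies_of):
    """Group jobs into levels; jobs in the same level can run in parallel.

    dependencies_of maps job ID -> list of job IDs it depends on.
    Raises ValueError if the graph contains a cycle.
    """
    in_degree = {job: len(deps) for job, deps in dependencies_of.items()}
    dependents_of = {job: [] for job in dependencies_of}
    for job, deps in dependencies_of.items():
        for dep in deps:
            dependents_of[dep].append(job)

    current = sorted(job for job, deg in in_degree.items() if deg == 0)
    levels = []
    while current:
        levels.append(current)
        next_level = []
        for job in current:
            for dependent in dependents_of[job]:
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    next_level.append(dependent)
        current = sorted(next_level)

    # Jobs stuck with a nonzero in-degree mean there is a cycle.
    if sum(len(level) for level in levels) != len(dependencies_of):
        raise ValueError("cycle detected: not every job could be ordered")
    return levels


pipeline = {
    "preprocess_logs": [],
    "preprocess_events": [],
    "combine_data": ["preprocess_logs", "preprocess_events"],
    "final_report": ["combine_data"],
}
print(topological_levels(pipeline))
# [['preprocess_events', 'preprocess_logs'], ['combine_data'], ['final_report']]
```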
Propagating completion
At runtime, job completion triggers dependency checks:
def _propagate_completion(self, job_id: str):
    job = self.jobs.get(job_id)
    for dependent_id in job.dependents:
        if self._check_dependencies_met(dependent_id):
            dependent = self.jobs[dependent_id]
            dependent.status = JobStatus.READY
            self.ready_queue.append(dependent_id)
When a job finishes, we look at everything that was waiting for it. For each dependent, we check: are all of its dependencies now complete? If yes, mark it ready and add it to the execution queue.
This cascades through the DAG. If A and B both finish, and C depends on both, then C becomes ready after the second one completes. Parallelism where possible, dependencies respected. That's the whole game.
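The readiness check itself is one all() call. The sketch below is an assumption about what a helper like _check_dependencies_met might do, written against plain dicts; the COMPLETED member and the function name are illustrative.

```python
from enum import Enum


class JobStatus(Enum):
    PENDING = "pending"
    READY = "ready"
    COMPLETED = "completed"  # assumed name for the finished state


def dependencies_met(job_id, dependencies_of, status_of):
    """True when every dependency of job_id has COMPLETED."""
    return all(
        status_of[dep] is JobStatus.COMPLETED
        for dep in dependencies_of[job_id]
    )


dependencies_of = {"a": [], "b": [], "c": ["a", "b"]}
status_of = {
    "a": JobStatus.COMPLETED,
    "b": JobStatus.PENDING,
    "c": JobStatus.PENDING,
}

print(dependencies_met("c", dependencies_of, status_of))  # False: b is still pending
status_of["b"] = JobStatus.COMPLETED
print(dependencies_met("c", dependencies_of, status_of))  # True: both finished
```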
Job chaining: output becomes input
One nice feature: the output of one job can automatically become the input of the next.
scheduler.add_job("preprocess", input_data=raw_files)
scheduler.add_job("aggregate",
                  dependencies=["preprocess"],
                  use_dependency_output=True)  # <-- magic
When "preprocess" completes, its output location gets passed to "aggregate" as input. No manual wiring. This makes it easy to build pipelines where each stage transforms the previous stage's output.
Example: setting up a job DAG
Here's how you'd set up a multi-stage pipeline with the scheduler:
from src.master.job_scheduler import DAGJobScheduler
scheduler = DAGJobScheduler()
# Stage 1: Two independent jobs (can run in parallel)
scheduler.add_job("preprocess_logs", input_data=["log1.txt", "log2.txt"])
scheduler.add_job("preprocess_events", input_data=["events.json"])
# Stage 2: Job that depends on both (waits for both to finish)
scheduler.add_job(
    "combine_data",
    dependencies=["preprocess_logs", "preprocess_events"],
    use_dependency_output=True
)
# Stage 3: Final job
scheduler.add_job("final_report", dependencies=["combine_data"])
# Check execution order (topological sort)
order = scheduler.get_topological_order()
# Returns: ['preprocess_logs', 'preprocess_events', 'combine_data', 'final_report']
The scheduler exposes methods like get_next_ready_job(), mark_job_started(), and mark_job_completed() that the master uses to drive execution. When a job completes, _propagate_completion() checks if any dependent jobs are now ready to run. The actual MapReduce execution still happens through the task scheduler; the DAG scheduler just controls the ordering.
There's a comprehensive demo script at examples/dag_scheduler_demo.py that shows:
- Parallel execution of independent jobs
- Failure cascading through the DAG
- Output chaining between stages
- The ASCII visualization of job status
Run it with python3 examples/dag_scheduler_demo.py to see the scheduler in action.
End-to-end pipeline test
The real proof is running actual MapReduce jobs with data chaining. The integration test at examples/dag_integration_test.py runs a three-stage pipeline:
word_count → filter_common → top_words
Each job's output feeds into the next:
- word_count: Counts all words in the input file
- filter_common: Filters to words appearing more than 5 times
- top_words: Computes statistics on the filtered results
Here's actual output from running the pipeline:
Pipeline: word_count → filter_common → top_words
Job 1 (word_count): 8,332 unique words, 2.9M total occurrences
Job 2 (filter_common): 1,309 words kept (15.7% of input)
Job 3 (top_words): Top 10 computed
Top 10 words: ut (57,988), in (50,649), et (47,689), vestibulum (47,654)...
Total Pipeline Time: 8.70s
The output files are saved to output/:
- word_count_output.json (134KB of word counts)
- filter_common_output.json (filtered words)
- top_words_output.json (statistics)
- pipeline_results.json (complete run metadata)
Run it yourself:
# Unit tests (no Docker needed)
python3 examples/dag_integration_test.py --test unit
# Full end-to-end pipeline with real data
python3 examples/dag_integration_test.py --test pipeline
Actually running this thing
I don't have a thousand machines like Google did in 2004. (Theirs were dual-processor x86 boxes with 2-4GB RAM. Mine is a laptop that heats up during Zoom calls.)
So: Docker. Each container pretends to be a machine. gRPC handles the network stuff.
Three Docker configurations
The system has separate Docker Compose files for different use cases:
| File | Purpose | Command |
|---|---|---|
| docker-compose.yml | Normal runs with sample data | docker-compose up |
| docker-compose.benchmark.yml | Performance benchmarking (1M words) | docker-compose -f docker-compose.benchmark.yml up |
| docker-compose.dag.yml | DAG pipeline execution | docker-compose -f docker-compose.dag.yml up |
All configurations use the same unified server.py with different flags:
# Single-job mode (default)
python3 -m src.master.server --input data/input/sample.txt
# Pipeline mode (DAG execution)
python3 -m src.master.server --pipeline config/pipeline.json
The worker count is tunable. The master dynamically accepts any number of workers that connect:
# Run with 1 worker
docker-compose up master worker1
# Run with 3 workers
docker-compose up master worker1 worker2 worker3
# Run with 5 workers (add more to docker-compose.yml first)
docker-compose up master worker1 worker2 worker3 worker4 worker5
Quick start
# spin up the cluster
docker-compose up -d
# watch the master logs
docker-compose logs -f master
# kill a worker mid-job
docker-compose stop worker2
There's also a web dashboard at http://localhost:5000 (when using docker-compose.benchmark.yml) that shows real-time worker status and job progress. It's read-only, just polling the master's HTTP status endpoint every second. No buttons, no control. Just a live view of what's happening.
Things to try
If you want to see fault tolerance in action, here are some experiments:
- Start the cluster: docker-compose -f docker-compose.benchmark.yml up --build
- Open http://localhost:5000 to watch progress
- While it's running, kill a worker: docker stop benchmark_worker2
- Watch the dashboard. Worker2 turns "failed", and its tasks get reassigned to surviving workers
The interesting case is when you restart the killed worker while the job is still running. If the original task was already reassigned and completed by another worker, the restarted worker might try to complete it again. The master should ignore the duplicate completion. This is idempotency in practice.
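Ignoring the duplicate can be as simple as "first completion wins." This is a minimal sketch of that idea, not the master's real bookkeeping; TaskTable and its method are hypothetical names.

```python
class TaskTable:
    """Tracks which tasks are done and ignores repeated completion reports."""

    def __init__(self):
        self.completed_by = {}  # task_id -> worker_id that finished it first

    def report_completion(self, task_id, worker_id):
        """Return True if this report was accepted, False if it was a duplicate."""
        if task_id in self.completed_by:
            return False  # already done; the late report changes nothing
        self.completed_by[task_id] = worker_id
        return True


table = TaskTable()
print(table.report_completion("map_7", "worker3"))  # True: first completion wins
print(table.report_completion("map_7", "worker2"))  # False: the restarted worker is ignored
print(table.completed_by)                           # {'map_7': 'worker3'}
```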
Some numbers (and the problem of being too fast)
Here's an embarrassing problem I didn't expect: the system was too fast to benchmark properly.
The benchmarking challenge
I was suffering from success.
My first attempt at benchmarking used a simple word count on sample.txt (the default input file). The job completed in under 100 milliseconds. Workers would register, grab all the tasks, finish everything, and be done before I could even see progress in the logs. The Docker container startup time (3+ seconds) was longer than the actual computation.
This is actually a good sign for the implementation. The gRPC overhead is minimal, task scheduling is fast, and the Python code isn't doing anything stupid. But it made benchmarking impossible. You can't measure a 1.5x speedup when your baseline is "too fast to measure."
Creating meaningful workload
I had to engineer a bigger input file. Here's what I ended up with:
# big_sample.txt stats:
# - 6.6 MB file size
# - 89,155 lines
# - 1,000,000 words (exactly 1M for nice round numbers)
# - Lorem ipsum text repeated and varied
The benchmark configuration uses --chunk-size 500, which splits the 89K lines into about 178 map tasks (89,155 / 500 ≈ 178.3; the leftover 155 lines are presumably the 179th task that shows up in the startup log). Each map task processes up to 500 lines (~5,600 words). This creates enough parallelism for the workers to stay busy and for timing differences to be measurable.
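The division isn't exact: 89,155 lines at 500 per chunk leaves a remainder. A quick sketch of line-based splitting (the helper is illustrative, not the project's splitter) shows what happens if the remainder is kept as its own short chunk:

```python
import math


def split_into_chunks(lines, chunk_size):
    """Yield consecutive slices of at most chunk_size lines."""
    for start in range(0, len(lines), chunk_size):
        yield lines[start:start + chunk_size]


total_lines, chunk_size = 89_155, 500
chunks = list(split_into_chunks(list(range(total_lines)), chunk_size))

print(len(chunks))                          # 179
print(math.ceil(total_lines / chunk_size))  # 179
print(len(chunks[-1]))                      # 155 lines in the final, short chunk
```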
The numbers
| Configuration | Time | Notes |
|---|---|---|
| 1 worker | 2.07s | Sequential baseline |
| 3 workers | 1.36s | 1.52x speedup |
| 3 workers (1 killed mid-job) | 1.22s | Fault tolerance overhead minimal |
Why isn't it 3x faster with 3 workers?
The theoretical maximum speedup with 3 workers is 3x. We got 1.52x. Here's where the time goes:
1. Amdahl's Law in action
Not everything parallelizes. The reduce phase has only 3 tasks (R=3), and they can't start until all 178 map tasks complete. If one mapper is slightly slower, everyone waits. The sequential portion (reduce phase startup, final aggregation) limits speedup.
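You can turn the measured times from the table above into a rough "effective parallel fraction" by inverting Amdahl's Law. Treat this as a back-of-the-envelope sketch: it lumps coordination overhead in with the serial part, and the function names are my own.

```python
def amdahl_speedup(parallel_fraction, num_workers):
    """Upper bound on speedup when only part of the job parallelizes."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / num_workers)


def implied_parallel_fraction(measured_speedup, num_workers):
    """Invert Amdahl's Law: which parallel fraction explains this speedup?"""
    return (1.0 - 1.0 / measured_speedup) / (1.0 - 1.0 / num_workers)


speedup = 2.07 / 1.36  # 1-worker time / 3-worker time
p = implied_parallel_fraction(speedup, 3)

print(f"measured speedup: {speedup:.2f}x")        # 1.52x
print(f"effective parallel fraction: {p:.2f}")    # 0.51
print(f"ceiling with unlimited workers: {1 / (1 - p):.2f}x")
```

By this estimate only about half of the wall-clock time behaves as if it were parallel, so adding workers would top out at roughly 2x for this workload.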
2. Task granularity overhead
Each of the 178 map tasks requires:
- gRPC call to master to get the task (~1-2ms)
- JSON serialization of input data
- JSON serialization of output (intermediate files)
- gRPC call to report completion (~1-2ms)
- gRPC call to report intermediate file locations (~1-2ms)
That's roughly 5-6ms of overhead per task. With 178 tasks, that's almost a full second of just coordination overhead. The actual word counting is probably faster than the RPC calls.
3. Shared filesystem bottleneck
In the Docker setup, all workers write intermediate files to the same mounted volume. The shuffle phase (reduce workers reading map outputs) hits this shared resource. In a real cluster with distributed storage (like GFS), this would be parallelized.
4. Python GIL considerations
Each worker runs in its own container, so the GIL isn't a direct issue. But within each worker, the heartbeat thread and task execution thread share the GIL. Heavy computation could delay heartbeats, though in practice word counting is I/O-bound, not CPU-bound.
Task execution internals
Here's what actually happens when a worker executes a map task:
# From src/worker/executor.py - execute_map()
def execute_map(self, task_id, input_data, input_key=None):
    """Execute a map task.
    1. Apply user's map function to input (word_count_map)
    2. Partition output into R buckets using hash(key) mod R
    3. Write partitioned data to local intermediate files (JSON)
    4. Return file locations to report to master
    """
    input_key = input_key or task_id
    # Execute map phase: tokenize text, emit (word, 1) pairs
    # Then partition by hash(word) % num_reduce_tasks
    partitions = self.map_phase.execute(input_key, input_data)
    # Write R intermediate files (one per reduce partition)
    # File format: {word: [1, 1, 1, ...], word2: [1], ...}
    file_paths = self.intermediate_manager.write_partitioned_output(
        task_id,
        partitions
    )
    return file_paths  # {0: "map_5_partition_0.json", 1: "...", 2: "..."}
The intermediate file format groups values by key within each partition:
// intermediate/worker1/map_0_partition_0.json
{
  "the": [1, 1, 1, 1, 1],
  "quick": [1],
  "brown": [1, 1],
  ...
}
This pre-grouping at the mapper saves the reducer some work. Instead of seeing 50,000 individual ("the", 1) pairs, the reducer sees ("the", [1, 1, 1, ...]) already grouped. It isn't a true combiner, though: the values are grouped but not summed, so just as many 1s still travel through the shuffle.
Reduce task execution
# From src/worker/executor.py - execute_reduce()
def execute_reduce(self, task_id, partition_id, intermediate_locations):
    """Execute a reduce task.
    1. Fetch intermediate files from ALL map workers for this partition
    2. Merge the pre-grouped data (shuffle phase)
    3. Sort by key
    4. Apply reduce function to each (key, values) group
    5. Return final output
    """
    # Fetch from all mappers: each contributed one file for this partition
    # intermediate_locations = [
    #   {"file_path": "intermediate/worker1/map_0_partition_0.json"},
    #   {"file_path": "intermediate/worker2/map_1_partition_0.json"},
    #   ...
    # ]
    grouped_data = self._fetch_and_group_intermediate(intermediate_locations)
    # grouped_data = {"the": [1,1,1,...,1,1], "cat": [1,1], ...}
    # Sort by key for consistent output ordering
    sorted_data = sorted(grouped_data.items())
    # Apply reduce function: sum all the 1s
    result = self.reduce_phase.execute(sorted_data)
    # result = {"the": 50000, "cat": 200, ...}
    return result
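The merge step is the heart of the shuffle, so here's a guess at what a helper like _fetch_and_group_intermediate could look like. It's a self-contained sketch under the file format shown earlier (JSON objects mapping each key to a list of values); fetch_and_group and the temp files are stand-ins, not the repo's code.

```python
import json
import os
import tempfile
from collections import defaultdict


def fetch_and_group(intermediate_locations):
    """Merge pre-grouped {key: [values]} JSON files into one dict."""
    grouped = defaultdict(list)
    for location in intermediate_locations:
        with open(location["file_path"], encoding="utf-8") as f:
            for key, values in json.load(f).items():
                grouped[key].extend(values)
    return dict(grouped)


# Two fake mapper outputs for the same partition.
with tempfile.TemporaryDirectory() as tmp:
    locations = []
    for name, data in [("map_0", {"the": [1, 1], "cat": [1]}),
                       ("map_1", {"the": [1], "dog": [1]})]:
        path = os.path.join(tmp, f"{name}_partition_0.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f)
        locations.append({"file_path": path})

    merged = fetch_and_group(locations)

# Sort by key, then reduce by summing the 1s.
counts = {word: sum(ones) for word, ones in sorted(merged.items())}
print(counts)  # {'cat': 1, 'dog': 1, 'the': 3}
```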
Master task scheduling
The scheduler implements a strict phase ordering:
# From src/master/scheduler.py - get_next_task()
def get_next_task(self, worker_id):
    """Scheduling policy:
    1. During map phase: assign pending map tasks (FIFO)
    2. After ALL maps complete: create reduce tasks
    3. During reduce phase: assign pending reduce tasks
    """
    # Check phase transition
    self.check_map_phase_complete()  # Creates reduce tasks if ready
    pending_tasks = self.state.get_pending_tasks()
    if not pending_tasks:
        if self.check_job_complete():
            print(" MapReduce job complete!")
        return None
    # Prioritize: map tasks first, then reduce
    map_tasks = [t for t in pending_tasks if t['task_type'] == 'map']
    reduce_tasks = [t for t in pending_tasks if t['task_type'] == 'reduce']
    task = map_tasks[0] if map_tasks else reduce_tasks[0]
    self.state.assign_task(task['task_id'], worker_id)
    return task
The key insight: reduce tasks aren't even created until all map tasks complete. This ensures reducers have all the intermediate data they need. If we created reduce tasks earlier, reducers would have incomplete data.
Intermediate file management
Each worker maintains its own intermediate directory:
intermediate/
├── worker1/
│ ├── map_0_partition_0.json # Keys hashing to partition 0
│ ├── map_0_partition_1.json # Keys hashing to partition 1
│ ├── map_0_partition_2.json # Keys hashing to partition 2
│ ├── map_3_partition_0.json # Worker 1 also did map task 3
│ └── ...
├── worker2/
│ ├── map_1_partition_0.json
│ └── ...
└── worker3/
└── ...
With M=178 map tasks and R=3 reduce tasks, we create 178 × 3 = 534 intermediate files total. Each file is typically 10-50KB for this workload.
Timing breakdown (rough estimates for 1-worker baseline)
| Phase | Time | Notes |
|---|---|---|
| Worker startup & registration | ~3s | Docker container + gRPC handshake |
| Map phase (178 tasks) | ~1.5s | 178 × (parse + hash + write JSON) |
| Shuffle (reduce fetching files) | ~0.3s | 3 reducers × 178 files each |
| Reduce phase (3 tasks) | ~0.2s | 3 × (merge + sum + write result) |
| Total job time | ~2.0s | Excluding container startup |
The 3-worker case parallelizes the map phase but not much else. Workers split the 178 map tasks roughly evenly (~60 each), cutting that 1.5s down to ~0.5s. But shuffle and reduce don't scale as well, hence 1.52x not 3x.
You can run the benchmark yourself:
python3 examples/benchmark.py
This starts the Docker cluster, runs the job with different worker counts, and reports timing. Results are saved to benchmark_results.txt.
Connecting the dots to class
Building this system gave me concrete experience with distributed systems concepts that previously felt abstract. Here's how they showed up in practice:
Failure models and detection
The system assumes crash-stop failures: workers either work correctly or stop entirely. This is the simplest failure model and the easiest to handle. Byzantine failures (where nodes lie or behave arbitrarily) would require much more complex protocols, such as Byzantine Fault Tolerant consensus, which needs at least 3f+1 nodes to tolerate f Byzantine failures.
Failure detection in distributed systems is fundamentally imperfect. You cannot distinguish between "crashed" and "slow." This connects to the FLP impossibility result (Fischer, Lynch, and Paterson, 1985): in an asynchronous system where message delays are unbounded, no deterministic consensus protocol can guarantee both safety (never making a wrong decision) and liveness (eventually making progress) with even one faulty process.
What does this mean practically? It means you have to choose. MapReduce sidesteps the impossibility by:
- Not requiring consensus: The master makes all decisions unilaterally. No need for workers to agree on anything.
- Accepting imperfect failure detection: Timeouts might incorrectly mark a slow worker as dead. That's okay.
- Making failures recoverable through retry: If we mistakenly reassign a task, the duplicate execution doesn't corrupt anything.
The FLP result doesn't say distributed systems are impossible. It says you can't have everything. MapReduce works because it gives up on perfect failure detection and compensates with idempotent operations.
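Timeout-based detection is about as simple as failure detectors get. The sketch below is illustrative only: the class name and the 10-second timeout are assumptions, and timestamps are passed in explicitly so the example is deterministic.

```python
class HeartbeatMonitor:
    """Marks a worker as suspected-dead when its last heartbeat is too old."""

    def __init__(self, timeout_seconds):
        self.timeout_seconds = timeout_seconds
        self.last_seen = {}  # worker_id -> timestamp of last heartbeat

    def record_heartbeat(self, worker_id, now):
        self.last_seen[worker_id] = now

    def suspected_dead(self, now):
        """Workers silent for longer than the timeout.

        A crashed worker and a merely slow one look identical here.
        """
        return sorted(
            worker_id
            for worker_id, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=10)
monitor.record_heartbeat("worker1", now=0)
monitor.record_heartbeat("worker2", now=0)
monitor.record_heartbeat("worker1", now=8)  # worker2 stays silent

print(monitor.suspected_dead(now=12))  # ['worker2']
```

If worker2 was only stalled, its tasks get reassigned anyway and later reports from it are duplicates, which is why the retry path has to be safe.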
Consistency and idempotency
MapReduce achieves eventual consistency through idempotent operations. The same task might run multiple times, but the result is always the same. This is a form of at-least-once semantics: we guarantee every task runs at least once, but it might run more than once.
The alternative would be exactly-once semantics, which is much harder to achieve in distributed systems. It typically requires two-phase commit or similar coordination protocols, which add latency and complexity. MapReduce avoids this by making duplicate execution safe.
Coordination patterns
The master implements centralized coordination. All decisions flow through one node. This is simple but creates a bottleneck and single point of failure.
The alternative is decentralized coordination, where nodes coordinate peer-to-peer. Consensus protocols like Paxos and Raft do this. The trade-off: decentralized systems are more fault-tolerant but harder to reason about and typically slower (multiple rounds of communication to agree on anything).
For MapReduce, centralized coordination makes sense. The master is a coordination bottleneck, not a data bottleneck. It makes decisions but doesn't process terabytes of data. This asymmetry makes centralization acceptable.
Data partitioning
Hash partitioning spreads distinct keys roughly evenly across reducers. It's simple and works well as long as no single key dominates; a hot key still lands entirely on one reducer.
The downside is that it doesn't preserve ordering. If you need sorted output, you'd use range partitioning instead: keys 0-99 go to reducer 0, keys 100-199 go to reducer 1, etc. This preserves order but risks imbalanced load if keys aren't uniformly distributed.
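For comparison, range partitioning is a binary search over sorted split points. A minimal sketch, assuming integer keys and hand-picked boundaries (both are illustrative; this project only uses hash partitioning):

```python
import bisect


def range_partition(key, boundaries):
    """Return the reducer index for key, given sorted split points.

    boundaries=[100, 200] means: keys < 100 -> 0, 100..199 -> 1, >= 200 -> 2.
    """
    return bisect.bisect_right(boundaries, key)


boundaries = [100, 200]
print([range_partition(k, boundaries) for k in (0, 99, 100, 199, 200, 5000)])
# [0, 0, 1, 1, 2, 2]
```

Concatenating the reducers' sorted outputs in index order gives globally sorted output, but if most keys fall below 100, reducer 0 does most of the work.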
State machine thinking
I started thinking of tasks as state machines:
PENDING -> RUNNING -> COMPLETED
              |
              v
           FAILED -> PENDING (retry)
This made the code cleaner. Each state transition has clear triggers and effects. The master maintains the current state; workers trigger transitions by requesting tasks and reporting completion.
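One way to encode that diagram is a table of allowed transitions. This is a sketch of the idea rather than the project's actual state handling; TaskState, ALLOWED, and transition are names I'm making up here.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed transitions, mirroring the diagram above.
ALLOWED = {
    TaskState.PENDING: {TaskState.RUNNING},
    TaskState.RUNNING: {TaskState.COMPLETED, TaskState.FAILED},
    TaskState.FAILED: {TaskState.PENDING},  # retry
    TaskState.COMPLETED: set(),             # terminal
}


def transition(current, target):
    """Return target if the move is legal, otherwise raise ValueError."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


# A task that fails once, gets retried, and then succeeds.
state = TaskState.PENDING
for nxt in (TaskState.RUNNING, TaskState.FAILED, TaskState.PENDING,
            TaskState.RUNNING, TaskState.COMPLETED):
    state = transition(state, nxt)
print(state.name)  # COMPLETED
```

Rejecting illegal moves loudly (for example COMPLETED back to RUNNING) turns a whole class of silent scheduling bugs into immediate errors.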
CAP theorem trade-offs
The CAP theorem says you can have at most two of: Consistency, Availability, Partition tolerance. Where does MapReduce land?
Actually, MapReduce assumes partitions won't happen, or are rare enough to ignore. Within a datacenter, network partitions are uncommon (though not impossible). Given this assumption, MapReduce provides both consistency and availability. It's a CA system that assumes reliable networking.
But what happens when the assumption breaks? If the master becomes partitioned from workers, the system stops making progress. Workers can't get tasks. Tasks can't be reported complete. MapReduce chooses to stop rather than return potentially wrong results. In CAP terms, when forced to choose, it picks consistency over availability.
This is similar to traditional databases. They often assume reliable networks within a datacenter and provide CA semantics. The CAP trade-off really bites when you cross datacenters or have unreliable networks. That's when you need to make hard choices about CP vs AP.
A system with a replicated master (say, one using Raft for master election) could keep making progress during a partition by electing a new leader in the majority partition. MapReduce doesn't do this. It just waits for the partition to heal. That's a pragmatic choice for batch processing where jobs can be restarted.
What I skipped (and why)
Speculative execution. The paper describes running backup copies of slow tasks. This requires tracking task timing statistics and deciding when a task is "too slow." It's an optimization that matters at scale but adds significant complexity. I'd need to track p50/p99 task durations, decide on backup thresholds, and handle the case where both original and backup complete. Valuable, but not core to understanding MapReduce.
Combiner functions. If your reduce function is commutative and associative (like sum or count), you can run a "mini-reduce" on the mapper before shuffling. This reduces network traffic significantly. Instead of sending [(word, 1), (word, 1), (word, 1)], you send [(word, 3)]. I understood the concept but prioritized other features.
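For word count, a combiner is small enough to sketch in a few lines. This isn't in the repo (I skipped it, as noted), so the function names are hypothetical; the point is just that each mapper ships one partial count per word instead of a list of 1s.

```python
from collections import Counter


def map_with_combiner(text):
    """Emit one (word, local_count) pair per distinct word in this split."""
    return Counter(text.split())


def reduce_counts(partial_counts):
    """Sum per-mapper partial counts.

    Safe because addition is commutative and associative, so the order
    and grouping of partial sums doesn't change the total.
    """
    total = Counter()
    for partial in partial_counts:
        total.update(partial)
    return total


split_a = map_with_combiner("the cat saw the dog")
split_b = map_with_combiner("the dog ran")

print(dict(split_a))                             # {'the': 2, 'cat': 1, 'saw': 1, 'dog': 1}
print(reduce_counts([split_a, split_b])["the"])  # 3
```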
Master failover. This would require implementing Raft or Paxos for leader election, which is essentially a whole separate project. The master checkpoints its state, so recovery is possible, but automated failover would need consensus among standby nodes. Maybe next semester.
Data locality optimization. Google's scheduler tries to place map tasks on machines that already have the input data. In my Docker setup, all containers share the same filesystem, so locality is meaningless. Real clusters would benefit hugely from this.
What I actually learned
The paper is 13 pages. The algorithm fits in your head. The hard part is everything the paper doesn't say: the edge cases, the debugging, the moments where you have to make a design decision and the paper just moves on.
I spent more time debugging threading issues than writing the actual map/reduce logic. The distinction between "completed" and "durable" didn't click until I had workers dying at inconvenient times and had to think through what should happen to their outputs.
Building this changed how I read distributed systems papers. I now notice the gaps, the places where the authors say "and then we do X" without explaining how X actually works when messages get delayed or nodes crash at the wrong moment. Those gaps are where the real learning happens.
Links
- My implementation: github.com/Ajodo-Godson/MapReduce. Clone it, break it, learn from it.
- The original paper: MapReduce: Simplified Data Processing on Large Clusters. Pages 3-5 cover the execution model, page 7 covers fault tolerance. Both are worth re-reading after you've implemented something.
- gRPC introduction
- Protocol Buffers documentation
- FLP Impossibility Paper. Dense but foundational. The result is simple; the proof is not.
Fall 2025. Distributed Systems.