JavaRaft: Raft Distributed Consensus Protocol

While studying for system design interviews, I picked up some basics and got interested in distributed systems. Even after further study, many distributed systems algorithms still felt abstract and hand-wavy to me. To get hands-on experience, I decided to build the Raft consensus algorithm from scratch.

It took me a few weeks to fully understand the Raft paper and implement the core algorithm. In this post, I want to walk through some key details of my Raft-based distributed key-value store and what I learned along the way.

GitHub: seungwonlee003 / JavaRaft

Java-based Raft consensus implementation with Spring Boot RPC and in-memory key-value store.

Introduction

A Java-based implementation of the Raft consensus algorithm, designed to support consistent (CP) distributed systems like distributed key-value stores.

This project includes:

  • The core Raft algorithm
  • A set of pluggable interfaces for state machines and persistence
  • A Spring Boot-based RPC layer for inter-node communication
  • A simple in-memory key-value store with HTTP endpoints for PUT, GET, and DELETE operations (as a reference implementation)

Developers can extend this framework by implementing custom state machines and storage backends to build their own distributed services.

Built with Java, Spring Boot, Lombok, and SLF4J.

Features

Each feature references the corresponding section of the Raft paper:

  • Leader election (§5.2)
  • Log replication (§5.3)
  • Election restriction (§5.4.1)
  • Committing entries from previous terms (§5.4.2)
  • Follower and candidate crashes (§5.5)
  • Implementing linearizable semantics (§8)

Usage

Try out the distributed key-value API.

Configure application properties for nodes and build the JAR (skipping tests):

mvn clean package -DskipTests
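
For reference, a single node's application.properties might look roughly like the sketch below. The property keys are illustrative assumptions (check the project's configuration class for the real names); only server.port is a standard Spring Boot property.

# Hypothetical configuration for node 1 of a three-node cluster; key names are illustrative.
server.port=8081
raft.node-id=1
raft.peer-urls.2=http://localhost:8082
raft.peer-urls.3=http://localhost:8083
raft.election-timeout-millis-min=150
raft.election-timeout-millis-max=300
raft.heartbeat-interval-millis=50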

While my implementation is far from production-ready, my goal was to build a system that works under basic conditions and demonstrates the core ideas.

To keep things focused, I won’t go into line-by-line code or the fine-grained interactions between components. Instead, I’ll walk through a high-level overview of how I implemented the core Raft algorithm.


Leader election (§5.2):

The first module I developed was leader election. In Raft, all nodes begin as followers, and an election is triggered when a follower's election timer expires without hearing from a leader.

To enable this, I implemented an election timer that starts an election upon expiration. It is reset whenever the node transitions to the follower state.

// If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate:
// convert to candidate (§5.2)
public void reset() {
    cancel();

    long minTimeout = config.getElectionTimeoutMillisMin();
    long maxTimeout = config.getElectionTimeoutMillisMax();
    long timeout = minTimeout + random.nextInt((int)(maxTimeout - minTimeout));

    electionFuture = scheduler.schedule(() -> {
        electionManager.startElection();
    }, timeout, TimeUnit.MILLISECONDS);
}

The ElectionManager class includes two primary methods: startElection() and handleVoteRequest(). The startElection() method allocates threads from a pool and sends asynchronous vote requests to each peer via its /requestVote endpoint. The calling thread then finishes by resetting the election timer. Here, a timeout (configured on the RestTemplate and enforced again with orTimeout() on each vote future) ensures responses arrive within a duration shorter than the election timeout, preventing overlapping elections on the same node.

If a peer’s response indicates a higher term, the node immediately steps down to the follower state and the election concludes. If fewer than a majority of votes arrive, the election timer (reset at the end of startElection()) eventually fires again and a new election begins. Otherwise, the node becomes the leader, cancels the election timer to prevent further elections, and starts sending heartbeats to its followers. The handleVoteRequest() method follows the Raft paper closely, so instead of walking through it I include only a minimal sketch of it after the startElection() listing below.

/**
 * On conversion to candidate, start election:
 * Increment currentTerm, vote for self, reset election timer, send RequestVote RPCs to all other servers.
 */
public void startElection() {
    lockManager.getStateWriteLock().lock();
    lockManager.getLogReadLock().lock();
    try {
        if (nodeState.getCurrentRole() == Role.LEADER) {
            return;
        }

        nodeState.setCurrentRole(Role.CANDIDATE);
        nodeState.incrementTerm();
        nodeState.setVotedFor(nodeState.getNodeId());

        int currentTerm = nodeState.getCurrentTerm();
        int lastLogIndex = raftLog.getLastIndex();
        int lastLogTerm = raftLog.getLastTerm();

        List<CompletableFuture<VoteResponseDto>> voteFutures = new ArrayList<>();
        ExecutorService executor = Executors.newCachedThreadPool();

        for (String peerUrl : config.getPeerUrls().values()) {
            CompletableFuture<VoteResponseDto> voteFuture = CompletableFuture
                    .supplyAsync(() -> requestVote(
                            currentTerm,
                            nodeState.getNodeId(),
                            lastLogIndex,
                            lastLogTerm,
                            peerUrl
                    ), executor)
                    .orTimeout(config.getElectionRpcTimeoutMillis(), TimeUnit.MILLISECONDS)
                    .exceptionally(throwable -> {
                        return new VoteResponseDto(currentTerm, false);
                    });
            voteFutures.add(voteFuture);
        }

        int majority = (config.getPeerUrls().size() + 1) / 2 + 1;
        AtomicInteger voteCount = new AtomicInteger(1);

        // If votes received from majority of servers: become leader (§5.2).
        for (CompletableFuture<VoteResponseDto> future : voteFutures) {
            future.thenAccept(response -> {
                lockManager.getStateWriteLock().lock();
                try {
                    if (nodeState.getCurrentRole() != Role.CANDIDATE || nodeState.getCurrentTerm() != currentTerm) {
                        return;
                    }
                    if (response != null && response.isVoteGranted()) {
                        int newVoteCount = voteCount.incrementAndGet();
                        if (newVoteCount >= majority) {
                            stateManager.becomeLeader();
                        }
                    }
                } finally {
                    lockManager.getStateWriteLock().unlock();
                }
            });
        }
        stateManager.resetElectionTimer();
    } finally {
        lockManager.getLogReadLock().unlock();
        lockManager.getStateWriteLock().unlock();
    }
}
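
For completeness, here is a minimal sketch of what handleVoteRequest() has to do per §5.1, §5.2, and §5.4.1. The field and lock names mirror the listing above, but the request DTO and the getVotedFor() accessor are assumptions, so treat this as an illustration rather than the repository code:

// Minimal sketch of the vote-granting rules (§5.1, §5.2, §5.4.1); illustrative, not the repository code.
public VoteResponseDto handleVoteRequest(VoteRequestDto dto) {
    lockManager.getStateWriteLock().lock();
    lockManager.getLogReadLock().lock();
    try {
        // Reply false if term < currentTerm (§5.1)
        if (dto.getTerm() < nodeState.getCurrentTerm()) {
            return new VoteResponseDto(nodeState.getCurrentTerm(), false);
        }
        // If RPC contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
        if (dto.getTerm() > nodeState.getCurrentTerm()) {
            stateManager.becomeFollower(dto.getTerm());
        }
        // Election restriction (§5.4.1): only vote for a candidate whose log is at least as up-to-date as ours
        boolean logUpToDate =
                dto.getLastLogTerm() > raftLog.getLastTerm() ||
                (dto.getLastLogTerm() == raftLog.getLastTerm() && dto.getLastLogIndex() >= raftLog.getLastIndex());
        // Grant at most one vote per term (§5.2)
        boolean notYetVoted = nodeState.getVotedFor() == null || nodeState.getVotedFor().equals(dto.getCandidateId());
        if (logUpToDate && notYetVoted) {
            nodeState.setVotedFor(dto.getCandidateId());
            stateManager.resetElectionTimer(); // granting a vote also resets the election timer
            return new VoteResponseDto(nodeState.getCurrentTerm(), true);
        }
        return new VoteResponseDto(nodeState.getCurrentTerm(), false);
    } finally {
        lockManager.getLogReadLock().unlock();
        lockManager.getStateWriteLock().unlock();
    }
}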

Log replication (§5.3):

The second module I implemented was log replication. Given the complexity of this logic, I split the responsibilities into three separate classes: one for handling client write requests, another for initiating log replication, and a third for processing AppendEntries requests.

AppendEntriesHandler.java
ClientRequestHandler.java
LogReplicator.java
RaftReplicationManager.java

To manage these interactions, I used the Facade design pattern, introducing a centralized RaftReplicationManager class that delegates calls to the appropriate class.

...
public class RaftReplicationManager {
    ...

    public boolean handleClientRequest(LogEntry entry) {
        return clientRequestHandler.handle(entry);
    }

    public void startLogReplication() {
        logReplicator.start();
    }

    public AppendEntriesResponseDTO handleAppendEntries(AppendEntriesDTO dto) {
        return appendEntriesHandler.handle(dto);
    }

    public void initializeIndices() {
        logReplicator.initializeIndices();
    }
}

The replication process begins with the start() method in the LogReplicator class. It relies on several key data structures: the nextIndex[] and matchIndex[] maps defined by Raft, which track replication progress for each follower, and a pendingReplication map that tracks which followers already have a replication task running.
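
The facade's initializeIndices() method, which is not shown in this post, resets these structures when a node wins an election. A minimal sketch, assuming the same field names used elsewhere in the listings, might look like this:

// Illustrative sketch: reset per-follower replication state after winning an election.
// Raft initializes nextIndex to the leader's last log index + 1 and matchIndex to 0.
public void initializeIndices() {
    int lastIndex = raftLog.getLastIndex();
    for (String peer : config.getPeerUrls().values()) {
        nextIndex.put(peer, lastIndex + 1);   // optimistically assume the follower is caught up
        matchIndex.put(peer, 0);              // nothing confirmed as replicated yet
        pendingReplication.put(peer, false);  // no replication loop running for this peer
    }
}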

When a node becomes the leader after winning an election, start() iterates over all peers and, for each one that does not already have a replication task running (tracked in pendingReplication), launches an asynchronous replication loop on a fixed thread pool.

// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods
// to prevent election timeouts (§5.2)
public void start() {
    lockManager.getStateReadLock().lock();
    try {
        if (nodeState.getCurrentRole() != Role.LEADER) return;
    } finally {
        lockManager.getStateReadLock().unlock();
    }

    for (String peer : config.getPeerUrls().values()) {
        if (!pendingReplication.getOrDefault(peer, false)) {
            pendingReplication.put(peer, true);
            executor.submit(() -> replicateLoop(peer));
        }
    }
}

For each follower, an asynchronous thread executes the replicateLoop() method. It runs a while loop that continuously replicates new log entries to the follower's /appendEntries endpoint, catches the follower up on missing entries, or sends an empty entry list as a heartbeat to maintain leadership. The loop's sleep period is aligned with the heartbeat interval.

// Sends heartbeats/log entries to peer, adjusting sleep time to match heartbeat interval
private void replicateLoop(String peer) {
    int backoff = config.getHeartbeatIntervalMillis();
    while (true) {
        lockManager.getStateReadLock().lock();
        try {
            if (nodeState.getCurrentRole() != Role.LEADER) break;
        } finally {
            lockManager.getStateReadLock().unlock();
        }

        lockManager.getLogWriteLock().lock();
        lockManager.getStateWriteLock().lock();
        lockManager.getStateMachineWriteLock().lock();
        long startTime = System.currentTimeMillis();
        try {
            boolean ok = replicate(peer);
            if (ok) updateCommitIndex();
        } finally {
            lockManager.getStateMachineWriteLock().unlock();
            lockManager.getStateWriteLock().unlock();
            lockManager.getLogWriteLock().unlock();
        }
        long duration = System.currentTimeMillis() - startTime;
        long sleepTime = Math.max(0, backoff - duration);
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    pendingReplication.put(peer, false);
}

Here, the RestTemplate timeout is configured to prevent the loop from stalling indefinitely, which could otherwise trigger an unnecessary election on a follower.
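
For reference, such a timeout can be set when the RestTemplate bean is created; the sketch below uses Spring's SimpleClientHttpRequestFactory, and the millisecond values are placeholders rather than the project's actual configuration:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

// Illustrative sketch: a RestTemplate whose connect/read timeouts keep a slow or dead peer
// from blocking the replication loop indefinitely. Timeout values are placeholders.
@Configuration
public class RestTemplateConfig {
    @Bean
    public RestTemplate restTemplate() {
        SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
        factory.setConnectTimeout(500); // ms allowed to establish the connection
        factory.setReadTimeout(1000);   // ms allowed to wait for the response
        return new RestTemplate(factory);
    }
}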

When the replication loop succeeds (and no follower responds with a higher term), the leader updates the corresponding nextIndex[] and matchIndex[] values and invokes the updateCommitIndex() method.

// If last log index >= nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex.
private boolean replicate(String peer) {
    int ni = nextIndex.get(peer);
    int prevIdx = ni - 1;
    int prevTerm = (prevIdx >= 0) ? raftLog.getTermAt(prevIdx) : 0;
    List<LogEntry> entries = raftLog.getEntriesFrom(ni);

    AppendEntriesRequestDto dto = new AppendEntriesRequestDto(
            nodeState.getCurrentTerm(), nodeState.getNodeId(),
            prevIdx, prevTerm, entries, raftLog.getCommitIndex()
    );

    try {
        String url = peer + "/raft/appendEntries";
        ResponseEntity<AppendEntriesResponseDto> res = restTemplate.postForEntity(url, dto, AppendEntriesResponseDto.class);
        AppendEntriesResponseDto body = res.getBody() != null ? res.getBody() : new AppendEntriesResponseDto(-1, false);
        if (body.getTerm() > nodeState.getCurrentTerm()) {
            nodeStateManager.becomeFollower(body.getTerm());
            return false;
        }
        if (body.isSuccess()) {
            nextIndex.put(peer, ni + entries.size());
            matchIndex.put(peer, ni + entries.size() - 1);
            return true;
        } else {
            int newNextIndex = Math.max(0, ni - 1);
            nextIndex.put(peer, newNextIndex);
            return false;
        }
    } catch (ResourceAccessException e) {
        return false;
    } catch (Exception e) {
        return false;
    }
}

This, in turn, triggers the applyEntries() method, which applies log entries up to the commit index to the data store (a sketch of applyEntries() follows the listing below). Additionally, when advancing the commit index, the leader only commits entries from its own current term by counting replicas (§5.4.2); entries from earlier terms are committed indirectly once a current-term entry is committed. This upholds Raft’s safety guarantee and prevents a committed entry from later being overwritten. This is shown below:

private void updateCommitIndex() {
    int majority = (config.getPeerUrlList().size() + 1) / 2 + 1;
    int term = nodeState.getCurrentTerm();
    for (int i = log.getLastIndex(); i > log.getCommitIndex(); i--) {
        int count = 1;
        for (int idx : matchIndex.values()) {
            if (idx >= i) count++;
        }
        if (count >= majority && log.getTermAt(i) == term) {
            log.setCommitIndex(i);
            applyEntries();
            break;
        }
    }
}
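
The applyEntries() method itself is not shown in the post. A minimal sketch, reusing names from the listings above (the getEntryAt() and setLastApplied() accessors are assumptions), would simply walk the log from lastApplied + 1 up to the commit index and feed each entry to the state machine:

// Illustrative sketch of applyEntries(): apply everything between lastApplied and commitIndex.
// The getEntryAt() and setLastApplied() accessors are assumed for illustration.
private void applyEntries() {
    int commitIndex = raftLog.getCommitIndex();
    for (int i = nodeState.getLastApplied() + 1; i <= commitIndex; i++) {
        stateMachine.apply(raftLog.getEntryAt(i));  // hand the committed entry to the state machine
        nodeState.setLastApplied(i);                // record progress so entries are applied exactly once here
    }
}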

From the follower’s perspective, the handle() method in the AppendEntriesHandler class processes the AppendEntriesRequestDto received from the leader.

It first verifies the sender’s term. Next, it checks whether its own log contains an entry at prevLogIndex whose term matches prevLogTerm. If it does not, the follower rejects the request, prompting the leader to decrement that follower’s nextIndex[] by one and retry.

If the logs match, the follower appends the new entries in accordance with the Log Matching Property (§5.3), updates its commit index, and applies entries to its state machine up to the new commit index.

public AppendEntriesResponseDto handle(AppendEntriesRequestDto dto) {
    lockManager.getLogWriteLock().lock();
    lockManager.getStateWriteLock().lock();
    lockManager.getStateMachineWriteLock().lock();
    try {
        int term = nodeState.getCurrentTerm();
        int leaderTerm = dto.getTerm();

        // Reply false if term < currentTerm (§5.1)
        if (leaderTerm < term) return new AppendEntriesResponseDto(term, false);

        // If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
        if (leaderTerm > term) {
            stateManager.becomeFollower(leaderTerm);
            term = leaderTerm;
        }

        nodeState.setCurrentLeader(dto.getLeaderId());

        // Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
        if (dto.getPrevLogIndex() > 0 &&
                (!raftLog.containsEntryAt(dto.getPrevLogIndex()) ||
                        raftLog.getTermAt(dto.getPrevLogIndex()) != dto.getPrevLogTerm())) {
            stateManager.resetElectionTimer();
            return new AppendEntriesResponseDto(term, false);
        }

        // If an existing entry conflicts with a new one (same index but different terms), delete the existing
        // entry and all that follow it (§5.3). Append any new entries not already in the log.
        int index = dto.getPrevLogIndex() + 1;
        List<LogEntry> entries = dto.getEntries();
        for (int i = 0; i < entries.size(); i++) {
            int logIndex = index + i;
            if (!raftLog.containsEntryAt(logIndex)) {
                // The follower's log ends here: append the remaining new entries.
                raftLog.appendAll(entries.subList(i, entries.size()));
                break;
            }
            if (raftLog.getTermAt(logIndex) != entries.get(i).getTerm()) {
                // Conflict: delete the existing entry and everything after it, then append the rest.
                raftLog.deleteFrom(logIndex);
                raftLog.appendAll(entries.subList(i, entries.size()));
                break;
            }
            // Entry already present with a matching term: nothing to do for this index.
        }

        // If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of the last new entry)
        if (dto.getLeaderCommit() > raftLog.getCommitIndex()) {
            int lastNew = dto.getPrevLogIndex() + entries.size();
            raftLog.setCommitIndex(Math.min(dto.getLeaderCommit(), lastNew));
            applyEntries();
        }
        stateManager.resetElectionTimer();
        return new AppendEntriesResponseDto(term, true);
    } finally {
        lockManager.getStateMachineWriteLock().unlock();
        lockManager.getStateWriteLock().unlock();
        lockManager.getLogWriteLock().unlock();
    }
}

Finally, the ClientRequestHandler manages client writes to the database. Its handle() method appends the entry to the leader’s log for replication and then monitors the entry’s index. It returns a success response only once the last applied index reaches the entry’s index, confirming that the write has been applied to the state machine.

This mechanism supports linearizability, ensuring that subsequent client reads following a successful write reflect the updated state machine.

// If command received from the client: append entry to local log, respond after entry applied
// to state machine (§5.3)
public boolean handle(LogEntry clientEntry) {
    lockManager.getStateReadLock().lock();
    lockManager.getLogWriteLock().lock();
    int entryIndex;
    try {
        if (!raftNodeState.isLeader()) return false;
        raftLog.append(clientEntry);
        entryIndex = raftLog.getLastIndex();
    } finally {
        lockManager.getLogWriteLock().unlock();
        lockManager.getStateReadLock().unlock();
    }

    long start = System.currentTimeMillis();
    long timeoutMillis = raftConfig.getClientRequestTimeoutMillis();

    while (true) {
        lockManager.getLogReadLock().lock();
        lockManager.getStateReadLock().lock();
        try {
            if (raftNodeState.getCurrentRole() != Role.LEADER) {
                return false;
            }
            if (raftNodeState.getLastApplied() >= entryIndex) {
                return true;
            }
        } finally {
            lockManager.getStateReadLock().unlock();
            lockManager.getLogReadLock().unlock();
        }

        if (System.currentTimeMillis() - start > timeoutMillis) {
            return false;
        }
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

Here, some extra write latency is introduced because client writes are aligned with the heartbeat cycle: if the heartbeat interval is 1 second, the leader may wait up to 1 second before replicating a write. While Raft allows the leader to send AppendEntries RPCs for new entries immediately (separate from heartbeats), I chose to piggyback client writes on the heartbeat cycle to simplify concurrency handling and to take advantage of batched replication.

Before concluding, I want to address concurrency. In Raft, concurrent threads handling RequestVote, AppendEntries, and election timeouts can race with one another by simultaneously modifying shared node state (such as currentTerm, votedFor, and the log). To serialize access and prevent such races, I implemented a centralized LockManager whose read-write locks guard the node’s state, the log, and the state machine.
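
The LockManager itself is essentially three ReentrantReadWriteLocks behind getters. Here is a minimal sketch matching the accessor names used in the listings above (the class body is a reconstruction, not the repository code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative reconstruction of the centralized lock holder used throughout the listings.
public class LockManager {
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock stateMachineLock = new ReentrantReadWriteLock();

    public ReentrantReadWriteLock.ReadLock getStateReadLock() { return stateLock.readLock(); }
    public ReentrantReadWriteLock.WriteLock getStateWriteLock() { return stateLock.writeLock(); }
    public ReentrantReadWriteLock.ReadLock getLogReadLock() { return logLock.readLock(); }
    public ReentrantReadWriteLock.WriteLock getLogWriteLock() { return logLock.writeLock(); }
    public ReentrantReadWriteLock.ReadLock getStateMachineReadLock() { return stateMachineLock.readLock(); }
    public ReentrantReadWriteLock.WriteLock getStateMachineWriteLock() { return stateMachineLock.writeLock(); }
}

One thing to watch when juggling multiple locks is acquisition order: acquiring them in a single consistent order at every call site is the standard way to rule out deadlock.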

This concludes the explanation of the core Raft logic. Now, I want to talk about a fundamental concern that the Raft paper calls “implementing linearizable semantics” (§8).


Ensuring Linearizable Semantics

In Raft, writes are committed once they reach a majority quorum, and the leader is guaranteed to hold all committed log entries. However, this does not inherently ensure that reads routed to the leader always reflect the most up-to-date state, which is necessary for linearizability.

Consider a scenario where a client is connected to a leader that loses its leadership due to a network partition isolating it from the majority of the cluster. If the client reconnects to this former leader before the partition heals, the node may still consider itself the leader and serve stale data from its state machine, data that may already have been superseded by a new leader. This violates linearizability, as the read does not reflect the latest committed writes.

The Raft paper proposes several solutions to this. I implemented a mechanism in which every client get operation first replicates a no-op entry to the followers to confirm leadership. If a majority of nodes accept the entry through the /appendEntries endpoint, the leader is guaranteed not to be stale, which makes the read linearizable. While this adds an extra replication round trip to every read, it is a good-enough solution for my current implementation.

More efficient alternatives, such as ReadIndex or lease-based reads, exist, but I chose this straightforward approach (a rough ReadIndex sketch follows the listing below for comparison). Apologies for the messy code below; I didn’t plan to extend this class any further.

@GetMapping("/get")
public ResponseEntity<String> get(@RequestParam String key) {
    final String NO_OP_ENTRY = "NO_OP_ENTRY";
    try {
        handleRead(new WriteRequestDto(NO_OP_ENTRY, Long.MAX_VALUE, NO_OP_ENTRY, NO_OP_ENTRY));
    } catch (Exception e) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Failed to process read request: " + e.getMessage());
    }

    lockManager.getStateMachineReadLock().lock();
    try {
        String value = kvStore.get(key);
        if (value == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND)
                    .body("Key not found: " + key);
        }
        return ResponseEntity.ok(value);
    } catch (Exception e) {
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("Error retrieving key: " + e.getMessage());
    } finally {
        lockManager.getStateMachineReadLock().unlock();
    }
}

private void handleRead(WriteRequestDto request) {
    LogEntry entry = new LogEntry(
            nodeState.getCurrentTerm(),
            request.getKey(),
            request.getValue(),
            LogEntry.Operation.PUT,
            request.getClientId(),
            request.getSequenceNumber()
    );
    boolean committed = replicationManager.handleClientRequest(entry);
    if (!committed) throw new RuntimeException("Can't process the read at this moment");
}
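
For comparison, a ReadIndex-style read would avoid appending a no-op entry per read. The sketch below is only an outline: the confirmLeadershipWithMajority() helper (one heartbeat round acknowledged by a majority) does not exist in the repository, and per §8 the leader should also have committed at least one entry from its current term before serving such reads:

// Rough ReadIndex-style read (§8 alternative); NOT implemented in the repository.
// confirmLeadershipWithMajority() is a hypothetical helper: one heartbeat round that returns
// true only if a majority of followers still acknowledge this node as leader.
public String readIndexGet(String key) {
    int readIndex;
    lockManager.getLogReadLock().lock();
    try {
        readIndex = raftLog.getCommitIndex();          // 1. record the current commit index
    } finally {
        lockManager.getLogReadLock().unlock();
    }

    if (!confirmLeadershipWithMajority()) {            // 2. confirm leadership without a log append
        throw new IllegalStateException("Leadership could not be confirmed");
    }

    while (nodeState.getLastApplied() < readIndex) {   // 3. wait for the state machine to catch up
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for readIndex");
        }
    }

    lockManager.getStateMachineReadLock().lock();      // 4. serve the read locally
    try {
        return kvStore.get(key);
    } finally {
        lockManager.getStateMachineReadLock().unlock();
    }
}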

Ensuring Linearizable Semantics 2

The Raft paper discusses a scenario where “if the leader crashes after committing the log entry but before responding to the client, the client will retry the command with a new leader, causing it to be executed a second time”. Client-side deduplication solves this: each request carries a client ID and a monotonically increasing sequence number, and the server tracks the latest sequence number per client, rejecting duplicates (a hypothetical client-side sketch follows the listing below). This preserves both correctness and linearizability, addressing one of the key challenges.

In the implementation, I modified the apply() method in the StateMachine class. Before applying a write, it queries the database (in-memory) to compare the log entry’s sequence number with the last recorded sequence number for that client ID. The write is applied only if the new sequence number is greater, confirming it as a fresh operation rather than a duplicate.

@Override
public void apply(LogEntry entry) {
    if (entry == null) {
        throw new IllegalArgumentException("Log entry cannot be null");
    }

    // deduplication check
    String clientId = entry.getClientId();
    long sequenceNumber = entry.getSequenceNumber();
    Long lastSequenceNumber = kvStore.getLastSequenceNumber(clientId);
    if (lastSequenceNumber != null && sequenceNumber <= lastSequenceNumber) {
        return;
    }
...
}
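
The client side of this contract is not part of the repository. A hypothetical client wrapper would attach a stable client ID and a monotonically increasing sequence number to each write and, crucially, reuse the same pair when retrying (the WriteRequestDto argument order and the /put endpoint below are assumptions):

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.client.RestTemplate;

// Hypothetical client-side sketch (not in the repository). On retry it reuses the same clientId
// and sequence number so the state machine's deduplication check turns a re-executed command into a no-op.
public class KvClient {
    private final String clientId = UUID.randomUUID().toString();
    private final AtomicLong sequence = new AtomicLong();
    private final RestTemplate restTemplate = new RestTemplate();
    private final String leaderUrl;

    public KvClient(String leaderUrl) {
        this.leaderUrl = leaderUrl;
    }

    public void put(String key, String value) {
        long seq = sequence.incrementAndGet();  // one sequence number per logical operation
        WriteRequestDto request = new WriteRequestDto(key, seq, value, clientId); // argument order assumed
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                restTemplate.postForEntity(leaderUrl + "/put", request, Void.class); // endpoint name assumed
                return;
            } catch (Exception e) {
                // Retry with the SAME clientId and sequence number; duplicates are ignored server-side.
            }
        }
        throw new IllegalStateException("Write failed after retries");
    }
}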

Persistence

The final requirement is persistence. Raft’s persistent state (currentTerm, votedFor, and the log) must survive crashes, and the state machine must not lose applied data either, so that no committed write is lost. While I initially tested Raft’s core components with in-memory implementations, the system was designed around an abstraction layer that allows disk-based persistence to be plugged in.

If I were to build a distributed key-value store on top of this, I’d use my custom-built embedded database as the state machine, paired with a file-backed, append-only log to serve as the durable Raft log.
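
The repository’s actual pluggable interfaces are not shown in this post. A minimal sketch of what they could look like is below; the interface names and the grouping of methods are assumptions, though the individual operations mirror the ones used in the listings above:

import java.util.List;

// Illustrative sketch of pluggable persistence abstractions (each interface in its own file).
// Names are assumed; the operations mirror those used by the Raft code in this post.
public interface StateMachine {
    void apply(LogEntry entry);              // apply a committed entry to the store
}

public interface DurableRaftLog {
    void append(LogEntry entry);
    void appendAll(List<LogEntry> entries);
    void deleteFrom(int index);              // truncate a conflicting suffix (§5.3)
    boolean containsEntryAt(int index);
    int getTermAt(int index);
    int getLastIndex();
    int getLastTerm();
    int getCommitIndex();
    void setCommitIndex(int index);
    // A file-backed implementation would fsync appended entries before acknowledging them,
    // so the log (and hence committed writes) survives a crash.
}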

With these components in place, I’ve covered the essential elements of my Raft-based key-value store. My implementation certainly isn’t perfect; there are surely many tedious conditions and corner cases I’m simply not aware of, and plenty of room for subtle bugs.

Nonetheless, this was an exciting journey that taught me a lot about what to consider when designing a distributed system.
