We recently ran a deep audit of TitanMQ's codebase and found five bugs that ranged from "data loss on restart" to "unbounded disk growth." None of them showed up in unit tests. All of them would have been catastrophic in production.
This post walks through each bug, why it existed, and how we fixed it.
Bug 1: The Broker That Forgot Everything on Restart
Issue: #5 — CommitLog has no recovery on restart
The original CommitLog constructor looked like this:
    public CommitLog(Path directory, long maxSegmentSize) throws IOException {
        this.directory = directory;
        this.maxSegmentSize = maxSegmentSize;
        this.directory.toFile().mkdirs();
        rollNewSegment(); // Always starts fresh at offset 0
    }
Every time the broker restarted, it created a new segment starting at offset 0. The old segment files were still on disk, but the broker had no idea they existed. Consumers would see an empty topic. Producers would get offset 0 for their next message. Months of data, invisible.
The Fix
On startup, scan the data directory for existing .log files, sort them by base offset (the filename is the zero-padded offset), and recover state from each one:
    private void recover() throws IOException {
        // Collect every existing segment file in the data directory
        List<Path> segmentFiles = new ArrayList<>();
        if (Files.exists(directory)) {
            try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory, "*.log")) {
                for (Path path : stream) {
                    segmentFiles.add(path);
                }
            }
        }
        if (segmentFiles.isEmpty()) {
            rollNewSegment(); // First boot: nothing to recover
            return;
        }
        // Zero-padded names sort lexicographically in base-offset order
        segmentFiles.sort(Comparator.comparing(p -> p.getFileName().toString()));
        for (Path segFile : segmentFiles) {
            String fileName = segFile.getFileName().toString();
            long baseOffset = Long.parseLong(fileName.replace(".log", ""));
            LogSegment segment = new LogSegment(directory, baseOffset, maxSegmentSize, flushPolicy);
            segment.recover();
            segments.add(segment);
        }
        // Resume appending right after the last recovered entry
        LogSegment lastSegment = segments.getLast();
        if (lastSegment.endOffset() >= 0) {
            nextOffset.set(lastSegment.endOffset() + 1);
        } else {
            nextOffset.set(lastSegment.baseOffset()); // Last segment holds no complete entry
        }
        activeSegment = lastSegment;
    }
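The string sort in recover() yields offset order only because every file name has the same width. Below is a minimal sketch of such a naming scheme, assuming a 20-digit zero-padded name. The width and the helper names `fileNameFor`, `baseOffsetOf`, and `sortByBaseOffset` are illustrative and not taken from TitanMQ.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SegmentNames {
    // Assumption: 20 digits, wide enough for any non-negative long.
    static String fileNameFor(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    // Parses the base offset back out of a segment file name.
    static long baseOffsetOf(String fileName) {
        if (!fileName.endsWith(".log")) {
            throw new IllegalArgumentException("not a segment file: " + fileName);
        }
        String digits = fileName.substring(0, fileName.length() - ".log".length());
        return Long.parseLong(digits);
    }

    // Sorting by the parsed offset stays correct even if padding widths differ,
    // where a plain string sort would put "1000.log" before "200.log".
    static List<String> sortByBaseOffset(List<String> fileNames) {
        List<String> sorted = new ArrayList<>(fileNames);
        sorted.sort(Comparator.comparingLong(SegmentNames::baseOffsetOf));
        return sorted;
    }
}
```

Comparing parsed offsets rather than raw strings is a defensive choice: it costs one parse per file at startup and removes the dependency on consistent padding.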
Each LogSegment also needed a recover() method that scans the file entry by entry, rebuilds the in-memory index, and handles the edge case where the broker crashed mid-write (a partial entry at the end of the file):
    void recover() throws IOException {
        long fileSize = channel.size();
        long pos = 0;
        index.clear();
        while (pos + ENTRY_HEADER_SIZE <= fileSize) {
            // Read the fixed-size header: entry offset, then payload size
            ByteBuffer header = ByteBuffer.allocate(ENTRY_HEADER_SIZE);
            int bytesRead = channel.read(header, pos);
            if (bytesRead < ENTRY_HEADER_SIZE) break;
            header.flip();
            long offset = header.getLong();
            int messageSize = header.getInt();
            if (messageSize <= 0 || pos + ENTRY_HEADER_SIZE + messageSize > fileSize) {
                // Partial write from a crash: truncate here
                channel.truncate(pos);
                break;
            }
            index.add(new OffsetIndex(offset, pos));
            endOffset = offset;
            pos += ENTRY_HEADER_SIZE + messageSize;
        }
        currentPosition = pos; // Next append continues after the last complete entry
    }
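The truncation of partial entries is important. If the broker crashed between writing the header and the payload, we'd have a valid-looking header whose payload is missing or cut short. By checking pos + ENTRY_HEADER_SIZE + messageSize > fileSize, we detect this and truncate the file to the last complete entry. Note that this check only catches entries cut short at the end of the file; detecting corrupted bytes inside a full-length entry would need a per-entry checksum.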
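To see the torn-tail handling in isolation, here is a small self-contained sketch. It assumes a 12-byte header (8-byte offset plus 4-byte payload size), which matches the reads in recover() above but is otherwise a guess at the real layout. The class and helper names are hypothetical. Unlike the method above, this sketch also truncates a trailing partial header.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TornTailDemo {
    static final int ENTRY_HEADER_SIZE = 12; // assumed: 8-byte offset + 4-byte payload size

    // Scans entries from the start of the file and truncates at the first
    // incomplete one. Returns the number of complete entries found.
    static int recover(FileChannel channel) throws IOException {
        long fileSize = channel.size();
        long pos = 0;
        int complete = 0;
        ByteBuffer header = ByteBuffer.allocate(ENTRY_HEADER_SIZE);
        while (pos + ENTRY_HEADER_SIZE <= fileSize) {
            header.clear();
            while (header.hasRemaining()) { // positional reads may be short
                if (channel.read(header, pos + header.position()) <= 0) break;
            }
            if (header.hasRemaining()) break;
            header.flip();
            header.getLong(); // entry offset, unused in this sketch
            int payloadSize = header.getInt();
            if (payloadSize <= 0 || pos + ENTRY_HEADER_SIZE + payloadSize > fileSize) break;
            complete++;
            pos += ENTRY_HEADER_SIZE + payloadSize;
        }
        if (pos < fileSize) channel.truncate(pos); // drop the torn tail
        return complete;
    }

    // Writes a header declaring `declaredSize` payload bytes, followed by only
    // `bytesWritten` zero bytes, to simulate a crash mid-append.
    static void writeEntry(FileChannel ch, long offset, int declaredSize, int bytesWritten)
            throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_HEADER_SIZE + bytesWritten);
        buf.putLong(offset).putInt(declaredSize);
        buf.rewind(); // position back to 0; limit is still the full capacity
        while (buf.hasRemaining()) ch.write(buf);
    }

    // Two complete entries plus one torn entry. Returns {completeEntries, fileSizeAfter}.
    static long[] runDemo() throws IOException {
        Path file = Files.createTempFile("segment", ".log");
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            writeEntry(ch, 0, 5, 5);
            writeEntry(ch, 1, 7, 7);
            writeEntry(ch, 2, 100, 3); // header claims 100 bytes, only 3 reached the file
            int complete = recover(ch);
            return new long[] {complete, ch.size()};
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

In runDemo() the two complete entries occupy 17 and 19 bytes, so recovery keeps 2 entries and leaves a 36-byte file.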
Bug 2: Messages That Survived the JVM but Not the OS
Issue: #6 — No fsync, messages can be lost on OS crash
The original LogSegment.append() called channel.write() and moved on. Java's FileChannel.write() writes to the OS page cache, not to disk. If the JVM crashes, the OS will eventually flush the pages. But if the OS crashes (kernel panic, power loss), those pages are gone.
This is the classic fsync trade-off that every storage system has to make. Kafka defaults to relying on the OS page cache (no explicit fsync) and compensates with replication. But for a single-node deployment or when you need strong durability guarantees, you need fsync.
The Fix
We introduced a FlushPolicy enum with three options:
    public enum FlushPolicy {
        EVERY_MESSAGE, // fsync after each append: safest, slowest
        PERIODIC,      // fsync every N milliseconds: good balance
        NONE           // rely on OS page cache: fastest, least safe
    }
In LogSegment.append():
    public synchronized void append(long offset, ByteBuffer data) throws IOException {
        // ... write header and data ...
        if (flushPolicy == FlushPolicy.EVERY_MESSAGE) {
            channel.force(false);
        }
    }
For PERIODIC mode, CommitLog runs a background thread:
    if (flushPolicy == FlushPolicy.PERIODIC && flushIntervalMs > 0) {
        scheduler.scheduleAtFixedRate(this::periodicFlush,
                flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void periodicFlush() {
        try {
            if (activeSegment != null) {
                activeSegment.flush();
            }
        } catch (IOException e) {
            log.error("Periodic flush failed", e);
        }
    }
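The channel.force(false) call behaves like fdatasync (on Linux, that is typically the underlying system call): it flushes file content plus any metadata needed to read that content back, such as a changed file size, but not unrelated metadata such as timestamps. This is usually faster than force(true) (fsync) and sufficient for our use case, since recovery rebuilds each segment's position and index by scanning the file contents.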
The performance impact is significant. On a typical SSD, EVERY_MESSAGE reduces throughput by roughly 10x compared to NONE. PERIODIC with a 1-second interval is a good middle ground: you lose at most 1 second of data on an OS crash, but throughput stays close to NONE.
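The size of that gap depends heavily on the device and filesystem, so it is worth measuring on your own hardware. The probe below is a rough sketch, not a benchmark harness: it times appends with and without a force(false) after each record. The class name, record size, and record count are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FlushCostProbe {
    // Appends `count` records of `recordSize` zero bytes to `file`, calling
    // force(false) after each record when `forceEach` is true.
    // Returns the elapsed time in nanoseconds.
    static long timeAppends(Path file, int count, int recordSize, boolean forceEach)
            throws IOException {
        ByteBuffer record = ByteBuffer.allocate(recordSize);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long start = System.nanoTime();
            for (int i = 0; i < count; i++) {
                record.clear(); // reuse the same buffer for every record
                while (record.hasRemaining()) ch.write(record);
                if (forceEach) ch.force(false);
            }
            return System.nanoTime() - start;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-probe", ".log");
        try {
            long none = timeAppends(file, 2_000, 1024, false);
            long every = timeAppends(file, 2_000, 1024, true);
            System.out.printf("no force: %d us, force per append: %d us%n",
                    none / 1_000, every / 1_000);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

A single run like this is noisy. Repeating it several times, and running it on the same volume the broker will use, gives a more trustworthy ratio.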
Bug 3: The Raft Cluster That Couldn't Remember Its Own Term
Issue: #7 — Raft log is in-memory only
This was the scariest bug. The entire Raft state — currentTerm, votedFor, and all log entries — was stored in an ArrayList in memory. When a broker restarted, it came back as a blank slate at term 0 with an empty log.
The Raft paper (Figure 2) is explicit: currentTerm, votedFor, and the log are persistent state that must be updated on stable storage before responding to RPCs. Without this:
- A node could vote for candidate A, restart, then vote for candidate B in the same term — violating election safety
- Committed entries would vanish, violating state machine safety
- A restarted node would start an election at term 0, potentially disrupting a healthy cluster
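The first failure mode can be reproduced in a few lines. This is a deliberately simplified sketch: the `Voter` and `Disk` classes are hypothetical stand-ins, and the vote handler omits Raft's log up-to-date check. It shows that a node with no stable storage grants two votes in the same term across a restart, while a persisting node grants one.

```java
final class DoubleVoteDemo {
    // Stand-in for stable storage: outlives "restarts" of the Voter object.
    static final class Disk {
        long currentTerm;
        String votedFor;
    }

    static final class Voter {
        private final Disk disk; // null means in-memory only
        private long currentTerm;
        private String votedFor;

        Voter(Disk disk) {
            this.disk = disk;
            if (disk != null) { // reload persisted state on startup
                currentTerm = disk.currentTerm;
                votedFor = disk.votedFor;
            }
        }

        // Simplified vote handler (no log comparison).
        boolean handleVoteRequest(long term, String candidate) {
            if (term < currentTerm) return false;
            if (term > currentTerm) {
                currentTerm = term;
                votedFor = null;
            }
            if (votedFor != null && !votedFor.equals(candidate)) return false;
            votedFor = candidate;
            if (disk != null) { // persist before replying
                disk.currentTerm = currentTerm;
                disk.votedFor = votedFor;
            }
            return true;
        }
    }

    // Returns how many votes one node grants in term 5 when it restarts
    // between two competing vote requests.
    static int votesGrantedAcrossRestart(boolean persistent) {
        Disk disk = persistent ? new Disk() : null;
        int granted = 0;
        Voter beforeRestart = new Voter(disk);
        if (beforeRestart.handleVoteRequest(5, "A")) granted++;
        Voter afterRestart = new Voter(disk); // simulated restart
        if (afterRestart.handleVoteRequest(5, "B")) granted++;
        return granted;
    }
}
```

With persistence disabled the method returns 2 (both A and B get this node's vote in term 5); with persistence enabled it returns 1.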
The Fix
We created RaftPersistence with two files:
    {dataDir}/raft-meta: currentTerm (8B) + votedFor length (4B) + votedFor bytes
    {dataDir}/raft-log: sequential log entries
The meta file uses atomic write-then-rename to prevent corruption:
    public void saveMeta(long currentTerm, String votedFor) throws IOException {
        // Use an explicit charset so the file decodes the same way on every platform
        // (requires java.nio.charset.StandardCharsets)
        byte[] votedForBytes = votedFor != null
                ? votedFor.getBytes(StandardCharsets.UTF_8) : new byte[0];
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + votedForBytes.length);
        buf.putLong(currentTerm);
        buf.putInt(votedForBytes.length);
        if (votedForBytes.length > 0) buf.put(votedForBytes);
        buf.flip();
        Path tmpFile = metaFile.resolveSibling("raft-meta.tmp");
        try (FileChannel ch = FileChannel.open(tmpFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            while (buf.hasRemaining()) {
                ch.write(buf); // write() may be partial, so loop until the buffer is drained
            }
            ch.force(true); // fsync before rename
        }
        Files.move(tmpFile, metaFile,
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
The write-to-temp-then-rename pattern is critical. If we wrote directly to raft-meta and crashed mid-write, we'd have a corrupted file. With the rename approach, the file is either the old version or the new version — never a partial write.
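The read side is the mirror image of saveMeta(). The post does not show loadMeta(), so the following is only a sketch of what decoding that layout could look like, assuming UTF-8 for votedFor and treating a zero-length votedFor as null. `RaftMetaCodec` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class RaftMetaCodec {
    record MetaState(long currentTerm, String votedFor) {}

    // Layout: currentTerm (8 bytes) + votedFor length (4 bytes) + votedFor bytes.
    static byte[] encode(long currentTerm, String votedFor) {
        byte[] v = votedFor != null ? votedFor.getBytes(StandardCharsets.UTF_8) : new byte[0];
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + v.length);
        buf.putLong(currentTerm).putInt(v.length).put(v);
        return buf.array();
    }

    // Rejects input that is too short or whose length field does not fit,
    // rather than silently starting from term 0.
    static MetaState decode(byte[] bytes) {
        if (bytes.length < 12) {
            throw new IllegalStateException("raft-meta too short: " + bytes.length + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long term = buf.getLong();
        int len = buf.getInt();
        if (len < 0 || len > buf.remaining()) {
            throw new IllegalStateException("raft-meta has invalid votedFor length: " + len);
        }
        if (len == 0) return new MetaState(term, null);
        byte[] v = new byte[len];
        buf.get(v);
        return new MetaState(term, new String(v, StandardCharsets.UTF_8));
    }

    // Returns null when no meta file exists yet (first boot).
    static MetaState load(Path metaFile) throws IOException {
        if (!Files.exists(metaFile)) return null;
        return decode(Files.readAllBytes(metaFile));
    }
}
```

Failing loudly on a malformed file is a deliberate choice here: for Raft, refusing to start is safer than rejoining the cluster with a forgotten term or vote.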
Every place in RaftNode where currentTerm or votedFor changes now calls persistMeta() before proceeding:
- startElection(): increments term, votes for self
- stepDown(): updates term, clears votedFor
- handleVoteRequest(): may update term and grant vote
On startup, RaftNode.start() loads the persisted state before joining the cluster:
    public void start() {
        if (persistence != null) {
            RaftPersistence.MetaState meta = persistence.loadMeta();
            if (meta != null) {
                currentTerm.set(meta.currentTerm());
                votedFor = meta.votedFor();
            }
            List<RaftLog.LogEntry> entries = persistence.loadLogEntries();
            for (RaftLog.LogEntry entry : entries) {
                raftLog.append(entry.term(), entry.command());
            }
        }
        // ... start election timer ...
    }
Bug 4: The Lock-Free Counter That Wasn't
Issue: #8 — BackPressureController race condition
The original tryAcquire():
    public boolean tryAcquire() {
        long current = inFlightCount.incrementAndGet(); // Already incremented!
        if (current > highWaterMark) {
            inFlightCount.decrementAndGet(); // Undo
            return false;
        }
        return true;
    }
The problem: incrementAndGet() is atomic, but the increment happens before the check. If 10 threads call tryAcquire() simultaneously when the count is at highWaterMark - 1, all 10 can increment the counter before any of them reaches the check, pushing it to highWaterMark + 9. The thread that observed exactly highWaterMark is admitted; the other nine see current > highWaterMark and decrement. The counter ends up correct, but for a brief moment it exceeded the watermark, and during that moment memory pressure could spike.
More importantly, the pattern is just wrong. The intent is "only increment if below the limit," but the code increments unconditionally and then checks.
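The overshoot is easy to reproduce without any real threads by replaying the worst-case interleaving by hand. The sketch below is a single-threaded simulation, not the broker's code: every simulated caller runs incrementAndGet() first, and only then does each one run its check. `OvershootReplay` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

final class OvershootReplay {
    // Replays one worst-case schedule of the increment-then-check pattern.
    // Returns {peakCount, callersAdmitted, finalCount}.
    static long[] replay(long highWaterMark, long startCount, int callers) {
        AtomicLong inFlight = new AtomicLong(startCount);

        // Step 1: every caller increments before anyone checks.
        long[] observed = new long[callers];
        for (int i = 0; i < callers; i++) {
            observed[i] = inFlight.incrementAndGet();
        }
        long peak = inFlight.get();

        // Step 2: each caller checks the value it observed and undoes if needed.
        int admitted = 0;
        for (long current : observed) {
            if (current > highWaterMark) {
                inFlight.decrementAndGet(); // undo
            } else {
                admitted++;
            }
        }
        return new long[] {peak, admitted, inFlight.get()};
    }
}
```

Calling `replay(100, 99, 10)` returns a peak of 109, one admitted caller, and a final count of 100: the counter settles at the limit but sat nine above it in between.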
The Fix
A proper CAS (Compare-And-Swap) loop:
    public boolean tryAcquire() {
        while (true) {
            long current = inFlightCount.get();
            if (current >= highWaterMark) {
                if (!throttled) {
                    throttled = true;
                    log.warn("Back-pressure activated: in-flight={}, highWaterMark={}",
                            current, highWaterMark);
                }
                return false;
            }
            if (inFlightCount.compareAndSet(current, current + 1)) {
                return true;
            }
            // CAS failed: another thread modified the count, retry
        }
    }
This guarantees the counter never exceeds highWaterMark. The compareAndSet only succeeds if no other thread has modified the value since we read it. If it fails, we re-read and try again. Under low contention, this completes in one iteration. Under high contention, it may take a few retries, but correctness is guaranteed.
The performance difference is negligible — both approaches are lock-free and operate on a single AtomicLong. But the CAS version is correct.
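A quick way to gain confidence in the CAS version is to hammer it from several threads and check the invariant. The harness below is an illustrative sketch: `CasLimiterCheck` and `stress` are hypothetical, and the logging and throttled flag are left out. Because nothing is ever released, exactly highWaterMark acquisitions should succeed no matter how the threads interleave.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class CasLimiterCheck {
    private final AtomicLong inFlightCount = new AtomicLong();
    private final long highWaterMark;

    CasLimiterCheck(long highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    // Same CAS loop as in the fix, minus logging.
    boolean tryAcquire() {
        while (true) {
            long current = inFlightCount.get();
            if (current >= highWaterMark) return false;
            if (inFlightCount.compareAndSet(current, current + 1)) return true;
        }
    }

    // Runs `threads` workers that each call tryAcquire() `attempts` times.
    // Returns {totalAdmitted, highestCountObserved}.
    static long[] stress(long highWaterMark, int threads, int attempts)
            throws InterruptedException {
        CasLimiterCheck limiter = new CasLimiterCheck(highWaterMark);
        AtomicLong admitted = new AtomicLong();
        AtomicLong maxSeen = new AtomicLong();
        CountDownLatch startGate = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                startGate.await(); // release all workers at once
                for (int i = 0; i < attempts; i++) {
                    if (limiter.tryAcquire()) admitted.incrementAndGet();
                    maxSeen.accumulateAndGet(limiter.inFlightCount.get(), Math::max);
                }
                return null; // makes this lambda a Callable, so await() may throw
            });
        }
        startGate.countDown();
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("stress run timed out");
        }
        return new long[] {admitted.get(), maxSeen.get()};
    }
}
```

For example, `stress(100, 8, 1000)` should admit exactly 100 callers and never observe a count above 100. Running the same harness against the increment-then-check version can show sampled counts above the limit, though whether a given run catches the overshoot depends on timing.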
Bug 5: The Disk That Never Stopped Growing
Issue: #9 — No segment cleanup
The original CommitLog created new segments when old ones filled up, but never deleted anything. On a broker processing 100K messages/second with 1KB messages, that's roughly 100MB per second, or about 8.6TB per day. Without cleanup, a 1TB disk would fill in under three hours.
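The same arithmetic is handy when sizing retentionBytes for other workloads. A tiny helper, using decimal units (1KB = 1,000 bytes) to match the figures above; the class and method names are made up for this example.

```java
final class DiskGrowth {
    private static final long SECONDS_PER_DAY = 86_400L;

    // Bytes written per day at a steady message rate.
    static long bytesPerDay(long messagesPerSecond, long bytesPerMessage) {
        return messagesPerSecond * bytesPerMessage * SECONDS_PER_DAY;
    }

    // Hours until a disk of `diskBytes` is full at that rate, ignoring
    // index files and filesystem overhead.
    static double hoursToFill(long diskBytes, long messagesPerSecond, long bytesPerMessage) {
        double bytesPerSecond = (double) messagesPerSecond * bytesPerMessage;
        return diskBytes / bytesPerSecond / 3_600.0;
    }
}
```

For 100,000 messages/second at 1,000 bytes each, bytesPerDay gives 8,640,000,000,000 bytes (8.64TB), and a 1TB disk fills in about 2.8 hours.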
The Fix
Two retention policies, configurable independently:
    // Delete segments older than 7 days
    commitLog.retentionTime(Duration.ofDays(7));
    // Or: keep total size under 100GB
    commitLog.retentionBytes(100L * 1024 * 1024 * 1024);
The cleanup runs on a background thread every 60 seconds:
    // Declares IOException because getLastModifiedTime() and delete() can throw it;
    // the scheduled caller should catch and log it, as periodicFlush() does.
    synchronized void cleanupExpiredSegments() throws IOException {
        if (segments.size() <= 1) return; // Never delete the active segment
        List<LogSegment> toDelete = new ArrayList<>();
        // Time-based
        if (retentionMs > 0) {
            for (int i = 0; i < segments.size() - 1; i++) {
                LogSegment seg = segments.get(i);
                long lastModified = Files.getLastModifiedTime(seg.filePath()).toMillis();
                if (System.currentTimeMillis() - lastModified > retentionMs) {
                    toDelete.add(seg);
                }
            }
        }
        // Size-based: drop the oldest segments until the total fits
        if (retentionBytes > 0) {
            long totalSize = segments.stream().mapToLong(LogSegment::size).sum();
            for (int i = 0; i < segments.size() - 1 && totalSize > retentionBytes; i++) {
                LogSegment seg = segments.get(i);
                if (!toDelete.contains(seg)) toDelete.add(seg);
                totalSize -= seg.size();
            }
        }
        for (LogSegment seg : toDelete) {
            seg.delete();
            segments.remove(seg);
        }
    }
The key safety invariant: we never delete the last segment (the active one). The loop runs i < segments.size() - 1 to enforce this. Even if both retention policies say "delete everything," the active segment survives.
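The selection logic is easier to test when it is separated from the file system. The sketch below restates it as a pure function over segment metadata; `RetentionPlanner` and `SegmentInfo` are hypothetical names, and the list is assumed to be ordered oldest first with the active segment last, as in cleanupExpiredSegments().

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class RetentionPlanner {
    // Hypothetical immutable view of one segment.
    record SegmentInfo(long baseOffset, long sizeBytes, long lastModifiedMs) {}

    // Returns the segments to delete. A retention value <= 0 disables that policy.
    // The last element (the active segment) is never selected.
    static List<SegmentInfo> selectForDeletion(List<SegmentInfo> segments, long nowMs,
                                               long retentionMs, long retentionBytes) {
        Set<SegmentInfo> toDelete = new LinkedHashSet<>(); // keeps order, ignores duplicates
        int deletable = segments.size() - 1;               // everything except the active segment

        if (retentionMs > 0) {
            for (int i = 0; i < deletable; i++) {
                SegmentInfo seg = segments.get(i);
                if (nowMs - seg.lastModifiedMs() > retentionMs) toDelete.add(seg);
            }
        }
        if (retentionBytes > 0) {
            long totalSize = 0;
            for (SegmentInfo seg : segments) totalSize += seg.sizeBytes();
            // Drop oldest segments until the remaining total fits the budget.
            for (int i = 0; i < deletable && totalSize > retentionBytes; i++) {
                SegmentInfo seg = segments.get(i);
                toDelete.add(seg);
                totalSize -= seg.sizeBytes();
            }
        }
        return new ArrayList<>(toDelete);
    }
}
```

With this shape, the invariant can be checked directly in a unit test: pass a size budget smaller than the active segment alone and confirm that the active segment is still absent from the result.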
LogSegment.delete() closes the file handles before deleting:
    public void delete() throws IOException {
        close();
        if (!filePath.toFile().delete()) {
            log.warn("Failed to delete segment file {}", filePath);
        }
    }
What We Learned
These five bugs share a common theme: they all involve state that lives beyond a single JVM process lifetime. In-memory data structures are easy to reason about, but the moment you need durability, recovery, or bounded resource usage, you need explicit code to handle it.
The fixes added roughly 500 lines of code across 7 files. None of them changed the public API. All of them were invisible to users — until the broker crashed, restarted, or ran for more than a few hours.
If you're building a storage system, here's the checklist we wish we'd had from day one:
- What happens on restart? If your constructor creates fresh state, you've lost everything.
- What happens on OS crash? If you're not calling fsync, your data is in the page cache, not on disk.
- What happens after a week? If you're not deleting old data, you're filling the disk.
- What happens under contention? If your "atomic" operation has a check-then-act gap, it's not atomic.
- What state must survive a restart? If it lives only in an in-memory field (volatile or not), it won't.
All five fixes are in commit ab0578d. Issues #5, #6, #7, #8, #9 are closed.