DEV Community

Cover image for How to Recover Kafka DLQ Messages After a Schema Change Broke Your Consumer
Mohammed Saifulhuq
Mohammed Saifulhuq

Posted on

How to Recover Kafka DLQ Messages After a Schema Change Broke Your Consumer

The Problem Nobody Has a Tool For

Here is the scenario that wakes engineers up at 2am.

Your payment service has been running fine for months. The consumer reads events from a Kafka topic, processes them, sends confirmations. The status field is a plain String - "pending", "failed", "done".

A bug is discovered: the status field accepts invalid values like "ok" and "finished" from different producer teams, causing downstream analytics to break. The correct fix is converting status from String to a strict Enum with only four valid values: PENDING, PROCESSING, COMPLETED, FAILED.

The fix gets deployed at 9:45 PM.

Nobody checked the DLQ first.

There were 23,000 messages sitting in payments.dlq from an earlier consumer failure that afternoon. The new V2 consumer starts processing them. Each one crashes immediately:

InvalidFormatException: Cannot deserialize value of type `PaymentStatus` 
from String "pending": not one of the values accepted for Enum class: 
[PENDING, PROCESSING, COMPLETED, FAILED]
Enter fullscreen mode Exit fullscreen mode

All 23,000 messages are now permanently stuck. The V2 consumer cannot read them. They cannot be redriven as-is. PagerDuty fires at 2:00 AM.


Why Schema Registry Doesn't Help Here

The first suggestion you'll get is "just use Schema Registry."

Schema Registry is a seatbelt. It prevents new schema-incompatible events from being produced. It does nothing for messages already sitting in your DLQ from before the schema change.

Those messages are stored as bytes in Kafka. Schema Registry has no knowledge of them. It cannot transform them. Cannot validate them. Cannot redrive them.

The 23,000 stuck messages are entirely your problem.


What Engineers Actually Do (The Manual Reality)

I posted this exact scenario on r/apachekafka and asked how senior teams handle it. Four experienced engineers replied. Not one named a tool.

Their answers:

BroBroMate (Senior Engineer): "Something I've done in the past is just throw a quick Kafka Streams app up to do mass transformations... And easy to unit test before rolling out."

KTCrisis: "You could also spin up a v2 topic for the new consumers and keep a specific v1 consumer around just to drain the DLQ. But it adds a new topic and a dedicated consumer for a one-shot issue."

Every solution described is either:

  • Building a temporary application from scratch, using it once, deleting it
  • Keeping zombie infrastructure alive until the queue drains
  • Writing a throwaway Python script with no tests, no audit trail, no validation

The time cost: 3-8 hours of a senior engineer's time. At 2am. Every single incident.


The Gap No Existing Tool Fills

I looked at every existing solution:

DLQMan (irori-ab) - routes messages based on exception type headers. No transformation engine. If your messages have the wrong data format, DLQMan cannot help.

Confluent Control Center - has basic redrive. No schema transformation. Requires full Confluent Enterprise stack.

kafka-rewind-tools - handles the redrive part. You still need to write the mutation yourself.

Custom Kafka Streams apps - what everyone builds from scratch, uses once, and throws away.

The gap: no tool handles the transformation step between "message has wrong format" and "message is safe to redrive".


Building DLQ Revive

I spent 5 weeks building the tool that should exist.

DLQ Revive is an open-source Kafka Dead Letter Queue mutation and redrive engine. Here is what it does and the architectural decisions behind each feature.

1. Browse DLQ Messages Safely

// WRONG - what most people do
consumer.subscribe(List.of("payments.dlq"));  // joins consumer group!

// CORRECT - what DLQ Revive does
consumer.assign(List.of(new TopicPartition("payments.dlq", 0)));
consumer.seek(new TopicPartition("payments.dlq", 0), fromOffset);
Enter fullscreen mode Exit fullscreen mode

Using subscribe() joins your application to Kafka's consumer group. Kafka can then trigger a rebalance and assign partitions to your read-only viewer - stealing them from your production consumer. Your 2am debugging tool accidentally takes down your production pipeline.

assign() + seek() bypasses group membership entirely. You read exactly what you want. The production consumer is untouched. DLQ Revive never calls commitSync() in view mode.

Reads are paginated at max 100 messages per API call. No full topic loads. No JVM OutOfMemoryError on topics with 500,000 stuck messages.

2. Transform Schema With JSONata

The transformation step is what makes DLQ recovery possible without writing custom code.

DLQ Revive uses JSONata - a declarative JSON-to-JSON mapping language. To fix the String to Enum problem from our scenario:

{
  "orderId": orderId,
  "amount": amount,
  "currency": currency,
  "status": $uppercase(status),
  "processedAt": $now()
}
Enter fullscreen mode Exit fullscreen mode

One expression. Applies to all 23,000 messages. Preview 5 samples before committing. See the before and after side by side.

Why not Groovy? User-submitted Groovy on your backend can call Runtime.getRuntime().exec("rm -rf /"). It is an RCE vulnerability built into the product if you allow it. JSONata is purely declarative - it has no access to the file system, network, or system commands. Zero RCE surface.

3. Validate Before Redriving

BroBroMate's advice from that Reddit thread was exactly right:

"Whatever way you do it, validate every 'fixed' record against an agreed good schema before producing it - there's nothing worse than dumping another X thousand bad messages on the DLQ."

DLQ Revive validates each transformed message against the target schema before producing. Failed validation shows per-message errors. You see exactly which messages will fail before redriving a single one.

4. Idempotency Guard - Pod-Restart Safe

-- Before producing any message:
SELECT 1 FROM redrive_log 
WHERE topic = ? AND partition = ? AND offset = ?

-- If not found, insert THEN produce:
INSERT INTO redrive_log (topic, partition, offset, redriven_at, redriven_by)
VALUES (?, ?, ?, NOW(), ?)
Enter fullscreen mode Exit fullscreen mode

The redrive_log table has a UNIQUE(topic, partition, offset) constraint.

If your application crashes halfway through redriving 10,000 messages and restarts, it will not reprocess the messages it already produced. In a fintech payment pipeline, that is the difference between a successful recovery and double-charging 5,000 customers.

5. Full Audit Trail

Every browse and redrive action is logged to PostgreSQL with timestamp, user, session ID, message count, and source/target topics. For teams in regulated industries (PCI-DSS, SOC 2), this is the compliance evidence that a manual throwaway script can never provide.


Quick Start

git clone https://github.com/Saifulhuq01/dlq-revive.git
cd dlq-revive
docker compose -f docker/docker-compose.yml up -d
# Open http://localhost:4200
Enter fullscreen mode Exit fullscreen mode

That starts Kafka, Zookeeper, PostgreSQL, the Spring Boot backend, and the Angular dashboard together.

Stack: Java 17, Spring Boot 3.x, Apache Kafka 3.x, Angular 13+, PostgreSQL 15, JSONata, Docker Compose, GitHub Actions CI.


The Kafka Consumer Safety Details

A few things worth understanding if you are building Kafka tooling:

Why assign() instead of subscribe():
subscribe() participates in consumer group coordination. Kafka's group coordinator can reassign partitions at any time during a rebalance. For a read-only tool this is dangerous - you don't want Kafka moving partitions between your tool and the production consumer mid-operation.

assign() with seek() gives you direct partition access. No group membership. No rebalance risk. The offset you seek to is exactly where you start reading. You consume exactly limit records and stop.

Why pagination at the Kafka offset level:
Standard REST pagination (page 1, page 2) doesn't work well for Kafka topics because the "next page" needs to know the exact offset where the previous page ended. DLQ Revive uses cursor-based pagination: fromOffset=0&limit=50 returns messages at offsets 0-49. The next call uses fromOffset=50. This is memory-safe regardless of topic size.

Why no commitSync() in view mode:
Committing offsets in view mode would change the consumer group's position - affecting what the production consumer reads next if they share a group ID. DLQ Revive uses a unique, isolated consumer group and never commits in view mode.


What's Next

The open-source core handles:

  • Paginated DLQ browsing
  • JSONata schema transformation
  • Preview before redriving
  • Idempotency-safe redrive
  • Full PostgreSQL audit trail
  • One-command Docker setup

Free tier limit: 100 messages per redrive session. This is enough for testing and small incidents. Bulk redrive (>100 messages) and team features are in the cloud tier.

GitHub: github.com/Saifulhuq01/dlq-revive

MIT licensed. If you've dealt with DLQ schema recovery at work and want to try it against a real scenario, I'd genuinely value your feedback. Especially on the JSONata approach and whether the safety guarantees hold up under your specific Kafka setup.


Mohammed Saifulhuq - Apache Fineract contributor, building DLQ Revive

Top comments (0)