Problem
Have you faced a situation where a lot of messages went to the dead-letter queue in azure service bus and then you need to clean it? You can easily go to azure portal and peek at messages and clear them. But if you have more than 20k messages, maybe it becomes painful to select one by one and clear. Other than that, it is important to understand why messages went there for troubleshooting reasons and help you to fix any issue that your application may be experiencing. What to do in such a situation ?
Thoughts
A high number of messages in the dead-letter queue is a clear indication that your application is not able to consume them and usually they go there after the max attempts are reached. This includes uncaught exceptions, TTL expiration, serialization and deserialization issues, and so on. We must consider the impacts if business operations rely on these messages because if we do not have enough monitoring this can bring silent failures like: user operations not completed, payment confirmation failing causing dissatisfaction or financial loss. Retained messages consume storage and can contribute to increase costs.
Consolidation of Thoughts
Instead of just cleaning the dead-letter queue we should understand why messages got there in order to fix silent issues that may exist in our application. With that being said, how about generating code that connects to your queue extract the errors details to files and then mark the messages as consumed?
Solution
const { ServiceBusClient } = require("@azure/service-bus");
const fs = require('fs').promises;
const path = require('path');
const dotenv = require('dotenv');
dotenv.config();
class DeadLetterQueueExtractor {
constructor(connectionString, queueName) {
this.connectionString = connectionString;
this.queueName = queueName;
this.client = new ServiceBusClient(connectionString);
this.receiver = null;
}
async initialize() {
try {
// Create receiver for dead-letter sub-queue
this.receiver = this.client.createReceiver(this.queueName, {
subQueueType: "deadLetter"
});
console.log(`✅ Connected to dead-letter queue: ${this.queueName}`);
} catch (error) {
console.error(`❌ Failed to initialize receiver: ${error.message}`);
throw error;
}
}
async extractMessages(maxMessages = 100, saveToFile = true) {
try {
console.log(`🔍 Extracting up to ${maxMessages} messages from dead-letter queue...`);
// Peek messages without removing them
const messages = await this.receiver.peekMessages(maxMessages);
if (messages.length === 0) {
console.log("ℹ️ No messages found in dead-letter queue");
return [];
}
console.log(`📨 Found ${messages.length} messages in dead-letter queue`);
// Process and analyze messages
const extractedMessages = messages.map((message, index) => {
return {
messageId: message.messageId,
sequenceNumber: message.sequenceNumber?.toString(),
enqueuedTimeUtc: message.enqueuedTimeUtc,
deadLetterReason: message.deadLetterReason,
deadLetterErrorDescription: message.deadLetterErrorDescription,
deliveryCount: message.deliveryCount,
timeToLive: message.timeToLive,
subject: message.subject,
contentType: message.contentType,
correlationId: message.correlationId,
sessionId: message.sessionId,
replyTo: message.replyTo,
replyToSessionId: message.replyToSessionId,
applicationProperties: message.applicationProperties,
body: this.parseMessageBody(message.body),
rawMessage: {
// Store raw message for debugging
bodyType: typeof message.body,
bodyLength: message.body?.length || 0
}
};
});
// Generate analysis report
const analysis = this.analyzeMessages(extractedMessages);
if (saveToFile) {
await this.saveToFiles(extractedMessages, analysis);
}
return {
messages: extractedMessages,
analysis: analysis
};
} catch (error) {
console.error(`❌ Error extracting messages: ${error.message}`);
throw error;
}
}
parseMessageBody(body) {
try {
if (!body) return null;
// Handle different body types
if (Buffer.isBuffer(body)) {
const bodyString = body.toString('utf8');
// Try to parse as JSON
try {
return JSON.parse(bodyString);
} catch {
return bodyString;
}
} else if (typeof body === 'string') {
try {
return JSON.parse(body);
} catch {
return body;
}
} else if (typeof body === 'object') {
return body;
}
return body;
} catch (error) {
console.warn(`⚠️ Failed to parse message body: ${error.message}`);
return body?.toString() || null;
}
}
analyzeMessages(messages) {
const analysis = {
totalMessages: messages.length,
deadLetterReasons: {},
deliveryCountStats: {},
timeDistribution: {},
contentTypes: {},
correlationIds: new Set(),
sessionIds: new Set(),
oldestMessage: null,
newestMessage: null,
averageDeliveryCount: 0,
recommendations: []
};
let totalDeliveryCount = 0;
let oldestTime = null;
let newestTime = null;
messages.forEach(message => {
// Dead letter reasons
const reason = message.deadLetterReason || 'Unknown';
analysis.deadLetterReasons[reason] = (analysis.deadLetterReasons[reason] || 0) + 1;
// Delivery count statistics
const deliveryCount = message.deliveryCount || 0;
analysis.deliveryCountStats[deliveryCount] = (analysis.deliveryCountStats[deliveryCount] || 0) + 1;
totalDeliveryCount += deliveryCount;
// Time distribution (by hour)
if (message.enqueuedTimeUtc) {
const hour = new Date(message.enqueuedTimeUtc).getHours();
analysis.timeDistribution[hour] = (analysis.timeDistribution[hour] || 0) + 1;
if (!oldestTime || message.enqueuedTimeUtc < oldestTime) {
oldestTime = message.enqueuedTimeUtc;
analysis.oldestMessage = message;
}
if (!newestTime || message.enqueuedTimeUtc > newestTime) {
newestTime = message.enqueuedTimeUtc;
analysis.newestMessage = message;
}
}
// Content types
if (message.contentType) {
analysis.contentTypes[message.contentType] = (analysis.contentTypes[message.contentType] || 0) + 1;
}
// Correlation IDs
if (message.correlationId) {
analysis.correlationIds.add(message.correlationId);
}
// Session IDs
if (message.sessionId) {
analysis.sessionIds.add(message.sessionId);
}
});
analysis.averageDeliveryCount = totalDeliveryCount / messages.length;
analysis.correlationIds = Array.from(analysis.correlationIds);
analysis.sessionIds = Array.from(analysis.sessionIds);
// Generate recommendations
analysis.recommendations = this.generateRecommendations(analysis);
return analysis;
}
generateRecommendations(analysis) {
const recommendations = [];
// Check for common issues
const maxDeliveryExceeded = analysis.deadLetterReasons['MaxDeliveryCountExceeded'] || 0;
const ttlExpired = analysis.deadLetterReasons['TTLExpiredException'] || 0;
const sessionLockLost = analysis.deadLetterReasons['SessionLockLost'] || 0;
const messageLockLost = analysis.deadLetterReasons['MessageLockLost'] || 0;
if (maxDeliveryExceeded > 0) {
recommendations.push({
issue: "MaxDeliveryCountExceeded",
count: maxDeliveryExceeded,
recommendation: "Review receiver error handling logic. Consider increasing max delivery count or fixing processing exceptions."
});
}
if (ttlExpired > 0) {
recommendations.push({
issue: "TTLExpiredException",
count: ttlExpired,
recommendation: "Messages are expiring. Consider increasing TTL or improving processing speed."
});
}
if (sessionLockLost > 0) {
recommendations.push({
issue: "SessionLockLost",
count: sessionLockLost,
recommendation: "Session handling issues. Review session management in receiver application."
});
}
if (messageLockLost > 0) {
recommendations.push({
issue: "MessageLockLost",
count: messageLockLost,
recommendation: "Message processing is taking too long. Consider increasing lock duration or optimizing processing logic."
});
}
if (analysis.averageDeliveryCount > 5) {
recommendations.push({
issue: "HighDeliveryCount",
averageCount: analysis.averageDeliveryCount,
recommendation: "High average delivery count indicates persistent processing failures. Review receiver application logic."
});
}
return recommendations;
}
async saveToFiles(messages, analysis) {
try {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
const outputDir = `dlq-extraction-${timestamp}`;
await fs.mkdir(outputDir, { recursive: true });
// Save detailed messages
await fs.writeFile(
path.join(outputDir, 'messages.json'),
JSON.stringify(messages, null, 2)
);
// Save analysis report
await fs.writeFile(
path.join(outputDir, 'analysis.json'),
JSON.stringify(analysis, null, 2)
);
// Save summary report
const summaryReport = this.generateSummaryReport(analysis);
await fs.writeFile(
path.join(outputDir, 'summary.txt'),
summaryReport
);
// Save CSV for easy analysis
const csvContent = this.generateCSV(messages);
await fs.writeFile(
path.join(outputDir, 'messages.csv'),
csvContent
);
console.log(`📁 Files saved to directory: ${outputDir}`);
console.log(` - messages.json: Detailed message data`);
console.log(` - analysis.json: Analysis results`);
console.log(` - summary.txt: Human-readable summary`);
console.log(` - messages.csv: CSV format for spreadsheet analysis`);
} catch (error) {
console.error(`❌ Error saving files: ${error.message}`);
}
}
generateSummaryReport(analysis) {
let report = `Dead Letter Queue Analysis Report\n`;
report += `=====================================\n\n`;
report += `Total Messages: ${analysis.totalMessages}\n`;
report += `Average Delivery Count: ${analysis.averageDeliveryCount.toFixed(2)}\n\n`;
if (analysis.oldestMessage && analysis.newestMessage) {
report += `Oldest Message: ${analysis.oldestMessage.enqueuedTimeUtc}\n`;
report += `Newest Message: ${analysis.newestMessage.enqueuedTimeUtc}\n\n`;
}
report += `Dead Letter Reasons:\n`;
report += `-------------------\n`;
Object.entries(analysis.deadLetterReasons).forEach(([reason, count]) => {
const percentage = ((count / analysis.totalMessages) * 100).toFixed(1);
report += `${reason}: ${count} (${percentage}%)\n`;
});
report += `\nDelivery Count Distribution:\n`;
report += `---------------------------\n`;
Object.entries(analysis.deliveryCountStats).forEach(([count, messages]) => {
report += `${count} deliveries: ${messages} messages\n`;
});
if (analysis.recommendations.length > 0) {
report += `\nRecommendations:\n`;
report += `---------------\n`;
analysis.recommendations.forEach((rec, index) => {
report += `${index + 1}. ${rec.issue}: ${rec.recommendation}\n`;
});
}
return report;
}
generateCSV(messages) {
const headers = [
'messageId',
'sequenceNumber',
'enqueuedTimeUtc',
'deadLetterReason',
'deadLetterErrorDescription',
'deliveryCount',
'subject',
'contentType',
'correlationId',
'sessionId'
];
let csv = headers.join(',') + '\n';
messages.forEach(message => {
const row = headers.map(header => {
const value = message[header] || '';
// Escape commas and quotes in CSV
const escapedValue = String(value).replace(/"/g, '""');
return `"${escapedValue}"`;
});
csv += row.join(',') + '\n';
});
return csv;
}
async close() {
try {
if (this.receiver) {
await this.receiver.close();
}
await this.client.close();
console.log("✅ Connection closed successfully");
} catch (error) {
console.error(`❌ Error closing connection: ${error.message}`);
}
}
}
// Usage example and main execution
async function main() {
// Configuration
const config = {
connectionString: process.env.AZURE_SERVICEBUS_CONNECTION_STRING || "YOUR_CONNECTION_STRING_HERE",
queueName: process.env.QUEUE_NAME || "your-queue-name",
maxMessages: parseInt(process.env.MAX_MESSAGES) || 100
};
// Validate configuration
if (!config.connectionString || config.connectionString === "YOUR_CONNECTION_STRING_HERE") {
console.error("❌ Please set AZURE_SERVICEBUS_CONNECTION_STRING environment variable or update the config");
process.exit(1);
}
if (!config.queueName || config.queueName === "your-queue-name") {
console.error("❌ Please set QUEUE_NAME environment variable or update the config");
process.exit(1);
}
console.log("🚀 Starting Dead Letter Queue extraction...");
console.log(`Queue: ${config.queueName}`);
console.log(`Max Messages: ${config.maxMessages}`);
const extractor = new DeadLetterQueueExtractor(config.connectionString, config.queueName);
try {
await extractor.initialize();
const result = await extractor.extractMessages(config.maxMessages, false);
console.log("\n📊 Analysis Summary:");
console.log(`Total Messages: ${result.analysis.totalMessages}`);
console.log(`Average Delivery Count: ${result.analysis.averageDeliveryCount.toFixed(2)}`);
console.log("\n🔍 Dead Letter Reasons:");
Object.entries(result.analysis.deadLetterReasons).forEach(([reason, count]) => {
const percentage = ((count / result.analysis.totalMessages) * 100).toFixed(1);
console.log(` ${reason}: ${count} (${percentage}%)`);
});
if (result.analysis.recommendations.length > 0) {
console.log("\n💡 Recommendations:");
result.analysis.recommendations.forEach((rec, index) => {
console.log(` ${index + 1}. ${rec.issue}: ${rec.recommendation}`);
});
}
//Now clear the dead-letter queue
console.log("\n🗑️ Clearing dead-letter queue...")
const clearReceiver = extractor.client.createReceiver(config.queueName, {
subQueueType: "deadLetter",
receiveMode: "receiveAndDelete"
});
//We are already receiving and deleting messages in the extractMessages method, so we can just use receiveAndDelete mode here
const messagesToClear = await clearReceiver.receiveMessages(config.maxMessages);
if (messagesToClear.length > 0) {
console.log(`✅ Cleared ${messagesToClear.length} messages from dead-letter queue`);
} else {
console.log("ℹ️ No messages to clear from dead-letter queue");
}
} catch (error) {
console.error(`❌ Extraction failed: ${error.message}`);
process.exit(1);
} finally {
await extractor.close();
}
}
// Export for use as module
module.exports = DeadLetterQueueExtractor;
// Run if called directly
if (require.main === module) {
main().catch(console.error);
}
This script is designed to help developers analyze and manage messages in the Azure Service Bus Dead-Letter Queue (DLQ).
What does this code do?
Connects to the Azure Service Bus DLQ for a specified queue.
Extracts and analyzes messages without removing them, providing insights such as:
Why messages were dead-lettered (dead-letter reasons)
Delivery count statistics
Message content and metadata
Generates reports in multiple formats (JSON, CSV, and human-readable summaries) to help you understand and troubleshoot issues.
Optionally clears the DLQ by removing all messages, making it easier to reset and monitor new issues.
Why is this helpful?
It gives you visibility into what’s going wrong in your message processing pipeline.
You can quickly spot patterns (like repeated errors or expired messages) and get actionable recommendations.
Beginners and experienced developers alike can use this tool to maintain healthy messaging systems and improve reliability.
In short:
This script is your assistant for diagnosing, reporting, and cleaning up the Azure Service Bus Dead-Letter Queue making message troubleshooting easier and more transparent for everyone.
Top comments (0)