In distributed systems, ensuring data consistency across multiple concurrent processes is critical. When dealing with sensitive operations like account withdrawals, race conditions can result in corrupted data or financial discrepancies. This article explains how to use mutexes (mutual exclusion locks) with RabbitMQ and Node.js to safely handle concurrent operations while maintaining efficient memory management.
The Problem
Imagine a scenario where multiple RabbitMQ workers process withdrawal requests from the same account simultaneously. Without proper synchronization, two workers might read the same initial balance, subtract different amounts, and then update the account, leading to an incorrect balance.
Race condition example:
- Initial balance: $1000
- Worker A reads the balance: $1000
- Worker B reads the balance: $1000
- Worker A subtracts $300 and updates the balance: $700
- Worker B subtracts $200 and updates the balance: $800 (overwriting Worker A’s update)
Final balance: $800 (should have been $500).
Solution: Using Mutexes
A mutex ensures that only one worker can process operations on a specific account at any time. Here’s how we can implement a mutex-based solution in a RabbitMQ-driven Node.js system.
Step-by-Step Implementation
- Prerequisites
Install the necessary packages:
npm install async-mutex amqplib
- async-mutex: Provides an easy-to-use mutex for asynchronous operations.
- amqplib: Enables interaction with RabbitMQ.
- Code Implementation
Below is the complete implementation, including robust memory management for mutex cleanup.
a. Mutex Management
We use a Map to maintain mutexes per account. Mutexes are created on demand and removed when they are no longer needed.
b. Memory Optimization
To avoid memory bloat, we implement:
- Idle Timeouts: Automatically remove mutexes for inactive accounts after 5 minutes.
- Periodic Cleanup: A background process ensures stale mutexes are removed every 1 minute.
c. Full Implementation
const { Mutex } = require('async-mutex');
const amqp = require('amqplib');
// Mutex for account operations with automatic cleanup
const accountMutexes = new Map(); // Store one mutex per account
const accountTimeouts = new Map(); // Store timeout references for cleanup
const CLEANUP_INTERVAL_MS = 60000; // 1-minute cleanup interval
const IDLE_TIMEOUT_MS = 300000; // 5-minute idle timeout per account
// Function to get or create a mutex for a specific account
function getAccountMutex(accountId) {
if (!accountMutexes.has(accountId)) {
const mutex = new Mutex();
accountMutexes.set(accountId, mutex);
resetAccountTimeout(accountId); // Start idle timeout cleanup
}
return accountMutexes.get(accountId);
}
// Function to reset idle timeout for an account
function resetAccountTimeout(accountId) {
if (accountTimeouts.has(accountId)) {
clearTimeout(accountTimeouts.get(accountId));
}
const timeout = setTimeout(() => {
accountMutexes.delete(accountId);
accountTimeouts.delete(accountId);
console.log(`Mutex for account ${accountId} removed due to inactivity.`);
}, IDLE_TIMEOUT_MS);
accountTimeouts.set(accountId, timeout);
}
// Periodic cleanup process
function startPeriodicCleanup() {
setInterval(() => {
accountTimeouts.forEach((_, accountId) => {
if (!accountMutexes.has(accountId)) {
accountTimeouts.delete(accountId);
}
});
}, CLEANUP_INTERVAL_MS);
console.log(`Periodic cleanup started: checking every ${CLEANUP_INTERVAL_MS / 1000} seconds.`);
}
// Simulated database of accounts
const accounts = {
"123": { balance: 1000 },
"456": { balance: 2000 },
};
// Process withdrawal
async function processWithdrawal(accountId, amount) {
const mutex = getAccountMutex(accountId);
const release = await mutex.acquire();
try {
console.log(`Processing withdrawal for account ${accountId}`);
const account = accounts[accountId];
if (!account) {
throw new Error('Account not found');
}
if (account.balance < amount) {
throw new Error('Insufficient funds');
}
account.balance -= amount;
console.log(`Withdrawal successful! New balance for account ${accountId}: ${account.balance}`);
} catch (error) {
console.error(`Error processing withdrawal for account ${accountId}:`, error.message);
} finally {
release();
resetAccountTimeout(accountId);
}
}
// RabbitMQ message handler
async function handleMessage(message) {
const { accountId, amount } = JSON.parse(message.content.toString());
await processWithdrawal(accountId, amount);
}
// Connect to RabbitMQ and consume messages
(async () => {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queueName = 'withdrawals';
await channel.assertQueue(queueName, { durable: true });
console.log(`Waiting for messages in queue: ${queueName}`);
channel.consume(queueName, async (msg) => {
if (msg) {
await handleMessage(msg);
channel.ack(msg); // Acknowledge message after processing
}
});
startPeriodicCleanup(); // Start periodic cleanup
})();
How It Works
- Account-Specific Mutex:
- Each account has its own mutex (accountMutexes), allowing safe concurrency for different accounts.
- Mutexes are dynamically created when accessed for the first time.
- Critical Section:
- The processWithdrawal function locks the mutex to ensure that only one worker can modify the account’s balance.
- Memory Management:
- Idle Timeout: Removes mutexes after 5 minutes of inactivity.
- Periodic Cleanup: A background process runs every minute to clean up stale or unreferenced mutexes.
Advantages
- Race Condition Prevention:
- Ensures that only one worker processes withdrawals for a given account at a time.
- Efficient Memory Management:
- Automatically removes mutexes for inactive accounts, preventing memory bloat.
- High Throughput:
- Concurrent processing of different accounts is unaffected, maintaining system scalability.
- Robust Error Handling:
- Proper handling of account errors and lock release in the finally block ensures the system stays consistent.
Sample Output
Input Queue Messages:
{ "accountId": "123", "amount": 100 }
{ "accountId": "456", "amount": 200 }
{ "accountId": "123", "amount": 300 }
Console Output:
Waiting for messages in queue: withdrawals
Periodic cleanup started: checking every 60 seconds.
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 900
Processing withdrawal for account 456
Withdrawal successful! New balance for account 456: 1800
Processing withdrawal for account 123
Withdrawal successful! New balance for account 123: 600
Mutex for account 123 removed due to inactivity.
Mutex for account 456 removed due to inactivity.
Conclusion
By combining mutexes with RabbitMQ, you can safely handle concurrent operations in Node.js systems.
Adding idle timeouts and periodic cleanup ensures efficient memory management, making this solution scalable and robust for real-world use cases.
Top comments (0)