Step 1: MongoDB Cursor
Here’s how we set up the cursor, picking the model based on the service provider:
const cursor =
  userObject?.data?.serviceProviderName === 'ZYRO'
    ? zyroTransactionModel.find(query).cursor()
    : finoTransactionModel.find(query).cursor();
console.log("Cursor created successfully");
Step 2: Setting Up the ZIP File
Use the yazl library to stream CSV data into a ZIP file:
const yazl = require('yazl');
const zipfile = new yazl.ZipFile();
reply.raw.writeHead(200, {
  "Content-Type": "application/zip",
  "Content-Disposition": "attachment; filename=transactions.zip",
});
zipfile.outputStream.pipe(reply.raw);
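let clientDisconnected = false; // Checked by the processing loop in Step 5
const cleanup = async () => {
  clientDisconnected = true;
  console.log("Cleaning up resources...");
  zipfile.end(); // Finalize ZIP
  await cursor.close().catch((err) => console.error("Error closing cursor:", err));
};
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);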
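Because cleanup is registered on both "close" and "error", it can run twice for a single failed download. A minimal sketch of one way to make it idempotent, assuming a hypothetical runOnce helper and using a plain EventEmitter as a stand-in for reply.raw:

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: wraps a cleanup function so repeated calls share one run.
function runOnce(fn) {
  let pending = null;
  return (...args) => {
    if (!pending) pending = Promise.resolve(fn(...args));
    return pending;
  };
}

let cleanupRuns = 0;
const guardedCleanup = runOnce(async () => {
  cleanupRuns++; // Real code would end the ZIP and close the cursor here
});

// Stand-in for reply.raw: a failing response can emit both events.
const fakeResponse = new EventEmitter();
fakeResponse.on("close", guardedCleanup);
fakeResponse.on("error", guardedCleanup);
fakeResponse.emit("error", new Error("socket hang up"));
fakeResponse.emit("close");

console.log(cleanupRuns); // 1
```

The wrapper keeps the first promise around, so any later caller can still await the same cleanup instead of starting a second one.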
Step 3: Creating Dynamic CSV Streams
Generate CSV data dynamically and stream it into the ZIP file:
const { Readable } = require('stream');
const createNewCSVStream = (headers) => {
  const csvStream = new Readable({ read() {} });
  // Add headers (if the entries are objects, join a display property instead)
  csvStream.push(headers.join(",") + "\n");
  return csvStream;
};
const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
// `let`, not `const`: the stream is replaced when a file is split in Step 5
let currentCSVStream = createNewCSVStream(filteredHeaders);
zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");
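One caveat with a Readable whose read() does nothing: it accepts every push regardless of how fast the client downloads, so a fast cursor and a slow client can still pile rows up in memory. push() returns false once the internal buffer reaches highWaterMark, which is the signal to pause. A rough sketch of a paced variant, using only Node's stream module (createPacedCSVStream and write are hypothetical names, not part of the code above):

```javascript
const { Readable } = require("stream");

function createPacedCSVStream(headers, highWaterMark = 16384) {
  let wake = null; // Resolver for a producer waiting on the consumer
  const stream = new Readable({
    highWaterMark,
    read() {
      // Called when the consumer wants more data: release the waiting producer.
      if (wake) {
        const resolve = wake;
        wake = null;
        resolve();
      }
    },
  });
  stream.push(headers.join(",") + "\n");

  // Push one row; the returned promise settles once it is safe to push again.
  const write = (row) =>
    stream.push(row)
      ? Promise.resolve()
      : new Promise((resolve) => {
          wake = resolve;
        });

  return { stream, write };
}

// Tiny high-water mark so the effect shows up with one short row.
const paced = createPacedCSVStream(["id", "amount"], 16);
const accepted = paced.stream.push("1,100\n"); // Buffer now holds 16 bytes
console.log(accepted); // false: the buffer is full, the producer should wait
```

In the processing loop, `await write(row)` would take the place of a bare `currentCSVStream.push(row)`, so the cursor is only advanced as fast as the ZIP output is consumed.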
Step 4: Streaming MongoDB Data to CSV
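Stream the data from MongoDB directly into the CSV. This event-based version is the simplest form; Step 5 replaces it with a for await...of loop that adds transforms and file splitting. Use one or the other, since a cursor can only be consumed once:
cursor.on('data', (doc) => {
  // `??` keeps legitimate falsy values such as 0 and false; `value` is the property Step 5 reads
  const csvRow = filteredHeaders.map(header => doc[header.value] ?? '').join(',');
  currentCSVStream.push(csvRow + '\n'); // Write row
});
cursor.on('end', () => {
  currentCSVStream.push(null); // End the stream
  zipfile.end(); // Finalize the ZIP
});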
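Joining raw values with commas produces a malformed row whenever a value itself contains a comma, a double quote, or a line break. A small sketch of field escaping along the lines of RFC 4180 (escapeCSVField and toCSVRow are hypothetical helpers, not part of the original code):

```javascript
// Quote a field only when it needs it, doubling any embedded double quotes.
function escapeCSVField(value) {
  if (value === undefined || value === null) return "";
  const text = String(value);
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Build one CSV line from an array of raw values.
function toCSVRow(values) {
  return values.map(escapeCSVField).join(",") + "\n";
}

const sampleRow = toCSVRow([42, "ACME, Inc.", 'said "ok"', null, 0]);
console.log(JSON.stringify(sampleRow));
// "42,\"ACME, Inc.\",\"said \"\"ok\"\"\",,0\n"
```

Note that 0 survives as a value while null and undefined become empty fields, which is usually what a spreadsheet user expects.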
Step 5: Processing Data from MongoDB Cursor
Stream documents from the MongoDB cursor, transform them as needed, and dynamically write rows to the CSV stream:
let streamedCount = 0; // Total rows written
let rowCount = 0; // Rows in the current CSV file
let fileIndex = 2; // Index of the next CSV file (part 1 was added in Step 3)
try {
  for await (const doc of cursor) {
    if (clientDisconnected) {
      console.log("Client disconnected. Stopping processing...");
      break;
    }
    streamedCount++;
    rowCount++;
    let row = "";
    // filteredHeaders comes from Step 3; it does not change per row
    for (let i = 0; i < filteredHeaders.length; i++) {
      const field = filteredHeaders[i];
      // Fetch the corresponding field configuration from transactionDownloadFields
      const originalField = transactionDownloadFields.find((f) => f.value === field.value);
      // Get the value from the transaction document
      let value = getValueFromTransaction(doc, field.value);
      // Apply transformation if the field has a transform function
      if (originalField?.transform) {
        value = originalField.transform(value);
      }
      // Enclose the value in double quotes, doubling any embedded quotes
      value = value != null ? `"${String(value).replace(/"/g, '""')}"` : '"N/A"';
      row += (i > 0 ? "," : "") + value;
    }
    row += "\n";
    currentCSVStream.push(row);
    // Check if the row count has reached the threshold for the current CSV file
    if (rowCount >= MAX_ROWS_PER_FILE) {
      console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
      currentCSVStream.push(null); // End the current CSV stream
      currentCSVStream = createNewCSVStream(filteredHeaders); // Start a new stream with headers
      zipfile.addReadStream(currentCSVStream, `transactions_part_${fileIndex}.csv`);
      fileIndex++;
      rowCount = 0; // Reset the row count
    }
  }
  // Finalize the current CSV stream if it has data
  if (currentCSVStream) {
    currentCSVStream.push(null);
  }
  // Finalize the ZIP file
  zipfile.end();
  console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
  console.error("Error during processing:", error);
  // Headers were written in Step 2, so a 500 is only possible if that never happened
  if (!reply.raw.headersSent) reply.status(500).send({ error: "Failed to generate ZIP file" });
  else reply.raw.destroy(error); // Abort the partial download
} finally {
  // Cleanup: Close the MongoDB cursor
  await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}
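The loop relies on getValueFromTransaction, which is not shown in this post. One plausible shape, assuming field.value is a dot-separated path into the document (that is an assumption, and getValueByPath is a hypothetical stand-in rather than the real helper):

```javascript
// Resolve a dotted path such as "customer.bank.ifsc" against a document.
// Returns undefined as soon as any segment along the path is missing.
function getValueByPath(doc, path) {
  return path
    .split(".")
    .reduce((current, key) => (current == null ? undefined : current[key]), doc);
}

const sampleDoc = {
  txnId: "T1",
  amount: 0,
  customer: { name: "Asha", bank: { ifsc: "X1" } },
};

console.log(getValueByPath(sampleDoc, "customer.bank.ifsc")); // X1
console.log(getValueByPath(sampleDoc, "customer.missing.ifsc")); // undefined
console.log(getValueByPath(sampleDoc, "amount")); // 0
```

Returning undefined for a missing path lines up with the loop's fallback, which writes "N/A" when no value is found.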
Summary
- Document Iteration Using for await...of:
  Streams documents one by one from the MongoDB cursor efficiently.
  Enables real-time processing without loading all data into memory.
- Dynamic CSV Row Generation:
  Constructs each row dynamically by iterating over filteredHeaders.
  Applies transformations using a transform function, if defined in transactionDownloadFields.
- Row Threshold and File Splitting:
  Monitors the row count against the threshold (MAX_ROWS_PER_FILE).
  Ends the current CSV stream and starts a new one when the threshold is reached.
- Error Handling:
  Logs and sends an error response if an issue occurs during processing.
  Ensures proper cleanup by closing the MongoDB cursor in the finally block.
- Finalizing Streams:
  Pushes null to terminate the current CSV stream.
  Completes the ZIP file once all rows are processed.
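One detail of splitting by row count: if the next file is opened as soon as the threshold is reached, an export whose row count is an exact multiple of the threshold ends with a header-only file. A small sketch of the bookkeeping that opens the next part lazily instead, only when another row actually arrives (PartTracker is a hypothetical helper, and the file naming just mirrors transactions_part_1.csv):

```javascript
// Tracks which CSV part each row belongs to, rotating lazily.
class PartTracker {
  constructor(maxRowsPerFile, baseName = "transactions_part_") {
    this.maxRowsPerFile = maxRowsPerFile;
    this.baseName = baseName;
    this.partIndex = 1; // Part currently being written
    this.rowsInPart = 0; // Rows written to the current part
    this.totalRows = 0; // Rows written overall
  }

  // Record one row and return the file name it belongs to.
  next() {
    if (this.rowsInPart >= this.maxRowsPerFile) {
      this.partIndex++; // The previous part is full: move on to a new one
      this.rowsInPart = 0;
    }
    this.rowsInPart++;
    this.totalRows++;
    return `${this.baseName}${this.partIndex}.csv`;
  }
}

const tracker = new PartTracker(2);
const names = [1, 2, 3, 4].map(() => tracker.next());
console.log(names.join(" "));
// transactions_part_1.csv transactions_part_1.csv transactions_part_2.csv transactions_part_2.csv
console.log(tracker.partIndex); // 2, with no empty third part
```

In the real loop, a change in the returned name would be the cue to end the current CSV stream and add a new one to the ZIP.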