Copying over data from MongoDB to S3

Very recently we were tasked with copying over data from our MongoDB database to an S3 bucket.
Since the timelines were tight, our immediate solution was to deploy a lambda that would run once a day, query the data from MongoDB and copy it to S3.

We sized up the data to be around 600k records. It did not seem like a lot, and we were confident we could pull it off.

Long story short, this turned out to be a bigger task than we thought, and we ran into multiple problems.

I would like to talk about the problems we faced at each stage and how we improvised and finally arrived at a working solution.

At the end of the process I learnt a lot, but I also learnt that I have lots more to learn.

OK, getting down to the details.

Tech Stack

AWS Lambda on Node.js 12.x

First Attempt

In hindsight, our first attempt was a brute force one.

The approach was:

  1. Query the collection asynchronously in batches of 100k
  2. Do a Promise.all on all the batches of queries
  3. Concatenate the result arrays
  4. Write the data to an S3 file

Outcome:

Since we tried to load all 600k records into a single string to put an object into S3, we ran out of memory even after allocating the maximum permissible memory of 3008MB.

Code:

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');

//brute force method: load all the data into an array
exports.copyData = async (event, context) => {
  //this is required for node js mongodb connection pooling
  context.callbackWaitsForEmptyEventLoop = false;
  let dbConnection = await MongoClient.connect(getDBURI(), {
    useNewUrlParser: true,
    useUnifiedTopology: true
  });
  let queryResultPromises = [];
  let numPages = //calculate the number of pages;
  //iterate through the number of pages, querying each page
  //with $skip and $limit stages in the aggregation
  for (let pageNum = 0; pageNum < numPages; pageNum++) {
    //do not await here: collect the promises so the page queries run concurrently
    let tempResultPromise = dbConnection.db("<db-name>").collection("<collection-name>")
      .aggregate(<aggregate-criteria>)
      .toArray();
    queryResultPromises.push(tempResultPromise);
  }
  //wait for all the query results to resolve using Promise.all
  let queryResultsArray = await Promise.all(queryResultPromises);
  //concatenate all the results into a single array
  let queryResults = [];
  queryResultsArray.forEach(resultArray => {
    queryResults = queryResults.concat(resultArray);
  });
  await uploadDataToS3(queryResults);
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
  //best practice is to fetch the password from AWS Parameter Store
  return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts the db records to ndjson => newline delimited json
let convertToNDJSON = (data) => {
  let ndJSON = [];
  data.forEach(el => ndJSON.push(JSON.stringify(el), "\n"));
  return ndJSON;
};

//code to upload data to s3
let uploadDataToS3 = async (data) => {
  //NODE_ENV here is an assumption: use whichever variable identifies your environment
  let env = process.env.NODE_ENV;
  let s3 = null;
  //using minio for local s3 testing
  if (env === 'local') {
    s3 = new s3Client({
      accessKeyId: 'minioadmin',
      secretAccessKey: 'minioadmin',
      endpoint: 'http://host.docker.internal:9000',
      s3ForcePathStyle: true, //needed with minio
      signatureVersion: 'v4'
    });
  } else {
    s3 = new s3Client();
  }
  let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: convertToNDJSON(data).join("")};
  return await s3.putObject(params).promise();
};
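The //calculate the number of pages part is elided in the snippet above. Here is a minimal sketch of how it could be derived, assuming a fixed pageSize (a made-up name for this example) and a count on the same collection; each page's aggregation then gets $skip and $limit stages appended:

//a sketch, not the exact code we ran: pageSize and <match-criteria> are placeholders
//this lives inside the async handler, before the paging loop
const pageSize = 100000; //batches of 100k as described above
let totalRecords = await dbConnection.db("<db-name>").collection("<collection-name>")
  .countDocuments(<match-criteria>);
let numPages = Math.ceil(totalRecords / pageSize);

//each page's pipeline is the aggregate criteria plus paging stages
let pagePipeline = [
  //<aggregate-criteria> stages go here
  { $skip: pageNum * pageSize },
  { $limit: pageSize }
];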

Second Attempt

Based on our first attempt it was clear we had to handle our arrays more carefully.
In the first attempt we flattened the results into a single array, then iterated over the flattened array, transformed each db record into a string and pushed it into yet another array, which is why the memory was insufficient.

The approach was:

  1. Do the array flattening and the transformation to strings in a single pass
  2. Write the data to an S3 file

Outcome:

Success!! We were finally able to write all the records to an S3 file.
The issue was that we used up all of the 3008MB. So although it works for the current scenario, it is not future proof and we might run into memory issues again.

Using up all the memory

Code:

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');

//second attempt: flatten and stringify the results in a single pass
exports.copyData = async (event, context) => {
  //this is required for node js mongodb connection pooling
  context.callbackWaitsForEmptyEventLoop = false;
  let dbConnection = await MongoClient.connect(getDBURI(), {
    useNewUrlParser: true,
    useUnifiedTopology: true
  });
  let queryResultPromises = [];
  let numPages = //calculate the number of pages;
  //iterate through the number of pages, querying each page
  //with $skip and $limit stages in the aggregation
  for (let pageNum = 0; pageNum < numPages; pageNum++) {
    //do not await here: collect the promises so the page queries run concurrently
    let tempResultPromise = dbConnection.db("<db-name>").collection("<collection-name>")
      .aggregate(<aggregate-criteria>)
      .toArray();
    queryResultPromises.push(tempResultPromise);
  }
  //wait for all the query results to resolve using Promise.all
  let queryResultsArray = await Promise.all(queryResultPromises);
  //flatten the result arrays and convert to NDJSON in one pass before uploading
  let data = [].concat.apply([], queryResultsArray);
  await uploadDataToS3(data.map(convertToNDJSON).join(""));
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
  //best practice is to fetch the password from AWS Parameter Store
  return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts a single db record to ndjson => newline delimited json
let convertToNDJSON = (record) => {
  return JSON.stringify(record) + "\n";
};

//code to upload data to s3
let uploadDataToS3 = async (data) => {
  //NODE_ENV here is an assumption: use whichever variable identifies your environment
  let env = process.env.NODE_ENV;
  let s3 = null;
  //using minio for local s3 testing
  if (env === 'local') {
    s3 = new s3Client({
      accessKeyId: 'minioadmin',
      secretAccessKey: 'minioadmin',
      endpoint: 'http://host.docker.internal:9000',
      s3ForcePathStyle: true, //needed with minio
      signatureVersion: 'v4'
    });
  } else {
    s3 = new s3Client();
  }
  //using multipart upload to speed up the process
  let params = {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: data};
  let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
  return await s3.upload(params, opts).promise();
};

Third Attempt

So although we tasted success in the previous attempt, we needed a more efficient way to handle these huge arrays of data.

Streams

A little Googling and some Stack Overflow questions led me to streams in node.js.
I will not delve deep into streams here but rather point to the resources I referred to.
The main concept of streams is that when you have large amounts of data to work with, rather than loading it all into memory, you load smaller chunks and work with those.
On digging deeper we found that the MongoDB find and aggregate operations return cursors that can be consumed as readable streams.
We also found that the S3 upload API accepts a readable stream and has the ability to do a multipart upload. This seemed like a perfect way to work.
The MongoDB query results would be the data source and the S3 file would be the sink.
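
To illustrate the idea independently of MongoDB and S3, here is a minimal sketch of stream.pipeline wiring a source through a transform into a sink (the file names are just placeholders for the example):

const fs = require('fs');
const { Transform, pipeline } = require('stream');

//a toy transform that processes one chunk at a time instead of the whole file
const upperCase = new Transform({
  transform(chunk, encoding, done) {
    done(null, chunk.toString().toUpperCase());
  }
});

//source -> transform -> sink; only small chunks are in memory at any point
pipeline(
  fs.createReadStream('<input-file>'),   //source
  upperCase,                             //per-chunk processing
  fs.createWriteStream('<output-file>'), //sink
  (err) => console.log(err ? 'Pipeline failed.' : 'Pipeline succeeded.')
);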

The approach was:

  1. Stream the MongoDB results
  2. Let the MongoDB cursor fetch the results in batches (each batch is capped at 16MB), so only a small slice of the data is in memory at a time; the batch size can also be tuned, as shown in the sketch after this list
  3. Use the S3 multipart upload API
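
If the default batching ever needs tuning, the driver also accepts a batchSize option when the cursor is created. A minimal sketch, reusing the same placeholders as the code below (the value 1000 is an arbitrary example):

//assumption: tuning the cursor batch size; 1000 documents per batch is just an example
let cursor = dbConnection.db("<db-name>").collection("<collection-name>")
  .aggregate(<aggregate-criteria>, { batchSize: 1000 });

//the cursor can then be consumed as a readable stream, as in the lambda code below
let readable = cursor.stream({ transform: x => convertToNDJSON(x) });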

Outcome:

  1. Even more success!! We managed to reduce the memory consumption from 3008MB to 200-300MB. That was a huge win for us.
  2. The issue was a code problem because of which the node script would not exit, so the lambda would time out after the maximum of 900 seconds even though the actual execution completed way before that. Due to the timeout the lambda retries 3 times, so the file is written 3 times: wasted executions.

Reduced memory consumption

Code:

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;

//stream the query results from mongodb straight into the s3 upload
exports.copyData = (event, context, callback) => {
  MongoClient.connect(getDBURI(), {
    useNewUrlParser: true,
    useUnifiedTopology: true
  }).then((dbConnection) => {
    pipeline(
      dbConnection.db("<db-name>").collection("<collection-name>")
        .aggregate(<aggregate-criteria>)
        .stream({transform: x => convertToNDJSON(x)}),
      uploadDataToS3(),
      (err) => {
        if (err) {
          console.log('Pipeline failed.', err);
        } else {
          console.log('Pipeline succeeded.');
        }
        callback();
      }
    );
  });
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
  //best practice is to fetch the password from AWS Parameter Store
  return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
  return JSON.stringify(data) + "\n";
};

//returns a pass-through stream that pipes whatever is written to it into the s3 upload
let uploadDataToS3 = () => {
  //NODE_ENV here is an assumption: use whichever variable identifies your environment
  let env = process.env.NODE_ENV;
  let s3 = null;
  let pass = new stream.PassThrough();
  //using minio for local s3 testing
  if (env === 'local') {
    s3 = new s3Client({
      accessKeyId: 'minioadmin',
      secretAccessKey: 'minioadmin',
      endpoint: 'http://host.docker.internal:9000',
      s3ForcePathStyle: true, //needed with minio
      signatureVersion: 'v4'
    });
  } else {
    s3 = new s3Client();
  }
  //using multipart upload to speed up the process; the pass-through stream is the upload body
  let fileName = '<file-name>';
  let params = {Bucket: '<your-bucket-name>', Key: fileName, Body: pass};
  let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
  s3.upload(params, opts, function(err, data) {
    if (err) {
      console.log(`Error uploading file ${fileName}`, err);
    } else {
      console.log(`Successfully uploaded file: ${fileName}, result: ${JSON.stringify(data)}`);
    }
  });
  return pass;
};

Fourth Attempt

We had nailed down most of the approach; the remaining question was how to make the node.js function exit. We realized we were not calling the lambda handler's callback once the upload was done. Once we did that, we were able to complete the execution in under 490 seconds and exit the function.

Lambda exits before timeout

Code:

const MongoClient = require('mongodb').MongoClient;
let s3Client = require('aws-sdk/clients/s3');
const stream = require('stream');
const pipeline = stream.pipeline;

//stream the query results from mongodb straight into the s3 upload
//and invoke the lambda callback only after the upload has completed
exports.copyData = (event, context, callback) => {
  MongoClient.connect(getDBURI(), {
    useNewUrlParser: true,
    useUnifiedTopology: true
  }).then((dbConnection) => {
    pipeline(
      dbConnection.db("<db-name>").collection("<collection-name>")
        .aggregate(<aggregate-criteria>)
        .stream({transform: x => convertToNDJSON(x)}),
      uploadDataToS3(callback),
      (err) => {
        if (err) {
          console.log('Pipeline failed.', err);
        } else {
          console.log('Pipeline succeeded.');
        }
      }
    );
  });
};

/**
 * Construct the DB URI based on the environment
 * @returns {string}
 */
const getDBURI = () => {
  //best practice is to fetch the password from AWS Parameter Store
  return "mongodb://<username>:<password>@<hostname>/<your-db-name>";
};

//converts each db record to ndjson => newline delimited json
let convertToNDJSON = (data) => {
  return JSON.stringify(data) + "\n";
};

//returns a pass-through stream that pipes whatever is written to it into the s3 upload
//and calls the lambda callback once the upload finishes
let uploadDataToS3 = (callback) => {
  //NODE_ENV here is an assumption: use whichever variable identifies your environment
  let env = process.env.NODE_ENV;
  let s3 = null;
  let pass = new stream.PassThrough();
  //using minio for local s3 testing
  if (env === 'local') {
    s3 = new s3Client({
      accessKeyId: 'minioadmin',
      secretAccessKey: 'minioadmin',
      endpoint: 'http://host.docker.internal:9000',
      s3ForcePathStyle: true, //needed with minio
      signatureVersion: 'v4'
    });
  } else {
    s3 = new s3Client();
  }
  //using multipart upload to speed up the process; the pass-through stream is the upload body
  let fileName = '<file-name>';
  let params = {Bucket: '<your-bucket-name>', Key: fileName, Body: pass};
  let opts = {queueSize: 2, partSize: 1024 * 1024 * 10};
  s3.upload(params, opts, function(err, data) {
    if (err) {
      console.log(`Error uploading file ${fileName}`, err);
    } else {
      console.log(`Successfully uploaded file: ${fileName}, result: ${JSON.stringify(data)}`);
    }
    callback();
  });
  return pass;
};
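
A possible cleanup of the same flow, sketched here rather than the code we actually ran, is to use an async handler, promisify stream.pipeline and await the S3 upload, so no explicit callback is needed. This reuses the same placeholders and helpers (getDBURI, convertToNDJSON, s3Client) as above:

const util = require('util');
const pipelineAsync = util.promisify(stream.pipeline);

//sketch of an async variant; the minio branch is omitted for brevity
exports.copyData = async (event, context) => {
  let dbConnection = await MongoClient.connect(getDBURI(), {
    useNewUrlParser: true,
    useUnifiedTopology: true
  });
  let pass = new stream.PassThrough();
  let s3 = new s3Client();
  //start the multipart upload with the pass-through stream as its body
  let uploadPromise = s3.upload(
    {Bucket: '<your-bucket-name>', Key: '<file-name>', Body: pass},
    {queueSize: 2, partSize: 1024 * 1024 * 10}
  ).promise();
  //pipe the cursor stream into the pass-through and wait for it to drain
  await pipelineAsync(
    dbConnection.db("<db-name>").collection("<collection-name>")
      .aggregate(<aggregate-criteria>)
      .stream({transform: x => convertToNDJSON(x)}),
    pass
  );
  //wait for the upload to complete, then release the db connection
  await uploadPromise;
  await dbConnection.close();
};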
