DEV Community

Cover image for How to Process Epic Amounts of Data in NodeJS
Tommy May III
Tommy May III

Posted on

How to Process Epic Amounts of Data in NodeJS

Preface

If you are like me then you like to use NodeJS for a lot of different things like handling HTTP requests, making CLI tools, IoT, and so on. You have likely ran into situations where you needed to send off multiple HTTP requests at the same time, and if you haven't then don't worry because one day you will. Using javascript's asynchronous nature, most experienced developers will be able to send off a couple of HTTP requests at the same time. BUT what happens when you need to send millions of HTTP requests? This problem is likely to trip up even seasoned javascript developers because it touches a problem that most people don't have to deal with too often which is handling big data.

You may have guessed that if you try to asynchronously send 1 million HTTP requests then your program is going to crash and your guess would be correct. In fact your program would likely crash way before 1 million HTTP requests. Just because something is asynchronous doesn't mean it can handle an infinite amount of data. In the rest of this article I hope to show you how to handle data of any size in an efficient manner that will never cause you to run out of system resources. We will be using NodeJS Streams which is our secret sauce, so if you need a guide to streams then this is my favorite article. Unlike that article I don't plan on diving into how streams work except at a high level, instead my goal is to give you a practical example of handling big data using streams.

Straight To The Finished Code

If your in a hurry or don't care to read then here is the finished Github Repository of what we will be building.

dev.to article - How to Process Epic Amounts of Data in NodeJS




What We Will Build

  1. We will be reading a list of Github usernames from a file
  2. With each Github username we want to call the github API and get a list of their repositories. We will only be working with a smaller list of 12 usernames because I don't want a bunch of readers spamming Github's APIs and because the concept is the same for any amount of data.
  3. Write this data to our database, but to avoid setup complexity for this step we will just be writing the data to a file.
  4. Finally we will refactor to make things more performant.

We will be doing all of this using NodeJS Streams, which if done correctly have the concept of backpressuring which is what helps us use NodeJS resources in a way that won't run out of memory.

1. Reading the File Of Github Usernames

You can find the file in the example file in the repository

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({})

let githubUsernames = ''
readGithubUsernamesStream
  .pipe(csvParser)
  .on('data', (data) => githubUsernames += data)
  .on('end', () => console.log(githubUsernames))

// Outputs - itmayziii,dhershman1,HetaRZinzuvadia,joeswislocki,justinvoelkel,mandarm2593,mfrost503,wmontgomery,kentcdodds,gaearon,btholt,paulirish,ryanflorence

2. Get List of Repositories From Github

It was nice that NodeJS provided us with createReadStream to read our file as a stream, but now we will need our own stream to take that list of usernames, read it, and transform it into github results.

For this step we will be using axios to make HTTP requests

src/transform-username-to-github-repos.js

const axios = require('axios')
const stream = require('stream')

module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
  }

  _transform (chunk, encoding, callback) {
    const username = chunk
    this.getGithubRepositoriesForUser(username)
      .then((response) => {
        let repositories = []
        if (response.data) {
          repositories = response.data.map((repository) => repository.name)
        }

        this.push(JSON.stringify({
          username,
          repositories
        }))
        callback()
      })
      .catch(callback)
  }

  getGithubRepositoriesForUser (username) {
    return axios.get(`https://api.github.com/users/${username}/repos`, {
      headers: {
        Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
      }
    })
  }
}

and change our src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()

let githubUserRepositories = []
readGithubUsernamesStream
  .pipe(csvParser)
  .pipe(transformUsernameToGithubRepos)
  .on('data', (data) => githubUserRepositories.push(data))
  .on('end', () => console.log(githubUserRepositories))

We changed a lot of things right there so let's unpack. We created a Transform stream that has a _transform method on it. When we pipe our CSV file to this transform stream this _transform method will be called. Once the _tranform method has been called with the username passed to it, we take the username and ask github for all the repositories for that user. We then send our results to the next thing in the stream with this.push(...). We don't have a next step in the steam pipeline yet so we start listening to the data event that where we collect the data and log in main.js.

3. Write Our User Repositories to a File

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))

let githubUserRepositories = []
readGithubUsernamesStream
  .pipe(csvParser)
  .pipe(transformUsernameToGithubRepos)
  .pipe(writeStream)
  .on('end', () => process.exit())

This was an easy step, we just created a write stream write things into a txt file.

4. Refactor

What we have works, but it is far from ideal. If you look at the code it is terribly inefficient.

  • It only does one HTTP request at a time, just because we can't do 1 million HTTP requests at the same time doesn't mean we can't do maybe one hundred. For this example we will limit to 5 per pipeline walkthrough for demonstration purposes.
  • Code also has poor error handling

Lets go ahead and fix these things, starting with multiple HTTP requests per pipeline walkthrough

src/transform-username-to-github-repos.js

const axios = require('axios')
const stream = require('stream')

module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
    this.requests = []
  }

  _transform (chunk, encoding, callback) {
    const username = chunk[0]
    const githubRequest = this.getGithubRepositoriesForUser(username)
    this.requests.push(this.prepareGithubRequest(username, githubRequest))
    if (this.requests.length < 5) {
      return callback()
    }

    this.processRequests(callback)
  }

  _flush (callback) {
    this.processRequests(callback)
  }

  getGithubRepositoriesForUser (username) {
    return axios.get(`https://api.github.com/users/${username}/repos`, {
      headers: {
        Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
      }
    })
  }

  prepareGithubRequest (username, githubRequest) {
    return githubRequest
      .then((response) => {
        let repositories = []
        if (response.data) {
          repositories = response.data.map((repository) => repository.name)
        }

        return {
          username,
          repositories
        }
      })
  }

  processRequests (callback) {
    return Promise.all(this.requests)
      .then((responses) => {
        this.requests = []

        this.push(responses.reduce((accumulator, currentValue) => {
          return accumulator + JSON.stringify(currentValue)
        }, ''))
        callback()
      })
      .catch(callback)
  }
}

Again we just did a lot so let's upack what happened. We changed our _tranform method to call the Github API and then shove the promise into an array, we then continue on if the total accumulated promises are less than 5. Basically we are calling Github 5 times before we ever tell the transform to push data through the stream which can be found in the method processRequests. We have successfully changed our pipeline to process 5 requests each time instead of 1 which is a huge performance gain.

We could imagine if we were processing 1 million records and instead of 5 as our number we used 100, so we would send 100 HTTP requests near the same time and wait for them all to resolve before sending 100 more. This is a really efficient / resource saving way of processing large amounts of data.

We are not done though, we still need to have better error handling, for this we will take advantage of the native NodeJS pipeline function.

pipeline - A module method to pipe between streams forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.

src/main.js

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const stream = require('stream')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))

stream.pipeline(
  readGithubUsernamesStream,
  csvParser,
  transformUsernameToGithubRepos,
  writeStream,
  (error) => {
    if (error) {
      console.error('error ', error)
      return process.exit(1)
    }

    process.exit()
  }
)

Conclusion

NodeJS streams allow us to effectively have a pipeline from which data starts at one point and flows through until the end. Using backpressuring, which comes from simply implementing NodeJS's already built streams, we efficiently use a computer's resources while processing very large sets of data. I know methods like this work because I've used it to process over 10 million records from a CSV, call an API to get additional data, and then store the results in a database, much like we did in this article. Streams are effective on their own, but if you really wanted to speed things up I would consider combining child processes with streams for maximum efficiency.

Cover photo Credit - Jonathan Kemper on unsplash

Top comments (1)

Collapse
 
anduser96 profile image
Andrei Gatej

Awesome stuff! Thank you for sharing!