
YoBulkMaster


Uploading 10 Million records to backend in 10 seconds

https://github.com/yobulkdev/yobulkdev/tree/main/pages/api/upload

Uploading CSV with 1 Million rows to backend within 10 seconds

Inserting a large number of records from a spreadsheet (CSV file) into a database is a very common and hard engineering problem.

Modern data stack ETL tools may solve it, but:

  • You are a JavaScript (Node.js) person.
  • You don't want to spend money on an ETL tool.
  • You are not a data engineer who creates and maintains data pipelines.


Node.js streams are a powerful and flexible mechanism for reading and writing data in a streaming fashion. Streams provide an efficient way to handle large volumes of data by processing it in small chunks, rather than loading it all into memory at once.

So we are going to solve this problem using Node.js streams.


How Node.js streams are going to help us

In Node.js, a stream is an abstract interface that represents a sequence of data. A stream can be thought of as a flow of data that is divided into chunks, and these chunks can be processed incrementally as they become available.

Streams can be used to read or write data from various sources, such as files, network sockets, or even in-memory data structures. Streams in Node.js are implemented using event emitters, which means that they can be used asynchronously and in a non-blocking way.
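To make the idea of incremental chunks concrete, here is a minimal sketch that uses only the built-in stream module. The function name `countLines` and the demo input are illustrative assumptions, not part of the project; the point is that each chunk is handled as it arrives and only a small partial line is carried between chunks.

```javascript
const { Readable } = require('node:stream');

// Count lines without ever holding the whole input in memory.
// `source` is any readable stream or async iterable of string chunks.
async function countLines(source) {
  let lines = 0;
  let tail = ''; // carries a partial line across chunk boundaries
  for await (const chunk of source) {
    const parts = (tail + chunk).split('\n');
    tail = parts.pop(); // the last piece may be an incomplete line
    lines += parts.length;
  }
  if (tail.length > 0) lines += 1; // final line without a trailing newline
  return lines;
}

// Demo input: three chunks, one of which splits a line in the middle.
const demo = Readable.from(['id,name\n1,Al', 'ice\n2,Bob\n', '3,Carol']);
countLines(demo).then((n) => console.log('lines:', n));
```

The same function would work on a file stream created with a string encoding, since it only relies on the stream being async-iterable.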

Current architecture used to solve this problem

Stream Flow

Step 1

A drop zone is created to provide a place to drag and drop the CSV file.

// Imports this fragment relies on. PlusIcon comes from an icon library
// whose import is not shown here.
import { useCallback, useState } from 'react';
import { useDropzone } from 'react-dropzone';

// Inside the component body:
const [files, setFiles] = useState([]);

// Append newly dropped files to the ones already selected
const onDrop = useCallback((acceptedFiles) => {
  setFiles((prevFiles) => [...prevFiles, ...acceptedFiles]);
}, []);

const { getRootProps, getInputProps, isDragActive } = useDropzone({ onDrop });

return (
  <>
    <div
      className={`px-6 pt-5 pb-6 border-2 border-dashed rounded-lg ${
        isDragActive ? "border-green-400" : "border-gray-400"
      }`}
      {...getRootProps()}
    >
      <input {...getInputProps()} />
      <div className="flex flex-col items-center justify-center text-center space-y-1">
        <PlusIcon className="w-8 h-8 text-gray-400" />
        <p className="text-sm font-medium text-gray-400">
          Drop files here or click to upload
        </p>
      </div>
    </div>
    {files.map((file) => (
      <p
        key={file.name}
        className="mt-2 text-sm font-medium text-gray-500 truncate"
      >
        {file.name} ({file.size} bytes)
      </p>
    ))}
  </>
);


Step 2

Create a REST API endpoint that starts the streaming.

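// Note: this handler signature matches busboy versions before 1.0;
// from 1.0 on, the handler receives (name, file, info).
busboy.on(
  'file',
  async function (fieldname, file, filename, encoding, mimetype) {
    console.log('The file details are', filename, encoding, mimetype);
    // stream1 and stream2 are placeholders for the real pipeline stages
    pipeline(stream1, stream2, (err) => {
      if (err) console.log('Pipeline failed with an error:', err);
    });
  }
);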

Here we use the Busboy library to stream the uploaded data directly, instead of first copying the file to a location on the server and then moving it into MongoDB.

The pipeline function from the Node.js stream module chains the stages together: parsing, then transforming, then inserting into MongoDB.

pipeline(
  file,
  openCsvInputStream,
  headers_changes,
  dbClient.stream,
  (err) => {
    if (err) {
      console.log('Pipeline failed with an error:', err);
    } else {
      console.log('Pipeline ended successfully');
    }
  }
);
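For readers who want to try the pipeline pattern in isolation, the following is a self-contained sketch using only built-in modules. It is not the project's code: `runPipeline`, the upper-casing transform, and the in-memory sink are assumptions chosen so the example runs without a database. It uses the promise-based `pipeline` from `stream/promises` rather than the callback form shown above.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Run source -> transform -> sink and resolve with what the sink collected.
async function runPipeline(rows) {
  const collected = [];

  // Object-mode transform: receives one row, emits one changed row
  const upperCaseName = new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      callback(null, { ...row, name: String(row.name).toUpperCase() });
    },
  });

  // In-memory sink standing in for a database writer
  const sink = new Writable({
    objectMode: true,
    write(row, _encoding, callback) {
      collected.push(row);
      callback();
    },
  });

  // pipeline() connects the stages, forwards an error from any stage,
  // and destroys all of the streams if one of them fails.
  await pipeline(Readable.from(rows), upperCaseName, sink);
  return collected;
}

runPipeline([{ name: 'alice' }, { name: 'bob' }])
  .then((out) => console.log(out))
  .catch((err) => console.error('Pipeline failed with an error:', err));
```

The promise form makes it easy to `await` the whole upload inside an async request handler; the callback form behaves the same way with respect to error forwarding and cleanup.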

Step 3

Here the Papa Parse stream is the first stage of the pipeline. Papa Parse is a fast library for parsing huge CSVs into JSON. But we need a stream of the parsed CSV data that can be passed on to the next stages of the transformation.

The following code creates a readable stream from the Papa Parse input stream.

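const { Readable } = require('stream');
const Papa = require('papaparse');

const openCsvInputStream = (fileInputStream) => {
  // Object-mode stream: each chunk is one parsed row
  const csvInputStream = new Readable({ objectMode: true });
  // No-op: rows are pushed by Papa Parse as it parses, not pulled on demand
  csvInputStream._read = () => {};
  Papa.parse(fileInputStream, {
    header: true,
    dynamicTyping: true,
    skipEmptyLines: true,
    step: (results) => {
      csvInputStream.push(results.data);
    },
    complete: () => {
      // Signal end of stream
      csvInputStream.push(null);
    },
    error: (err) => {
      csvInputStream.emit('error', err);
    },
  });
  return csvInputStream;
};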
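The bridging pattern here (a callback-based parser feeding a Readable) can be tried without any dependency. In the sketch below, `parseRows` is a hypothetical, deliberately naive stand-in for a callback-style parser such as Papa Parse, and `toRowStream` is an assumed name; only the wrapping technique mirrors the code above.

```javascript
const { Readable } = require('node:stream');

// Hypothetical stand-in for a callback-based CSV parser:
// calls step(row) once per data row, then complete().
// It does not handle quoting or escaped commas.
function parseRows(text, { step, complete }) {
  const [headerLine, ...lines] = text.trim().split('\n');
  const headers = headerLine.split(',');
  for (const line of lines) {
    const values = line.split(',');
    step(Object.fromEntries(headers.map((h, i) => [h, values[i]])));
  }
  complete();
}

// Wrap the callbacks in an object-mode Readable.
function toRowStream(text) {
  const rows = new Readable({ objectMode: true, read() {} });
  parseRows(text, {
    step: (row) => rows.push(row), // one row per chunk
    complete: () => rows.push(null), // null marks the end of the stream
  });
  return rows;
}

async function main() {
  for await (const row of toRowStream('id,name\n1,Alice\n2,Bob\n')) {
    console.log(row);
  }
}
main();
```

One design consequence worth knowing: because `read()` is a no-op, the producer is never slowed down, so rows queue up inside the Readable whenever the downstream stages are slower than the parser.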


Step 4

This is where the transform streams come into the picture.

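const { Transform } = require('stream');

const headers_changes = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
});

headers_changes._transform = async function (data, enc, cb) {
  try {
    // `data` is the incoming row; passing it to changeHeader is an
    // assumption about that helper's signature.
    const newdata = await changeHeader({
      data,
      oldColumns,
      newColumns,
    });
    cb(null, newdata); // emit the renamed row
  } catch (err) {
    cb(err); // surface the failure to the pipeline
  }
};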


This header-change transformation renames the old column headers to the new header names. The catch is that it is a transform stream, so rows are renamed as they pass through rather than after the whole file has been read. Once the data is transformed, it is ready for insertion.
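As an illustration of a header-renaming transform, here is a minimal runnable sketch. The names `createHeaderRenamer` and `renameAll` and the shape of `mapping` (old header to new header) are assumptions made for the example; the project's own `changeHeader` helper may work differently.

```javascript
const { Readable, Transform } = require('node:stream');

// Build an object-mode Transform that renames keys according to `mapping`
// (old header -> new header). Keys not in the mapping pass through unchanged.
function createHeaderRenamer(mapping) {
  const has = (key) => Object.prototype.hasOwnProperty.call(mapping, key);
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      const renamed = {};
      for (const [key, value] of Object.entries(row)) {
        renamed[has(key) ? mapping[key] : key] = value;
      }
      callback(null, renamed);
    },
  });
}

// Helper for the demo: push rows through the transform and collect the result.
async function renameAll(rows, mapping) {
  const out = [];
  const stream = Readable.from(rows).pipe(createHeaderRenamer(mapping));
  for await (const row of stream) out.push(row);
  return out;
}

renameAll([{ fname: 'Alice', age: 30 }], { fname: 'first_name' })
  .then((rows) => console.log(rows));
```

Each row is renamed independently, so memory use stays flat no matter how many rows flow through.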

Step 5

Here comes the last stage: insertion into MongoDB. Inserting one record at a time would be slow, so we insert in batches. This is called a bulk insert.

We collect records into a batch and, when the batch is full, insert it into MongoDB. Then we empty the batch to make room for new records.

  async addToBatch(record) {
    try {
      this.batch.push(record);

      // Insert once the batch is full, then start a fresh batch
      if (this.batch.length >= this.config.batchSize) {
        await this.insertToMongo(this.batch);
        this.batch = [];
      }
    } catch (error) {
      console.log(error);
    }
  }

const writable = new Writable({
  objectMode: true,
  write: async (record, encoding, next) => {
    try {
      // Connect lazily when the first record arrives
      if (!this.dbConnection) {
        this.dbConnection = await this.connect();
      }
      await this.addToBatch(record);
      next();
    } catch (error) {
      console.log(error);
      // Report the failure so the pipeline stops instead of hanging
      next(error);
    }
  },
});
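One detail the snippets above leave implicit is what happens to the last, partially filled batch when the input ends. The sketch below shows one way to handle it with the Writable `final` hook. It is an assumption-laden illustration, not the project's implementation: `createBatchWriter` is a made-up name, and `insertMany` is a hypothetical async function that receives an array of records (with the MongoDB driver it could wrap a collection's `insertMany`).

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// insertMany: hypothetical async (records) => Promise, injected by the caller.
function createBatchWriter(insertMany, batchSize) {
  let batch = [];

  const flush = async () => {
    if (batch.length === 0) return;
    const full = batch;
    batch = []; // start a fresh batch before the insert runs
    await insertMany(full);
  };

  return new Writable({
    objectMode: true,
    write(record, _encoding, callback) {
      batch.push(record);
      if (batch.length < batchSize) return callback();
      // Batch is full: wait for the insert before accepting more records
      flush().then(() => callback(), callback);
    },
    final(callback) {
      // Input ended: insert whatever is left in the partial batch
      flush().then(() => callback(), callback);
    },
  });
}

async function main() {
  const batchSizes = [];
  const fakeInsertMany = async (records) => {
    batchSizes.push(records.length);
  };
  const rows = Array.from({ length: 5 }, (_, i) => ({ id: i }));
  await pipeline(Readable.from(rows), createBatchWriter(fakeInsertMany, 2));
  console.log(batchSizes);
}
main();
```

Because `write` does not call its callback until the insert has finished, the stream applies backpressure while a batch is being written, and an insert error is passed on to the pipeline instead of being swallowed.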


What did we achieve?

Streams are a powerful feature of Node.js. Even on a laptop with 8 GB of RAM, you can parse a big CSV and stream it into MongoDB with minimal CPU and RAM usage. But can it handle multiple requests in parallel?
Wait for our next write-up!

Note: This testing was done on an M1 Mac with 8 GB of RAM.
