Using Node streams to make a .csv cleaner

Over the past week, I worked on a project that involved importing very large .csv files to add to a database. Some of my files were bigger than 2GB, which is very difficult to read all at once given most computers' memory restrictions. So, I thought this would be a great opportunity to work with Node's notorious streams! Through research I found that even Redux creator Dan Abramov was scared of Node streams. Cue panic:

https://media.giphy.com/media/HUkOv6BNWc1HO/giphy.gif

The project however turned out to be a lot of fun!

So what is a Node stream, anyway?

Streams let us consume data as a continuous series of chunks rather than all at once, which gives us much more flexibility with memory usage. That 2GB .csv file that would take 30+ minutes to read all at once, or that would simply blow past your computer's memory limitations? Now it takes just a minute or two, and my computer is far less stressed doing it. I just open up a readStream on the file, transform the data with a transform stream, and write it back to a new file with a writeStream. Let's look at an example.
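As a minimal sketch of what that chunked reading looks like on its own (the byte counting here is just for illustration, not part of the project):

const fs = require("fs");

// a read stream hands us the file in small chunks (64 KB by default)
// instead of loading the whole thing into memory at once
const stream = fs.createReadStream("./data/products.csv");

let bytes = 0;
stream.on("data", (chunk) => {
  bytes += chunk.length; // each chunk is a Buffer
});
stream.on("end", () => console.log(`read ${bytes} bytes, one chunk at a time`));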

First, I need to import Node's fs module, as well as a CSV parser and a tool to convert my parsed CSV rows back into CSV strings. Specifically, I need csv-writer's stringifier, so that it can build CSV strings from our data as it comes in through the stream, since we won't have all of the data available to us at once. The CSV parser will parse our CSV files into objects, with the headers as keys.

Links (read the docs!): https://www.npmjs.com/package/csv-parser, https://www.npmjs.com/package/csv-writer

Modules:

// parses incoming CSV rows into objects
const csv = require("csv-parser");
// builds CSV strings back out of objects
const createCsvStringifier = require("csv-writer").createObjectCsvStringifier;
const fs = require("fs");
const Transform = require("stream").Transform;

Next, I need to set the headers for the new CSV file. This maps each object key (id) to a column header (title) in the new CSV file.

const csvStringifier = createCsvStringifier({
  header: [
    { id: "id", title: "id" },
    { id: "name", title: "name" },
    { id: "slogan", title: "slogan" },
    { id: "description", title: "description" },
    { id: "category", title: "category" },
    { id: "default_price", title: "default_price" },
  ],
});
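If you're curious what the stringifier gives back, here's roughly what its two methods produce (the record values here are made up for illustration):

// the header row, as a single string ending in a newline
console.log(csvStringifier.getHeaderString());
// 'id,name,slogan,description,category,default_price\n'

// one or more records, stringified in header order
console.log(
  csvStringifier.stringifyRecords([
    { id: 1, name: "Camo Onesie", slogan: "Blend in", description: "Low-key", category: "Jackets", default_price: "140" },
  ])
);
// '1,Camo Onesie,Blend in,Low-key,Jackets,140\n'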

Now, I need to define my read and write streams. Each stream is created with the file path we want to read from or write to.

let readStream = fs.createReadStream("./data/products.csv");
let writeStream = fs.createWriteStream("./data/cleanproducts.csv");

On to the transformer!

https://media.giphy.com/media/CRwUOHpwa9Lhe/giphy.gif

This will take data from our readStream, modify it, and pass it along to the writeStream. We're going to create a new subclass of the Transform class we imported earlier. This subclass can take options that modify the stream's behavior, so we'll pass those through to the parent constructor; we'll make use of them later.

class CSVCleaner extends Transform {
  constructor(options) {
    super(options);
  }
}

Now let's add our transform method! My CSV files had a few big issues, including problematic whitespace and a mix of strings and numbers in a field that should have held only numbers. Let's fix all of that as the data comes through. _transform takes in a chunk of data from our readStream, an encoding (which we don't need here), and a callback. The callback lets our function move on to the next chunk, so we'll name it 'next' for semantic purposes.

  // this method lives inside the CSVCleaner class
  _transform(chunk, encoding, next) {
    for (let key in chunk) {
      // trims whitespace from the header keys
      let trimKey = key.trim();
      chunk[trimKey] = chunk[key];
      if (key !== trimKey) {
        delete chunk[key];
      }
    }
    // strips out all non-digit characters
    let onlyNumbers = chunk.default_price.replace(/\D/g, "");
    chunk.default_price = onlyNumbers;
    // uses our csvStringifier to turn our chunk into a csv string
    chunk = csvStringifier.stringifyRecords([chunk]);
    this.push(chunk);
    next();
  }
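To make the cleaning concrete, here's roughly what a single parsed row looks like on its way through (the values are made up for illustration):

// what csv-parser might hand us: keys with stray whitespace,
// and a price full of non-digit characters (made-up values)
const before = { " id": "1", "name ": "Camo Onesie", default_price: "$140.00" };

// after the key-trimming loop and the \D regex, the record looks like:
const after = { id: "1", name: "Camo Onesie", default_price: "14000" };

// note that the regex also strips the decimal point:
"$140.00".replace(/\D/g, ""); // -> "14000"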

Now we are looking pretty good! We have imported all necessary modules, established paths for our streams and set up a transformer to clean our data.

To begin making our new file of cleaned data, we first need to write our header and instantiate our transformer. Since the header is only one line, we don't want it handled in our transform method. We can get the header with the getHeaderString method of the csvStringifier we declared when we set our headers earlier, and write it out directly with our writeStream, which writes everything it receives in order.

Streams normally only operate on string or Buffer datatypes, which is a problem because our CSV parser will be handing objects to our transformer. But remember that we let our CSVCleaner take in some options? We can tell our stream to expect objects by instantiating our transformer with an object that has a key called writableObjectMode set to true. Finally, we open up our readStream, pipe it through the CSV parser, pipe it through our transformer to our writeStream, and then listen for the finish event on our stream to let us know when we're done.

const transformer = new CSVCleaner({ writableObjectMode: true });

//write header
writeStream.write(csvStringifier.getHeaderString());

readStream
  .pipe(csv())
  .pipe(transformer)
  .pipe(writeStream)
  .on("finish", () => {
    console.log("finished");
  });
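One caveat that isn't covered above: .pipe() doesn't forward errors from one stream to the next, so a failure in the parser or transformer can go unnoticed. If you want the whole chain to fail loudly, Node's stream.pipeline wires the error handling up for you. A rough sketch, assuming the same streams as above:

const { pipeline } = require("stream");

pipeline(readStream, csv(), transformer, writeStream, (err) => {
  if (err) console.error("pipeline failed", err);
  else console.log("finished");
});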

And that's it! Now I have nice clean data I can easily insert into my DB.
