
Mario

Originally published at mariokandut.com

How to use streams to ETL data?

Streams are a built-in feature in Node.js and represent an asynchronous flow of data. Streams are also a way to handle reading and/or writing files. A Node.js stream can help process files larger than the free memory of your computer, since it processes the data in small chunks.

Streams in Node.js

This is the fifth article of a series about streams in Node.js. This article is about how to perform ETL operations (Extract, Transform, Load) on CSV data using streams.


Overview

When working with flat data, we can use the fs module together with streams to process the data in a memory-efficient way. Instead of reading all the data into memory, we can read it in small chunks with the help of streams to avoid overconsuming memory.

In this article we are going to create sample data in a CSV file, extract the data, transform it, and load it.

A Comma-Separated Values (CSV) file is a delimited text file that uses a comma to separate values. Read more here. We will transform the CSV data to JSON, or better to ndjson, which is basically a file of JSON records separated by newlines, with the file extension .ndjson. You are probably asking yourself: why not just use JSON? The main reason is fault tolerance. If even a single invalid record is written to a JSON file, the entire file is corrupted. The main difference between JSON and ndjson is that in an ndjson file each line must contain a single JSON record. Hence, an ndjson file contains valid JSON records, but the file as a whole is not a valid JSON document. The ndjson format works well with streaming data and large data sets, where each record is processed individually.
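To illustrate the fault tolerance, here is a small sketch that parses ndjson line by line and skips a corrupted record instead of failing the whole file (the parseNdjson helper is made up for this example):

```javascript
// three ndjson lines, the middle one deliberately corrupted
const ndjson = [
  '{"id":1,"name":"Ada"}',
  'this line is corrupted',
  '{"id":2,"name":"Grace"}',
].join('\n');

function parseNdjson(text) {
  const records = [];
  for (const line of text.split('\n')) {
    try {
      records.push(JSON.parse(line));
    } catch (err) {
      // only this record is lost; the rest of the file is still usable
    }
  }
  return records;
}

console.log(parseNdjson(ndjson).length); // 2 valid records recovered
```

Had the same three records been wrapped in a single JSON array, one corrupted record would have made the entire file unparseable.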

We are going to:

    1. Create CSV sample data
    2. Initialize project for NPM
    3. Create a CSV parser
    4. Add Transform stream
    5. Run & Done

1. Create CSV data

Let's create some sample CSV data. You can use the sample data below, or create your own data with FakerJS and convert it to CSV.

id,firstName,lastName,email,email2,randomized
100,Jobi,Taam,Jobi.Taam@yopmail.com,Jobi.Taam@gmail.com,Z lsmDLjL
101,Dacia,Elephus,Dacia.Elephus@yopmail.com,Dacia.Elephus@gmail.com,Za jfPaJof
102,Arlina,Bibi,Arlina.Bibi@yopmail.com,Arlina.Bibi@gmail.com,zmzlfER
103,Lindie,Torray,Lindie.Torray@yopmail.com,Lindie.Torray@gmail.com,ibVggFEh
104,Modestia,Leonard,Modestia.Leonard@yopmail.com,Modestia.Leonard@gmail.com," Tit KCrdh"
105,Karlee,Cornelia,Karlee.Cornelia@yopmail.com,Karlee.Cornelia@gmail.com,PkQCUXzq
106,Netty,Travax,Netty.Travax@yopmail.com,Netty.Travax@gmail.com,psJKWDBrXm
107,Dede,Romelda,Dede.Romelda@yopmail.com,Dede.Romelda@gmail.com,heUrfT
108,Sissy,Crudden,Sissy.Crudden@yopmail.com,Sissy.Crudden@gmail.com,cDJxC
109,Sherrie,Sekofski,Sherrie.Sekofski@yopmail.com,Sherrie.Sekofski@gmail.com,dvYHUJ
110,Sarette,Maryanne,Sarette.Maryanne@yopmail.com,Sarette.Maryanne@gmail.com,rskGIJNF
111,Selia,Waite,Selia.Waite@yopmail.com,Selia.Waite@gmail.com,DOPBe
112,Karly,Tjon,Karly.Tjon@yopmail.com,Karly.Tjon@gmail.com,zzef nCMVL
113,Sherrie,Berriman,Sherrie.Berriman@yopmail.com,Sherrie.Berriman@gmail.com,rQqmjw
114,Nadine,Greenwald,Nadine.Greenwald@yopmail.com,Nadine.Greenwald@gmail.com,JZsmKafeIf
115,Antonietta,Gino,Antonietta.Gino@yopmail.com,Antonietta.Gino@gmail.com,IyuCBqwlj
116,June,Dorothy,June.Dorothy@yopmail.com,June.Dorothy@gmail.com,vyCTyOjt
117,Belva,Merriott,Belva.Merriott@yopmail.com,Belva.Merriott@gmail.com,MwwiGEjDfR
118,Robinia,Hollingsworth,Robinia.Hollingsworth@yopmail.com,Robinia.Hollingsworth@gmail.com,wCaIu
119,Dorthy,Pozzy,Dorthy.Pozzy@yopmail.com,Dorthy.Pozzy@gmail.com,fmWOUCIM
120,Barbi,Buffum,Barbi.Buffum@yopmail.com,Barbi.Buffum@gmail.com,VOZEKSqrZa
121,Priscilla,Hourigan,Priscilla.Hourigan@yopmail.com,Priscilla.Hourigan@gmail.com,XouVGeWwJ
122,Tarra,Hunfredo,Tarra.Hunfredo@yopmail.com,Tarra.Hunfredo@gmail.com,NVzIduxd
123,Madalyn,Westphal,Madalyn.Westphal@yopmail.com,Madalyn.Westphal@gmail.com,XIDAOx
124,Ruthe,McAdams,Ruthe.McAdams@yopmail.com,Ruthe.McAdams@gmail.com,iwVelLKZH
125,Maryellen,Brotherson,Maryellen.Brotherson@yopmail.com,Maryellen.Brotherson@gmail.com,nfoiVBjjqw
126,Shirlee,Mike,Shirlee.Mike@yopmail.com,Shirlee.Mike@gmail.com,MnTkBSFDfo
127,Orsola,Giule,Orsola.Giule@yopmail.com,Orsola.Giule@gmail.com,VPrfEYJi
128,Linzy,Bennie,Linzy.Bennie@yopmail.com,Linzy.Bennie@gmail.com,ZHctp
129,Vanessa,Cohdwell,Vanessa.Cohdwell@yopmail.com,Vanessa.Cohdwell@gmail.com,RvUcbJihHf
130,Jaclyn,Salvidor,Jaclyn.Salvidor@yopmail.com,Jaclyn.Salvidor@gmail.com,gbbIxz
131,Mildrid,Pettiford,Mildrid.Pettiford@yopmail.com,Mildrid.Pettiford@gmail.com,snyeV
132,Carol-Jean,Eliathas,Carol-Jean.Eliathas@yopmail.com,Carol-Jean.Eliathas@gmail.com,EAAjYHiij
133,Susette,Ogren,Susette.Ogren@yopmail.com,Susette.Ogren@gmail.com," BhYgr"
134,Farrah,Suanne,Farrah.Suanne@yopmail.com,Farrah.Suanne@gmail.com,hYZbZIc
135,Cissiee,Idelia,Cissiee.Idelia@yopmail.com,Cissiee.Idelia@gmail.com,PNuxbvjx
136,Alleen,Clara,Alleen.Clara@yopmail.com,Alleen.Clara@gmail.com,YkonJWtV
137,Merry,Letsou,Merry.Letsou@yopmail.com,Merry.Letsou@gmail.com,sLfCumcwco
138,Fanny,Clywd,Fanny.Clywd@yopmail.com,Fanny.Clywd@gmail.com,Go kx
139,Trixi,Pascia,Trixi.Pascia@yopmail.com,Trixi.Pascia@gmail.com,lipLcqRAHr
140,Sandie,Quinn,Sandie.Quinn@yopmail.com,Sandie.Quinn@gmail.com,KrGazhI
141,Dania,Wenda,Dania.Wenda@yopmail.com,Dania.Wenda@gmail.com,CXzs kDv
142,Kellen,Vivle,Kellen.Vivle@yopmail.com,Kellen.Vivle@gmail.com,RrKPYqq
143,Jany,Whittaker,Jany.Whittaker@yopmail.com,Jany.Whittaker@gmail.com,XAIufn
144,Lusa,Fillbert,Lusa.Fillbert@yopmail.com,Lusa.Fillbert@gmail.com,FBFQnPm
145,Farrah,Edee,Farrah.Edee@yopmail.com,Farrah.Edee@gmail.com,TrCwKb
146,Felice,Peonir,Felice.Peonir@yopmail.com,Felice.Peonir@gmail.com,YtVZywf
147,Starla,Juan,Starla.Juan@yopmail.com,Starla.Juan@gmail.com,aUTvjVNyw
148,Briney,Elvyn,Briney.Elvyn@yopmail.com,Briney.Elvyn@gmail.com,tCEvgeUbwF
149,Marcelline,Ricarda,Marcelline.Ricarda@yopmail.com,Marcelline.Ricarda@gmail.com,sDwIlLckbd
150,Mureil,Rubie,Mureil.Rubie@yopmail.com,Mureil.Rubie@gmail.com,HbcfbKd
151,Nollie,Dudley,Nollie.Dudley@yopmail.com,Nollie.Dudley@gmail.com,EzjjrNwVUm
152,Yolane,Melony,Yolane.Melony@yopmail.com,Yolane.Melony@gmail.com,wfqSgpgL
153,Brena,Reidar,Brena.Reidar@yopmail.com,Brena.Reidar@gmail.com,iTlvaS
154,Glenda,Sabella,Glenda.Sabella@yopmail.com,Glenda.Sabella@gmail.com,zzaWxeI
155,Paola,Virgin,Paola.Virgin@yopmail.com,Paola.Virgin@gmail.com,gJO hXTWZl
156,Aryn,Erich,Aryn.Erich@yopmail.com,Aryn.Erich@gmail.com,qUoLwH
157,Tiffie,Borrell,Tiffie.Borrell@yopmail.com,Tiffie.Borrell@gmail.com,cIYuVMHwF
158,Anestassia,Daniele,Anestassia.Daniele@yopmail.com,Anestassia.Daniele@gmail.com,JsDbQbc
159,Ira,Glovsky,Ira.Glovsky@yopmail.com,Ira.Glovsky@gmail.com,zKITnYXyhC
160,Sara-Ann,Dannye,Sara-Ann.Dannye@yopmail.com,Sara-Ann.Dannye@gmail.com,wPClmU
161,Modestia,Zina,Modestia.Zina@yopmail.com,Modestia.Zina@gmail.com,YRwcMqPK
162,Kelly,Poll,Kelly.Poll@yopmail.com,Kelly.Poll@gmail.com,zgklmO
163,Ernesta,Swanhildas,Ernesta.Swanhildas@yopmail.com,Ernesta.Swanhildas@gmail.com,tWafP
164,Giustina,Erminia,Giustina.Erminia@yopmail.com,Giustina.Erminia@gmail.com,XgOKKAps
165,Jerry,Kravits,Jerry.Kravits@yopmail.com,Jerry.Kravits@gmail.com,olzBzS
166,Magdalena,Khorma,Magdalena.Khorma@yopmail.com,Magdalena.Khorma@gmail.com,BBKPB
167,Lory,Pacorro,Lory.Pacorro@yopmail.com,Lory.Pacorro@gmail.com,YmWQB
168,Carilyn,Ethban,Carilyn.Ethban@yopmail.com,Carilyn.Ethban@gmail.com,KUXenrJh
169,Tierney,Swigart,Tierney.Swigart@yopmail.com,Tierney.Swigart@gmail.com,iQCQJ
170,Beverley,Stacy,Beverley.Stacy@yopmail.com,Beverley.Stacy@gmail.com,NMrS Zpa f
171,Ida,Dex,Ida.Dex@yopmail.com,Ida.Dex@gmail.com,hiIgOCxNg
172,Sam,Hieronymus,Sam.Hieronymus@yopmail.com,Sam.Hieronymus@gmail.com,dLSkVe
173,Lonnie,Colyer,Lonnie.Colyer@yopmail.com,Lonnie.Colyer@gmail.com,ZeDosRy
174,Rori,Ethban,Rori.Ethban@yopmail.com,Rori.Ethban@gmail.com,SXFZQmX
175,Lelah,Niles,Lelah.Niles@yopmail.com,Lelah.Niles@gmail.com,NwxvCXeszl
176,Kathi,Hepsibah,Kathi.Hepsibah@yopmail.com,Kathi.Hepsibah@gmail.com,SOcAOSn
177,Dominga,Cyrie,Dominga.Cyrie@yopmail.com,Dominga.Cyrie@gmail.com,IkjDyuqK
178,Pearline,Bakerman,Pearline.Bakerman@yopmail.com,Pearline.Bakerman@gmail.com,vHVCkQ
179,Selma,Gillan,Selma.Gillan@yopmail.com,Selma.Gillan@gmail.com,hSZgpBNsw
180,Bernardine,Muriel,Bernardine.Muriel@yopmail.com,Bernardine.Muriel@gmail.com,AnSDTDa U
181,Ermengarde,Hollingsworth,Ermengarde.Hollingsworth@yopmail.com,Ermengarde.Hollingsworth@gmail.com,IYQZ Nmv
182,Marguerite,Newell,Marguerite.Newell@yopmail.com,Marguerite.Newell@gmail.com,kSaD uaHH
183,Albertina,Nisbet,Albertina.Nisbet@yopmail.com,Albertina.Nisbet@gmail.com,Y jHyluB
184,Chere,Torray,Chere.Torray@yopmail.com,Chere.Torray@gmail.com,loElYdo
185,Vevay,O'Neill,Vevay.O'Neill@yopmail.com,Vevay.O'Neill@gmail.com,uLZSdatVn
186,Ann-Marie,Gladstone,Ann-Marie.Gladstone@yopmail.com,Ann-Marie.Gladstone@gmail.com,fwKlEksI
187,Donnie,Lymann,Donnie.Lymann@yopmail.com,Donnie.Lymann@gmail.com,deBrqXyyjf
188,Myriam,Posner,Myriam.Posner@yopmail.com,Myriam.Posner@gmail.com,gEMZo
189,Dale,Pitt,Dale.Pitt@yopmail.com,Dale.Pitt@gmail.com,OeMdG
190,Cindelyn,Thornburg,Cindelyn.Thornburg@yopmail.com,Cindelyn.Thornburg@gmail.com,kvhFmKGoMZ
191,Maisey,Hertzfeld,Maisey.Hertzfeld@yopmail.com,Maisey.Hertzfeld@gmail.com,OajjJ
192,Corina,Heisel,Corina.Heisel@yopmail.com,Corina.Heisel@gmail.com,luoDJeHo
193,Susette,Marcellus,Susette.Marcellus@yopmail.com,Susette.Marcellus@gmail.com,AXHtR AyV
194,Lanae,Sekofski,Lanae.Sekofski@yopmail.com,Lanae.Sekofski@gmail.com,FgToedU
195,Linet,Beebe,Linet.Beebe@yopmail.com,Linet.Beebe@gmail.com,DYGfRP
196,Emilia,Screens,Emilia.Screens@yopmail.com,Emilia.Screens@gmail.com,LXUcleSs
197,Tierney,Avi,Tierney.Avi@yopmail.com,Tierney.Avi@gmail.com,VegzbHH
198,Pollyanna,Thar,Pollyanna.Thar@yopmail.com,Pollyanna.Thar@gmail.com,GjYeEGK
199,Darci,Elephus,Darci.Elephus@yopmail.com,Darci.Elephus@gmail.com,DaQNdN

Create a project folder:

mkdir node-streams-etl

Create a csv file in the folder:

cd node-streams-etl
touch sample-data.csv

Copy all sample data into the CSV file and save it. You can use copy and paste, or write the file with fs.writeFile in the Node.js REPL or from the terminal with the -p flag.

2. Initialize project for NPM

We are going to use npm packages, so we have to initialize the project to get a package.json.

npm init -y

Let's add a main file for the code.

touch index.js

First, we are going to create a readable stream to read the CSV data from sample-data.csv, and a writable stream as the destination. For now, we just copy the sample data. To connect the read stream and the write stream we are going to use the pipeline method; its error handling is much easier than with the pipe method. Check out the article How to connect streams with the pipeline method.

const fs = require('fs');
const { pipeline } = require('stream');

const inputStream = fs.createReadStream('sample-data.csv');
const outputStream = fs.createWriteStream('sample-data.ndjson');

pipeline(inputStream, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});

3. Create a CSV parser

We have to convert the CSV file to JSON and, as so often, there is a package for that: csvtojson. The module parses the header row to get the keys and then parses each subsequent row into a JSON object.

Let's install it.

npm install csvtojson

After the successful installation, we can require the module and add it to the pipeline after the inputStream. The data will flow from the CSV file through the CSV parser and into the output file.

We are going to use the pipeline method, since it has been the preferred way to connect streams and pipe data between them since Node.js v10. It also helps to clean up streams on completion or failure: when an error occurs, the streams involved are destroyed to avoid memory leaks.

const fs = require('fs');
const { pipeline } = require('stream');
const csv = require('csvtojson');

const inputStream = fs.createReadStream('sample-data.csv');
const outputStream = fs.createWriteStream('sample-data.ndjson');

const csvParser = csv();

pipeline(inputStream, csvParser, outputStream, err => {
  if (err) {
    console.log('Pipeline encountered an error.', err);
  } else {
    console.log('Pipeline completed successfully.');
  }
});

4. Add Transform stream

The data is now emitted to the outputStream as ndjson, with each data row a valid JSON record. Now we want to transform the data. Since we are using csvtojson, we could utilize its built-in subscribe method, which handles each record after it has been parsed. However, we are going to create a Transform stream instead. Our sample data has the keys id, firstName, lastName, email, email2 and randomized. We want to keep id, firstName and lastName, rename email2 to emailBusiness, and drop the email and randomized properties.

Transform streams must implement a transform method that receives a chunk of data as the first argument, the encoding type of the chunk as the second, and a callback function as the third.

const { Transform } = require('stream');

const transformStream = new Transform({
  transform(chunk, encoding, cb) {
    try {
      // parse the ndjson record emitted by csvtojson
      const person = JSON.parse(chunk);
      // keep id, firstName and lastName, rename email2 to emailBusiness,
      // and drop the email and randomized properties
      const transformed = {
        id: person.id,
        firstName: person.firstName,
        lastName: person.lastName,
        emailBusiness: person.email2,
      };
      cb(null, JSON.stringify(transformed) + '\n');
    } catch (err) {
      cb(err);
    }
  },
});
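For illustration, the reshaping done inside the transform method can also be written as a plain function; reshape below is a hypothetical helper that mirrors the stream code, applied to one parsed record:

```javascript
// reshape one parsed CSV record: keep id, firstName and lastName,
// rename email2 to emailBusiness, drop email and randomized
function reshape(person) {
  return {
    id: person.id,
    firstName: person.firstName,
    lastName: person.lastName,
    emailBusiness: person.email2,
  };
}

const input = {
  id: '100',
  firstName: 'Jobi',
  lastName: 'Taam',
  email: 'Jobi.Taam@yopmail.com',
  email2: 'Jobi.Taam@gmail.com',
  randomized: 'Z lsmDLjL',
};

console.log(JSON.stringify(reshape(input)));
```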

Now let's add the transformStream to the pipeline.

pipeline(
  inputStream,
  csvParser,
  transformStream,
  outputStream,
  err => {
    if (err) {
      console.log('Pipeline encountered an error.', err);
    } else {
      console.log('Pipeline completed successfully.');
    }
  },
);

5. Run & Done

Run the application with node index.js, and the data in the ndjson file should look like this:

{"id":"100","firstName":"Jobi","lastName":"Taam","emailBusiness":"Jobi.Taam@gmail.com"}
{"id":"101","firstName":"Dacia","lastName":"Elephus","emailBusiness":"Dacia.Elephus@gmail.com"}
{"id":"102","firstName":"Arlina","lastName":"Bibi","emailBusiness":"Arlina.Bibi@gmail.com"}

Error handling always has to be done when working with streams. Since the pipeline method already handles errors and cleanup for all the streams involved, the sample project is done.

Congratulations. 🚀✨

TL;DR

  • The Newline-delimited JSON (ndjson) format works well with streaming data and large sets of data, where each record is processed individually, and it helps to reduce errors.
  • Using pipeline simplifies error handling and stream cleanup, and it makes combining streams more readable and maintainable.

Thanks for reading! If you have any questions, use the comment function or send me a message @mariokandut.

If you want to know more about Node, have a look at these Node Tutorials.

References (and Big thanks):

HeyNode, Node.js - Streams, MDN - Streams, Format and MIME Type, ndjson, csvtojson
