In this exercise, we will dive into Node.js core concepts, focusing specifically on the power of streams. The challenge is to read one million SQL rows and export them to a CSV file. So first things first: what are streams?
According to the Node.js documentation:
A stream is an abstract interface for working with streaming data in Node.js.
But what does that mean? Think of streams as a way to handle a continuous flow of data in Node.js. Like a pipeline for data. But why do we need it? Well, sometimes our computer can't handle all the data at once because it's too big. Streams help by letting us work with small chunks of data at a time, kind of like how you watch a show on Netflix – it comes in small parts instead of the whole thing all at once.
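To make the pipeline idea concrete, here is a minimal sketch (not part of the exercise, just an illustration; the file names are placeholders) of two built-in streams connected so that data flows through in chunks instead of being loaded all at once:

import fs from "fs";

// Read "input.txt" in chunks and write each chunk to "copy.txt"
// without ever holding the whole file in memory.
const source = fs.createReadStream("input.txt");
const destination = fs.createWriteStream("copy.txt");

source.pipe(destination);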
In the upcoming example, we'll be working with three specific types of streams: Readable, Transform, and Writable. Based on a Postgres query, we are going to transform the results and save them to a CSV file.
You can find the example source code here:
Github Repo
Putting It All Together
Let's start by setting up the groundwork:
1. Adding the postgres dependencies
yarn add pg pg-query-stream
2. Creating a database pool
We set up a function to create a database connection pool using the pg package. This connection pool will enable us to manage and reuse database connections.
import pg from 'pg';

// USER and PASSWORD are assumed to come from the environment (adjust to your setup).
const { USER, PASSWORD } = process.env;

export function createDatabasePool() {
  try {
    const connectionString = `postgres://${USER}:${PASSWORD}@localhost:5432/postgres`;
    const pool = new pg.Pool({ connectionString });
    return pool;
  } catch (error) {
    console.error('Error creating database pool:', error);
    throw error;
  }
}
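The snippets below assume the pool is created once at module scope, something like:

// Create the pool once and reuse it for every query in this script.
const pool = createDatabasePool();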
3. Configuring Streams
We create three types of streams to accomplish our task: a Readable stream to fetch data from the database, a Transform stream to process and format the data, and a Writable stream to save the processed data to a CSV file.
To create a readable stream, you need the pg-query-stream package, which receives the result rows from pg as a readable (object) stream.
Readable Stream
The stream uses a cursor on the server, so it keeps only a small number of rows in memory at a time; the cursor size is defined by the batchSize option.
import QueryStream from "pg-query-stream";

// Generate the rows on the server and fetch them 1,000 at a time.
const queryStream = new QueryStream(
  "SELECT * FROM generate_series(0, $1) num",
  [1000000],
  { batchSize: 1000 }
);
Transform Stream
Because we receive objects, we need to transform the data before writing it to the file. I'm also adding new fields to each chunk, as shown below.
import { Transform } from "stream";

const transformStream = new Transform({
  objectMode: true, // rows arrive as JavaScript objects, not buffers
  transform(row, encoding, callback) {
    // Enrich the row, then emit it as a single CSV line.
    row.description = `Row ${row.num}`;
    row.date = new Date().toString();
    callback(null, `${row.num}, ${row.description}, ${row.date}` + "\n");
  },
});
Writable Stream
In this case we are writing the data to a file, so we can use Node's file system write stream:
import fileStream from "fs"; // Node's built-in fs module
const fileWriteStream = fileStream.createWriteStream("output.csv");
Starting the Data Flow
With the streams configured, we define a function called startStream that initiates the data flow process. Inside this function, we establish a connection to the database using the connection pool and create a query stream from the provided SQL query.
const startStream = (transformStream, writeStream) => {
  console.log("STARTED ", new Date());

  pool.connect((err, client, done) => {
    if (err) {
      console.error(err);
      return;
    }

    // client.query(queryStream) returns the readable stream of rows.
    const stream = client.query(queryStream);

    stream
      .pipe(transformStream)
      .pipe(writeStream)
      .on("error", console.error)
      .on("finish", () => {
        console.log("FINISHED: ", new Date());
        done(); // release the client back to the pool
      });
  });
};
startStream(transformStream, fileWriteStream);
Explanation:
stream.pipe(transformStream): connects the query stream to the transform stream. Data retrieved from the database is passed through transformStream for processing.
transformStream.pipe(writeStream): connects the transform stream to the write stream. Processed data from the transform stream is then written to the specified file by writeStream.
.on("error", console.error): attaches an error event listener to the pipeline. If an error occurs at any stage, it is logged to the console.
.on("finish", () => {...}): attaches a finish event listener to the pipeline. When the whole process of streaming, transforming, and writing is complete, this function runs.
Inside the finish listener, a timestamp is logged with console.log("FINISHED: ", new Date()), marking the completion of the data processing.
done() is called to release the database client back to the pool, indicating that it is available for reuse.
Finally, the startStream function is invoked with transformStream and fileWriteStream as arguments, effectively starting the entire data processing and writing pipeline.
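As an aside, and as one of the comments below also suggests, the same wiring can be expressed with pipeline from node:stream/promises, which forwards errors from every stage and cleans up all the streams automatically. A minimal sketch, reusing the same pool and streams (the function name is just illustrative):

import { pipeline } from "node:stream/promises";

const startStreamWithPipeline = async (transformStream, writeStream) => {
  const client = await pool.connect();
  try {
    console.log("STARTED ", new Date());
    // pipeline() pipes the stages together and rejects if any of them errors.
    await pipeline(client.query(queryStream), transformStream, writeStream);
    console.log("FINISHED: ", new Date());
  } finally {
    client.release(); // equivalent to calling done()
  }
};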
Visualizing the Process
For a visual representation of the process, take a look at the terminal output:
$ node streams.js
STARTED 2023-08-10T05:33:06.521Z
FINISHED: 2023-08-10T05:33:24.567Z
Done in 28.70s.
A new file named output.csv will also be created, containing 1 million transformed rows!
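Each line follows the format produced by the transform stream, so the first rows of the file look roughly like this (the dates will of course differ on your machine):

0, Row 0, Thu Aug 10 2023 05:33:06 GMT+0000 (Coordinated Universal Time)
1, Row 1, Thu Aug 10 2023 05:33:06 GMT+0000 (Coordinated Universal Time)
2, Row 2, Thu Aug 10 2023 05:33:06 GMT+0000 (Coordinated Universal Time)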
Conclusion
In this exercise, we've explored the power of Node.js streams and their ability to handle large amounts of data efficiently. We've learned how to use Readable, Transform, and Writable streams to read data from a PostgreSQL database, process it, and save it as a CSV file. By breaking down the data processing into smaller chunks, we can conserve memory and improve the overall performance of our application.
Feel free to explore the code, experiment with different settings, and adapt it to your own projects. Happy coding!
Top comments (7)
Awesome content, congrats! I recommend you explore some things: for the transform stream, maybe it would be better to use an async or sync generator. And for all the pipe operations, it could also be nice to use the async pipeline. I have something similar (almost) in this repo: github.com/LucasAndFlores/file-to-.... But as the Mandalorian says: This is the way
Thank you for commenting! I'll definitely check your repo :)
Awesome content ❤️
Why not just use the COPY INTO and EXPORT native Postgres commands and do the CSV output in a couple seconds? It's 2 lines of SQL and extremely efficient.
Thank you for taking the time to comment on my post! You're absolutely right that using PostgreSQL's native commands like COPY INTO and EXPORT is well suited to exporting data to a CSV file.
However, the purpose of the post was to explore the capabilities of Node.js streams and give an example of how streams work and how they can be used to handle large datasets.
The SQL query here is just a very simple example, and there are cases where the export cannot be achieved in a single query, especially when your constraints are project requirements, dynamic data, and the available tools:
Different data sources: sometimes you don't have all the information you need in the same database, and the exported file has to combine it for your customer.
Transformations: specific transformations that need to be applied to every row, or to a dynamic field inside a jsonb column. Sometimes you also just have complex data that changes along with your users' configurations: User A can have access to something, User B to another, and so on.
In the real world, especially at startups, you might not have access to ETL tools to accomplish these cases, nor the skill and time to build them, refactor them, and so on.
That's why it's not always 2 lines of SQL, and the combination of pg streams and Node.js streams can be very handy for that moment in time.
Hope this answers your comment 😊
Nice article! Will this utilise all available CPU cores?
No! Single thread.