NodeJS Custom Streams.

Node JS streams are a great tool, simple and intuitive to use. *

...

If your experience has been anything like mine, you noticed the asterisk in that statement and can probably begin to guess what it means - *Until you move past the very simple examples.

To really experience the power of NodeJS streams, there are a lot of details we need to get through.

My aim here is to help explain those complexities and show how to build custom streams.

What is a NodeJS Stream?

A stream is an abstract interface for working with streaming data in Node.js.
https://nodejs.org/api/stream.html

This sounds very simple. In a sense it is, but the power and flexibility of streams quickly make things complicated. Here is the simple version of the picture.

[Diagram: a readable stream piped into a writable stream]

The true power comes from the fact that the data can be large and the flow rate can vary when needed. Streaming handles large data sets and has a concept called "backpressure", which is driven by the amount of available memory in the relevant stream buffers. In the diagram shown above we have a readable stream connected to a writable stream, and both of these have a high water mark. Basically, when that high water mark is reached, the downstream element communicates upstream and says "wait a minute, I'm full!".
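
To see what that looks like at the API level, here is a minimal sketch (the 16 KB high water mark and the file name are arbitrary choices for illustration): writable.write() returns false once the stream's internal buffer reaches the highWaterMark, and the 'drain' event fires when it is safe to write again.

var fs = require('fs');

// highWaterMark is the buffer size (in bytes for non-object streams) that triggers backpressure
var writeStream = fs.createWriteStream('./testOut.txt', { highWaterMark: 16 * 1024 });

// write() returns false when the internal buffer is at or above the high water mark
var ok = writeStream.write(Buffer.alloc(32 * 1024));
if (!ok) {
    // downstream is full - wait for 'drain' before writing more
    writeStream.once('drain', function () {
        console.log('buffer drained, safe to write again');
    });
}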

Implementation can look as simple and straightforward as this:

stream1.pipe(stream2).pipe(stream3)

In this example data is flowing from stream 1 through to stream 2 and 3. This is the cleanest and most intuitive code example.
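
One caveat worth noting before we go further: pipe() does not forward errors down the chain, so each stream really needs its own 'error' handler. The stream module also provides a pipeline() helper that wires the same kind of chain together and reports errors and completion through a single callback. A minimal sketch, reusing the test files from later in this article:

const fs = require('fs');
const { pipeline } = require('stream');

// same idea as readStream.pipe(writeStream), but any error in the chain lands in one callback
pipeline(
    fs.createReadStream('./test.txt'),
    fs.createWriteStream('./testOut.txt'),
    function (err) {
        if (err) {
            console.error('pipeline failed', err);
        } else {
            console.log('pipeline succeeded');
        }
    }
);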

How do NodeJS streams work under the hood?

As I mention in my Why NodeJS? Demystifying threads, synchronicity, and callbacks article, NodeJS is a single-threaded JavaScript environment that utilizes helper threads. Libuv is the C library that provides those helper threads and handles file system access, pipes, streaming, and a number of other things. Bottom line: NodeJS uses the libuv C library to implement streaming.

Types of Streams

  • Writable - streams to which data can be written (fs.createWriteStream())
  • Readable - streams from which data can be read (fs.createReadStream())
  • Duplex - streams that are both readable and writable (ex: net.Socket)
  • Transform - duplex streams that can modify the data as it is written and read

All that sounds straightforward enough, but I sometimes find myself getting confused even with writable and readable.
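
If the list above feels abstract, here is a minimal sketch of all four types built with the stream module's simplified constructors. The behaviors are deliberately trivial and just meant to show where your code hooks in:

const { Readable, Writable, Duplex, Transform } = require('stream');

// Readable: you produce data inside read()
const readable = new Readable({
    read() {
        this.push('hello\n');
        this.push(null); // null signals the end of the data
    }
});

// Writable: you consume data inside write()
const writable = new Writable({
    write(chunk, encoding, callback) {
        console.log('got: ' + chunk.toString());
        callback();
    }
});

// Duplex: independent readable and writable sides (like a socket)
const duplex = new Duplex({
    read() { this.push(null); },
    write(chunk, encoding, callback) { callback(); }
});

// Transform: a duplex stream where whatever is written comes out modified on the readable side
const transform = new Transform({
    transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
    }
});

readable.pipe(transform).pipe(writable); // prints "got: HELLO"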

Basic Example

var inFile = './test.txt';
var outFile = './testOut.txt';
var fs = require('fs');
var readStream = fs.createReadStream(inFile);
var writeStream = fs.createWriteStream(outFile);

readStream.pipe(writeStream);

Ok, so that works fine. I just tried it with a simple file with the contents:

this is a test!

What about a larger file? I took a simple 113 MB SQL backup and it ran fine.

What about a 1.06 GB mp4 file? That seemed to run fine as well.

Honestly though, aside from confirming that very basic file reads and writes work, this really isn't very useful. No one would write a node stream just to copy a file on the same computer when their OS has tools that are much easier to use.

Before we look at a more interesting example, let's first improve this by adding a way to compare the contents and confirm they are correct. I just compared the file size and they matched but that isn't a very thorough approach.

The easiest way to verify file contents is with NodeJS's built-in crypto library and its Hash object. Note this hash object is a duplex stream (specifically a transform stream) since it is both readable and writable.

Verifying File contents

const crypto = require('crypto');
var fs = require('fs');
const hash = crypto.createHash('sha1');
hash.setEncoding('hex');

const input = fs.createReadStream('./test.txt');
input.pipe(hash);

input.on('close',function(){
    console.log(hash.read())
})
//output: 3aa4cb08d481cfe2b08e4a5e31777f642263d58d

After the familiar create read stream line you see we use the pipe method again. This is connecting the readable stream of reading the file to the duplex stream of hashing. The duplex stream gets each chunk of the file written to it and is ultimately read when the stream is completed and we call hash.read().

Let's ignore my input.on for now. We will cover this below when we get into transform streams.

If we run this for both the file we read and the one we wrote, we will find that the hashes match.

A few definitions:
Cryptology - the study of codes, or the art of creating and solving them. The NodeJS crypto module does this mostly for the purpose of protecting data.
Hashing - applying an algorithm that maps a variable-sized input to a fixed-length output value.

This is why hashing is useful for file comparison. Running all the contents of a file through a hash gives us a value that is deterministic, meaning the same input produces the same value every time.

Hashing is used in cryptology because it is easy to compute the hash in one direction but practically impossible to reverse it and recover the original input.
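
To make the deterministic part concrete, here is a tiny sketch showing that hashing the same input always gives the same digest, while changing a single character gives a completely different one:

const crypto = require('crypto');

function sha1(text) {
    return crypto.createHash('sha1').update(text).digest('hex');
}

console.log(sha1('this is a test!')); // same input, same output, every time
console.log(sha1('this is a test!')); // identical to the line above
console.log(sha1('this is a test?')); // one character changed - a completely different hash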

Other notes on hashing:
- SHA-1 is a weak hashing algorithm (it is no longer considered collision resistant), but it is totally fine for our purposes since it is less resource intensive.
- Other algorithms include 'md4', 'md5', 'sha256', 'sha384', 'sha512', 'sha512_224', 'sha512_256', and 'ripemd160'.

Enough on cryptology, we are supposed to be talking about NodeJS streams!

More Interesting Example

Ok, so let's do a more interesting example. Let's say you have a client and a server that want to send data to one another.

[Diagram: a client and server sending data to one another]

Looking back at the types of streams, we know we will need a readable stream to read the file on one side and a writable stream to write it on the other.

We will also want to introduce another duplex stream: the net.Socket object.

Client

var fs = require('fs');
var socket = require('net').Socket();
socket.connect(8082);


const input = fs.createReadStream('./test.txt');

input.pipe(socket)


input.on('close',function(){    
    console.log('end');    
    socket.end();
})

As you can see with the client setup, the socket setup is pretty straightforward. 8082 is the port number, and everything else is mostly what we wrote before. The input.pipe(socket) line takes the output of the readable stream that reads the file and sends it on to the socket object, which sends it to whatever is listening on the other end of port 8082.

Server

var outFile = 'testOut.txt';
var fs = require('fs');
const net = require('net');

var writeStream = fs.createWriteStream(outFile);
net.createServer(function (socket) {
  console.log("connected");

  socket.pipe(writeStream);
  socket.on('data',function(data){
    console.log(data)
  })
  socket.on('close', function () {
      console.log("done!");
  });
})
.listen(8082);

Here the net.createServer callback runs and prints "connected" when the client code runs and connects.

Then the socket.pipe(writeStream) line is sending all data received from the socket to the write stream. In this case our write stream is writing to a file.

Taking a look at our file system, the write completes successfully!

Last thing we need to add is hashing to make sure it truly transferred successfully.

Unfortunately though, it isn't very clean to do hashing in this example without a transform stream, so let's move in that direction. To introduce a transform stream we need to properly introduce custom streams.

Custom Streams

Custom streams are where we start to get under the hood of a stream and use its properties, call its methods, and catch its events.

There are a lot of details here. To best follow this I recommend you look at the official documentation. I did my best to pull out the interesting details.

NodeJS Stream properties

  • readableFlowing - if true, data will keep streaming until the end of the data. If false, you have to manually call readable.read() to keep the stream moving.
  • objectMode - if set, the stream is capable of passing JavaScript objects (see the sketch below). Otherwise data is passed as strings or buffers.
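
For example, setting objectMode lets a stream carry whole JavaScript objects instead of strings or buffers. A minimal sketch (the object fields here are made up for illustration):

const { Transform } = require('stream');

// objectMode: true lets chunks be arbitrary objects instead of strings/buffers
const toJsonLines = new Transform({
    objectMode: true,
    transform(obj, encoding, callback) {
        // turn each incoming object into a line of JSON text
        callback(null, JSON.stringify(obj) + '\n');
    }
});

toJsonLines.pipe(process.stdout);
toJsonLines.write({ id: 1, msg: 'hello' });
toJsonLines.write({ id: 2, msg: 'world' });
toJsonLines.end();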

NodeJS Stream Methods

These are the main methods on a NodeJS stream (the underscore-prefixed ones are methods you implement when writing a custom stream rather than call directly).

  • _read - where a custom readable stream produces its data before it goes out to consumers
  • _write - where a custom writable stream receives each chunk that was written to it
  • _transform - where a custom transform stream receives data to be manipulated
  • push - sends data out of the readable side of a stream; called from inside _read and _transform
  • callback - used to exit custom stream methods and signal they are done; the name is user defined
  • pause - pauses a readable stream (see the sketch after this list)
  • resume - resumes a paused readable stream
  • pipe - connects a readable stream to a writable one; this starts the stream flowing
  • unpipe - disconnects a readable stream from a writable one
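
Here is the pause/resume sketch mentioned above. Attaching a 'data' handler puts a readable stream into flowing mode, and pause()/resume() stop and restart that flow (the one second delay is arbitrary, just to make the pause visible):

const fs = require('fs');

const readStream = fs.createReadStream('./test.txt');

// attaching a 'data' listener switches the stream into flowing mode
readStream.on('data', function (chunk) {
    console.log('got ' + chunk.length + ' bytes, pausing for a moment');
    readStream.pause();

    setTimeout(function () {
        readStream.resume(); // flow picks back up where it left off
    }, 1000);
});

readStream.on('end', function () {
    console.log('no more data');
});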

NodeJS Stream Events
These are the events you can catch from a nodeJS stream.

myStream.on(event)

  • 'close' (readable and writable) - indicates that no more events will be emitted and no more computation will happen
  • 'error' (readable and writable) - emitted when an error occurs while writing or piping data; there should not be any more events after it
  • 'data' (readable) - the stream is handing a chunk of data to a consumer
  • 'end' (readable) - there is no more data to be consumed from the stream
  • 'pause' (readable) - emitted when stream.pause() is called while the stream is in flowing mode
  • 'resume' (readable) - emitted when stream.resume() is called and readableFlowing is not true
  • 'readable' (readable) - the stream has data available; stream.read() will return it (a short example follows this list)
  • 'drain' (writable) - emitted after a write() call has returned false, once the writable stream is ready to accept data again; this is how backpressure is implemented
  • 'unpipe' (writable) - emitted on the writable stream when a readable stream is unpiped from it
  • 'finish' (writable) - emitted after stream.end() has been called and all chunks have been processed by the writable stream
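
As one concrete example from this table, the 'readable' event can be used instead of 'data' to pull chunks on your own schedule. A minimal sketch:

const fs = require('fs');

const readStream = fs.createReadStream('./test.txt');

// 'readable' fires when there is data waiting in the internal buffer
readStream.on('readable', function () {
    let chunk;
    // read() returns null once the buffer is empty
    while ((chunk = readStream.read()) !== null) {
        console.log('read ' + chunk.length + ' bytes');
    }
});

readStream.on('end', function () {
    console.log('end of stream');
});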

The below diagram attempts to show the relationship between readable streams and writable streams with some of the methods you can call on them and some of the events you can catch.

[Diagram: readable and writable streams with their methods and events]

Custom Stream basic example

For this example, let's go back to just a single file to make sure we have the concepts down. This is our first implementation of a transform stream.

Basic Transform Stream Code

var fs = require('fs');
var readStream = fs.createReadStream('./test.txt');
var writeStream = fs.createWriteStream('./testOut.txt');
var stream = require('stream');

class CustomTransform extends stream.Transform{
    constructor(){
        super();
    }
    _transform(chunk,encoding,callback){
        var obj = this;        
        obj.push(chunk);
        callback();
    }
}

var transformStream = new CustomTransform();
readStream.pipe(transformStream).pipe(writeStream);

Now this is starting to look like a more realistic setup, with 3 streams involved and some of the stream methods actually being used. You will note, however, that our transform method is not doing anything, and the end result is that we just send a file from one place to another. If we add a basic function to the mix and call it from the rest of the transform stream, we will start to actually manipulate the data.

Example data transform function

// testStr lives at module scope; myTransform() is a method we will add to the CustomTransform class below
var testStr = 'this is a test -';

myTransform(chunk){
   var tmp = chunk.toString();
   tmp = tmp + testStr;
   return tmp;
}

Note I'm converting to string here, since the data in chunk is a buffer object. With my small test file of:

1
2
3
4
5

The ultimate result of using this transform script is that 'this is a test -' will be appended to the end of the file, since the file is small enough that it arrives in a single chunk (the default highWaterMark for fs read streams is 64 KB, far larger than our file). Here is the code for that example.

Transform stream code with actual data transformation

var fs = require('fs');
var readStream = fs.createReadStream('./test.txt');
var writeStream = fs.createWriteStream('./testOut.txt');
var stream = require('stream');

var testStr = "this is a test -";

class CustomTransform extends stream.Transform{
    constructor(){
        super();
    }
    _transform(chunk,encoding,callback){
        var obj = this;        

        var transformedData = obj.myTransform(chunk);
        obj.push(transformedData);
        callback();
    }
    myTransform(chunk){
        var tmp = chunk.toString();
        tmp = tmp + testStr;
        return tmp;
    }
}

var transformStream = new CustomTransform();
readStream.pipe(transformStream).pipe(writeStream);

Ok, now let's put it all together: use a custom transform stream in our client/server example and calculate the hash to confirm it worked!

Custom Stream Client/Server/Hashing w/ Transform example

Basically we want to add the transform stream in line on both the client and server so we can check the hash value.

Data flow for complex example
[Diagram: data flow for the client/server/hashing example]

The first step is to write a custom transform class that computes the hash and can be imported on both sides so we don't have to write it twice.

Custom Transform Class shared by client and server

const stream = require('stream');
const crypto = require('crypto');

class CustomTransform extends stream.Transform{
    constructor(){
        super();
        this.hash = crypto.createHash('sha1');
        this.hash.setEncoding('hex');
    }
    _transform(chunk,encoding,callback){
        var obj = this;        
        obj.hash.update(chunk); //this is synchronous.        
        obj.push(chunk);
        callback();
    }        
}

module.exports = CustomTransform;

As you can see, all we did was merge together our transform example above and the crypto hash example. Now let's integrate it into the client and server code.

Client Code

// client


//setup read file
var fs = require('fs');
const input = fs.createReadStream('./streamClientServer/test.txt');



//setup transform stream
var CustomTransform = require('./myCustomTransformStream.js');
var transformStream = new CustomTransform();

transformStream.on('finish',function(){
    transformStream.hash.end();
    console.log("hash value is: " + transformStream.hash.read());
})


//Setup connection to server
var socket = require('net').Socket();
socket.connect(8082);


//Connect everything together!
input.pipe(transformStream).pipe(socket)


input.on('close',function(){    
    console.log('end');    
    socket.end();
})


The main new thing added here is the transform stream's 'finish' event: when the file read stream ends, pipe() ends the writable side of the transform, the transform emits 'finish', and we can finalize the hash and print it.

Server Code

var outFile = 'testOut.txt';
var fs = require('fs');
const net = require('net');


//setup transform stream
var CustomTransform = require('./myCustomTransformStream.js');
var transformStream = new CustomTransform();

transformStream.on('finish',function(){
    transformStream.hash.end();
    console.log("hash value is: " + transformStream.hash.read());
})


var writeStream = fs.createWriteStream(outFile);
net.createServer(function (socket) {
  console.log("connected");

  socket.pipe(transformStream).pipe(writeStream);
  socket.on('data',function(data){
    console.log(data)
  })
  socket.on('close', function () {
      console.log("done!");      
  });
})
.listen(8082);


Now let's run it!

[Screenshot of the client and server output]

Success!

I will leave this for another time, but what would be even more useful is to actually send the hash to the server so we don't have to manually check file transfer success. This gets significantly harder because we have to differentiate between file data being sent and other metadata.

Next Steps

Move on to my Part 2 (to be posted soon) where we will go through two more advanced examples covering object mode and a basic packet structure.
