
Gian felipe theodorowicz


Data processing on-demand with Node.js streams

You've probably already heard phrases like "Node.js can't handle data processing", but is that really true?

A topic that haunts the lives of senior programmers and isn't even talked about by juniors: Node.js streams.

What are streams?

Node.js streams are collections of data, like arrays, but unlike arrays, streams are not fully allocated in memory. What I mean is that they are handled in pieces, and these pieces are called chunks. Basically, it's like taking some data and breaking it up:

// static, allocated in memory
let nodejs = "node.js isn't single thread"

// stream pseudo-code
stream = ["nodejs", "isn't", "single", "thread"]

Notice that in this pseudocode the phrase is broken into fragments. These fragments are called chunks, and they are useful when we want to work with data on demand. A good example of something that consumes this kind of on-demand data is ChatGPT: as the bot writes each word, it is displayed to the user.
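
To make this concrete, here is a minimal sketch (the file name 'big-file.txt' is just an assumption) that reads a file on demand with a built-in readable stream, chunk by chunk, instead of loading the whole thing into memory:

import { createReadStream } from 'node:fs'

// the file is never fully loaded into memory; each 'data' event delivers one chunk
const source = createReadStream('big-file.txt')

source.on('data', (chunk) => {
  // chunk is a Buffer holding just a piece of the file
  console.log(`received ${chunk.length} bytes`)
})

source.on('end', () => console.log('done'))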

Turning it into code

Let's simulate a big water purifier that extracts water from a reservoir and purifies it so it can be served as drinking water.

The first step in turning this into code is simply to have a water source:


// this is a generator function, I will explain it in the next topic
function * waterGenerator() {
  // 1e5 is 100000
  for(let i = 0; i < 1e5; i++){
    // here I create matter like god
    const matter = {
      // matter has water, so let's create it
      water: {
        // we need a way to represent the quantity in numbers, so let's use 'ml' (milliliters), that's physics baby
        mlQuantity: Math.random()
      }
    }

    // without this yield the generator would produce nothing; it hands each piece of matter to whoever iterates the generator
    yield matter
  }
}

Now that we have somewhere to take water from, we have to find a way to extract it.

But first, I should explain why generator functions are useful in the context of this example.

Generator functions

These aren't my words, they're from the MDN docs:

Generators are functions that can be exited and later re-entered. Their context (variable bindings) will be saved across re-entrances.

Generators in JavaScript — especially when combined with Promises — are a very powerful tool for asynchronous programming as they mitigate — if not entirely eliminate -- the problems with callbacks, such as Callback Hell and Inversion of Control. However, an even simpler solution to these problems can be achieved with async functions.
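
Here is a minimal sketch of what "exited and later re-entered" means in practice (the counter name is just for illustration):

// each 'yield' pauses the function; the next .next() call resumes it from where it stopped
function * counter() {
  yield 1
  yield 2
  yield 3
}

const it = counter()
console.log(it.next().value) // 1
console.log(it.next().value) // 2
console.log(it.next().value) // 3
console.log(it.next().done)  // true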

Turning it into code

Proceeding...

To extract the water from our water source, we will do this:

// I will use the new Node import style, so the module 'stream' becomes 'node:stream'; the prefix makes it explicit that this is a native Node module
import { Readable } from 'node:stream'

// the 'new' keyword is optional in this context, but if you choose not to use it, you won't get the IntelliSense help.

// it's important to say that readable streams are responsible for breaking our data into fragments (chunks).
const extract = new Readable({
  // the 'Readable' class from the stream module receives a config object in its constructor.
  // inside the config object, I am passing a read method, which is responsible for the dirty work

  read() {
    // here we will consume the generator
    for(const data of waterGenerator()){
      // an important thing to know before we start writing code is that, by default, streams only accept buffers and strings as their data, so if we want to use objects as data...

      // we have to use JSON.stringify()
      const stringified = JSON.stringify(data)


      // okay, the way to pass the data through the 'pipe' (I will explain pipe later) in readable streams is this.push()
      this.push(stringified)
    }

    // here we push null to signal that our data has ended
    this.push(null)
  }
})
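If you just want to peek at the chunks coming out of extract, one way (not part of the original example) is to iterate it with for await...of, since readable streams are async iterables. Note that this consumes the stream, so you would need to recreate extract before passing it to the pipeline later:

// readable streams are async iterables, so we can consume them chunk by chunk
for await (const chunk of extract) {
  // each chunk is the stringified object pushed by read()
  console.log(chunk.toString())
}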

Now that we've completed the extraction part of our water purifier, we have to clean this water, right?

When we want to transform our data in the Node.js stream context, we have the Transform class, which is responsible for transforming the data and passing it along the data pipeline.

import { Transform } from 'node:stream'

const betterWater = new Transform({
  // notice that every stream class that takes a config object in its constructor receives a method whose name is the same as, or close to, the class name


  // the transform method receives 3 params:
  /**
   * @param chunk - the chunk of data
   * @param encoding - the encoding of the chunk
   * @param cb - the callback used to return the data
  */

  // we won't use the encoding, so I am naming it with a _ prefix
  transform(chunk, _ec, cb) {

    // remember that when we implemented the readable stream we passed the data as a string? now we have to parse it to work with objects
    const data = JSON.parse(chunk)

    // let's imagine that water loses some quantity when it's purified, so:
    data.water.mlQuantity = data.water.mlQuantity - 0.1

    // remember, we have to stringify the data again to pass it through
    const stringified = JSON.stringify(data)

    // here is the callback, responsible for passing the data through the pipe
    // it receives an error as the first param and the data as the second param
    // since I am not handling any error, I pass null as the error to signal that nothing went wrong
    cb(null, stringified)
  }
})

Okay, so now we have all the pieces of the pipeline... wait... where is the pipe?

// to create a pipeline, we import the pipeline function from the stream module
import { pipeline } from 'node:stream'
// here we are importing promisify from the util module to turn the callback-based pipeline into an async pipeline
import { promisify } from 'node:util'

// turning the pipeline function into an async pipeline function
const asyncPipeLine = promisify(pipeline)


// notice that I am using top-level await; you don't have to, but I like it
await asyncPipeLine(
  // the first argument is the readable stream
  extract,
  // the middle arguments are the transforms; there can be 1, 2, 3, as many as you want
  betterWater,
  // the last argument has to be a writable stream; in this case I will use process.stdout. yes, process.stdout (the stream console.log() writes to) is a writable stream and can print on demand
  process.stdout
)
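As a side note, newer Node.js versions also ship an already-promisified pipeline in node:stream/promises, so the promisify step can be skipped. This is just an alternative sketch (aliased here to avoid clashing with the pipeline import above), not what the article uses:

// same pipeline as above, but using the promise-based API from node:stream/promises
import { pipeline as pipelinePromise } from 'node:stream/promises'

await pipelinePromise(
  extract,
  betterWater,
  process.stdout
)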

Conclusion

Wow, that's too much information, right? No.

Node.js streams are a dense topic, but they're actually kind of easy. Read this again as many times as you need. You'll notice that I didn't go into the technical specification of Node.js streams; this article was written for people who are diving into Node.js streams but haven't yet grasped the concepts in practical terms.

The GitHub source code:

github
