Here we go again! I’m doing another Node module deep-dive on the ol’ bloggity blog today. I figured I would dive into the WriteableStreams
object. WriteableStreams
expose methods that allow you to write to a stream. They expose multiple events like close
, drain
, and pipe
and several functions like cork
, end
, and write
. Before I dive into the WriteableStream
object, it helps to provide a quick primer on the concept of streams.
Alright! Now that we’ve got that foundation set up, it’s time to dive into the code. I’ll be doing a code walkthrough of this version of the WritableStream
. As I skimmed through the file, I was glad to find out that there were quite a few comments sprinkled throughout the code base to clarify what different parts of the library did. These explanatory comments made reading through the codebase much easier to parse through the code. The first thing that I did was examine the properties that were defined on the WriteableState object. The comments in the code base do a pretty good job of describing what each of the properties are, so I’ll avoid going into detail on them here. From reading the code, it appears that the WritableState
object holds information about the current WriteableStream
(that makes sense!).
There’s single function defined on the WriteableState
that is designed to get the current buffer on the Writeable
as a list.
WritableState.prototype.getBuffer = function getBuffer() {
var current = this.bufferedRequest;
var out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
};
The definition of the Writable
stream outlines a few properties on the function. Namely, the programmer can specify special write
, destroy
, and final
functions to the Writable
.
function Writable(options) {
// Writable ctor is applied to Duplexes, too.
// `realHasInstance` is necessary because using plain `instanceof`
// would return false, as no `_writableState` property is attached.
// Trying to use the custom `instanceof` for Writable here will also break the
// Node.js LazyTransform implementation, which has a non-trivial getter for
// `_writableState` that would lead to infinite recursion.
if (!(realHasInstance.call(Writable, this)) &&
!(this instanceof Stream.Duplex)) {
return new Writable(options);
}
this._writableState = new WritableState(options, this);
// legacy.
this.writable = true;
if (options) {
if (typeof options.write === 'function')
this._write = options.write;
if (typeof options.writev === 'function')
this._writev = options.writev;
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
if (typeof options.final === 'function')
this._final = options.final;
}
Stream.call(this);
}
The first function defined on the Writeable
prototype introduces a rather whimsical comment.
// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
this.emit('error', new errors.Error('ERR_STREAM_CANNOT_PIPE'));
};
You can’t read from a Writeable
stream so of course it doesn’t make sense that you’d be able to pipe the output from a WriteableStream
since it doesn’t exist in the first place.
The write
function is defined next. It takes three parameters: a chunk
of data to write, the encoding
of the data, and a cb
(callback) to be executed once the write is done.
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
var isBuf = !state.objectMode && Stream._isUint8Array(chunk);
if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = Stream._uint8ArrayToBuffer(chunk);
}
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
if (isBuf)
encoding = 'buffer';
else if (!encoding)
encoding = state.defaultEncoding;
if (typeof cb !== 'function')
cb = nop;
if (state.ended)
writeAfterEnd(this, cb);
else if (isBuf || validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
}
return ret;
};
The function grabs the current state of the WritableStream
and checks to see if the data being written to the stream consists of Buffers or Objects and stores this distinction in isBuf
. If the data being written to the stream is expected to be a Buffer
but the chunk
passed is not a Buffer
, the function assumes it is an integer array and converts it to a Buffer
. After that, there is some logic that makes sure that parameters are mapped properly. Namely, the user doesn’t have to pass an encoding
parameter to the function. When this is the case, the second argument passed is actually the callback to be called. If the stream has been ended, the function will call a writeAfterEnd
function which will emit an error to the user since you can’t write to a stream that has been closed.
function writeAfterEnd(stream, cb) {
var er = new errors.Error('ERR_STREAM_WRITE_AFTER_END');
// TODO: defer error events consistently everywhere, not just the cb
stream.emit('error', er);
process.nextTick(cb, er);
}
Otherwise, if the data is a buffer the function will invoke a writeOrBuffer
function.
// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
if (!isBuf) {
var newChunk = decodeChunk(state, chunk, encoding);
if (chunk !== newChunk) {
isBuf = true;
encoding = 'buffer';
chunk = newChunk;
}
}
var len = state.objectMode ? 1 : chunk.length;
state.length += len;
var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
if (state.writing || state.corked) {
var last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
encoding,
isBuf,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;
} else {
doWrite(stream, state, false, len, chunk, encoding, cb);
}
return ret;
}
There is a lot going on here so let’s step through it bit-by-bit. The first couple of lines in the function check to see if chunk
passed is not a Buffer. If it is not, the chunk
is decoded using the decodeChunk
, which creates a chunk from a string using the Buffer.from
function.
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding);
}
return chunk;
}
It then checks to see if the capacity of the stream has been reached by evaluating if the length of the stream has exceeded the highWaterMark
and sets the needDrain
parameter appropriately. Afterwards, it updates the value of the lastBufferedRequest
stored in the state to the Buffer that was passed as a parameter and calls the doWrite
function which writes the chunk to the stream.
The next functions defined are the cork
and uncork
function which are defined as follows. The cork function increments the corked
counter. The corked
counter actually acts as a Boolean, when it has a non-zero value it means there are writes that will need to be buffered. The uncork
function decrements the corked
parameter and clears the buffer.
Writable.prototype.cork = function() {
var state = this._writableState;
state.corked++;
};
Writable.prototype.uncork = function() {
var state = this._writableState;
if (state.corked) {
state.corked--;
if (!state.writing &&
!state.corked &&
!state.finished &&
!state.bufferProcessing &&
state.bufferedRequest)
clearBuffer(this, state);
}
}
There next function is a short and sweat function that allows the user to set the default encoding on the WriteableStream
or raising an error if the user provides an invalid encoding.
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// node::ParseEncoding() requires lower case.
if (typeof encoding === 'string')
encoding = encoding.toLowerCase();
if (!Buffer.isEncoding(encoding))
throw new errors.TypeError('ERR_UNKNOWN_ENCODING', encoding);
this._writableState.defaultEncoding = encoding;
return this;
};
The end
function is called when the last chunk
needs to be written to the stream. It writes the chunk by invoking the write
function that we explored above, uncorks it fully, and clears out the WritableState
by invoking endWriteable.
Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;
if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
} else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
if (chunk !== null && chunk !== undefined)
this.write(chunk, encoding);
// .end() fully uncorks
if (state.corked) {
state.corked = 1;
this.uncork();
}
// ignore unnecessary end() calls.
if (!state.ending && !state.finished)
endWritable(this, state, cb);
};
And that’s that! I went through and read through the main portions of the WriteableStream
object. I’ll admit that prior to reading the code dilligently, I was a little overwhelmed by everything that was going on under the hood. Going through and reading the code function-by-function definitely cleared up a lot of things for me.
If you have any questions or comments about the above, feel free to ask me a question or reach out to me on Twitter.
Top comments (0)