This post is the second in a series about how I wrote the equivalent of Go(lang) channels in JavaScript.
If you haven't already, I highly recommend reading the first post before reading this one:
Go channels in JS (1/5): Sending and Receiving
Nicolas Lepage for Zenika ・ Dec 2 '19
In the last post we built a basic equivalent of Go channels in JS.
We were able to create channels, send values to them, and receive values from them.
This time we will add a new feature to our JS channels: Buffering.
So let's start with a quick presentation of buffered channels in Go.
Buffered channels
Last time we saw that the send and receive operations of channels are blocking operations.
A send operation will block until a receive operation is ready on the same channel, and vice versa.
At least this is true for unbuffered channels, but a channel may have a buffer!
Let's revisit our send123() example from last time, this time with a buffered channel:
func main() {
    ch := make(chan int, 3) // Create an integer channel with a buffer of size 3
    go send123(ch) // Start send123() in a new goroutine
    // Receive an integer from ch and print it to stdout 3 times
    fmt.Println(<-ch)
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}
func send123(ch chan int) {
    // Send 3 integers to ch
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch) // Close ch once all values have been sent
}
As you can see, make() accepts a second argument, which is the size of the channel's buffer.
Our channel ch now has a buffer with a size of 3, which means that it is able to store 3 values.
As a result, send123() doesn't have to wait for main() to be ready to receive from ch.
Of course the order of execution isn't deterministic, but it is possible that send123() sends the three integers to the channel at once, and the channel stores them in its buffer.
The send operation becomes a non-blocking operation.
And the reverse is also true: as long as ch has values in its buffer, receiving values from ch won't be a blocking operation for main().
However, what happens if the buffer is "too small", if ch has a buffer of size 1 for example?
Well, send123() will be able to perform only one non-blocking send operation, then it will have to wait for a receive operation to free some space in ch's buffer.
To sum it up:
- Send operations block if the channel's buffer is full
- Receive operations block if the channel's buffer is empty
Buffered channels are often used to smooth the execution of send/receive-intensive processing.
With the right buffer size, the goroutines involved spend very little time blocked.
Let's transpose our example to JS:
function* main() {
  const ch = yield chan(3) // Create a buffered channel
  yield fork(send123, ch) // Start send123()
  // Receive a value from ch and log it to console 3 times
  console.log(`main() received ${yield recv(ch)}`)
  console.log(`main() received ${yield recv(ch)}`)
  console.log(`main() received ${yield recv(ch)}`)
}
function* send123(ch) {
  // Send 3 integers to ch
  yield send(ch, 1); console.log('send123() sent 1')
  yield send(ch, 2); console.log('send123() sent 2')
  yield send(ch, 3); console.log('send123() sent 3')
}
The only thing that has changed compared to last time is the chan() operation factory, which now accepts an optional buffer size.
We also added some logs in order to see the order of execution.
Now let's add this buffering feature to our JS channels!
Implementing buffered channels
Let's start with the creation of buffered channels.
Buffered channel creation
To begin with, we have to change our chan() operation factory in order to take a bufferSize argument:
export const chan = (bufferSize = 0) => ({
  [CHAN]: true,
  bufferSize,
})
bufferSize defaults to 0, so by default we will create an unbuffered channel.
Last time we decided to create channel keys using the String constructor, which guarantees a unique reference and gives us a toString() method out of the box.
We won't change that, but we can add the buffer size in the string for debug purposes:
let nextChanId = 1
const chanKey = bufferSize => new String(
  `chan #${nextChanId++} { bufferSize: ${bufferSize} }`
)
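The uniqueness we rely on here can be checked in isolation. Here is a minimal sketch, assuming the channel states are kept in a Map (or something that compares keys by reference, as the set() and get() calls suggest); the names a, b and states are hypothetical and exist only for this illustration:

```javascript
// Two String objects with identical content are still distinct references.
const a = new String('chan #1 { bufferSize: 3 }')
const b = new String('chan #1 { bufferSize: 3 }')

console.log(a === b) // false: two different objects
console.log(`${a}`)  // chan #1 { bufferSize: 3 }

// A Map compares object keys by reference, so each key gets its own entry.
const states = new Map()
states.set(a, 'state of a')
states.set(b, 'state of b')

console.log(states.size)   // 2
console.log(states.get(a)) // state of a
```

A primitive string would not work the same way: two equal primitive strings would end up pointing to the same Map entry.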
Now we must change our channelMiddleware so that it manages the creation of buffered channels.
Until now our channels' state contained only a receive queue and a send queue.
Let's add what is needed to make buffered channels work:
export const channelMiddleware = () => (next, ctx) => async operation => {
  if (operation[CHAN]) {
    const key = chanKey(operation.bufferSize)
    ctx[CHANS].set(key, {
      sendQ: [],
      recvQ: [],
      buffer: Array(operation.bufferSize),
      bufferLength: 0,
    })
    return key
  }
  // ...
}
The buffer array will have two purposes:
- it will store buffered values
- its length will tell us the buffer's size (or capacity if you prefer)
And the bufferLength integer will tell us how many values there actually are in the buffer.
This should give us enough information:
- does our buffer have values: bufferLength !== 0
- is our buffer at full capacity: bufferLength === buffer.length
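These two checks can be tried out on their own. The following is only a sketch: createState, hasValues and isFull are hypothetical helpers that are not part of the middleware, and the state object is reduced to the two buffer fields:

```javascript
// Hypothetical helpers, reduced to the buffer-related part of a channel's state.
const createState = bufferSize => ({
  buffer: Array(bufferSize), // buffer.length is the capacity
  bufferLength: 0,           // number of values currently stored
})

const hasValues = state => state.bufferLength !== 0
const isFull = state => state.bufferLength === state.buffer.length

const unbuffered = createState(0)
const buffered = createState(3)

console.log(isFull(unbuffered), hasValues(unbuffered)) // true false
console.log(isFull(buffered), hasValues(buffered))     // false false
```

Note that a channel with a buffer size of 0 is always "full" and never has values, which suggests that unbuffered channels need no special case: they simply never take the buffer path.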
And now the fun part! We have to modify the send and receive operations to manage buffered channels.
Send to buffered channel
Until now when we sent a value to a channel, we did only two things: check the receive queue for a waiting receiver and send to it, or push a sender in the send queue.
Now we must also check if there is some space left in the buffer before pushing a sender in the send queue:
if (operation[SEND]) {
  const chanState = ctx[CHANS].get(operation.chanKey)
  const recver = chanState.recvQ.shift()
  if (recver) {
    recver(operation.value)
    return
  }
  if (chanState.bufferLength !== chanState.buffer.length) {
    // Store value in the buffer
  }
  return new Promise(resolve => {
    chanState.sendQ.push(() => {
      resolve()
      return operation.value
    })
  })
}
Values should be received in the order they were sent, so the buffer must be a FIFO queue. This means that we are always going to store values at the end of the buffer.
We cannot push values, because this would change buffer.length, which tells us the buffer's capacity, but we can use bufferLength to know the index where to store our value:
if (chanState.bufferLength !== chanState.buffer.length) {
  chanState.buffer[chanState.bufferLength++] = operation.value
  return
}
chanState.bufferLength++ allows us to store operation.value at the current chanState.bufferLength index and increment it afterward.
And this is it! Now our channels will store values in the buffer as long as there is some space left, and push senders in the send queue only if the buffer is full.
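To see the three possible outcomes of a send, here is a simplified synchronous sketch. It is an assumption-laden stand-in for the real branch: sendSketch is a hypothetical name, there is no middleware or ctx, the Promise handling is left out, and the returned labels exist only to show which path was taken:

```javascript
// Hypothetical synchronous stand-in for the send branch (no Promise, no ctx).
const sendSketch = (chanState, value) => {
  const recver = chanState.recvQ.shift()
  if (recver) {
    recver(value) // a receiver is waiting: hand the value over directly
    return 'received'
  }
  if (chanState.bufferLength !== chanState.buffer.length) {
    chanState.buffer[chanState.bufferLength++] = value // space left: store it
    return 'buffered'
  }
  chanState.sendQ.push(() => value) // buffer full: the sender has to wait
  return 'queued'
}

const state = { sendQ: [], recvQ: [], buffer: Array(2), bufferLength: 0 }

console.log(sendSketch(state, 1)) // buffered
console.log(sendSketch(state, 2)) // buffered
console.log(sendSketch(state, 3)) // queued
console.log(state.buffer)         // [ 1, 2 ]
console.log(state.sendQ.length)   // 1
```

With a buffer of size 2, the first two sends go into the buffer and only the third one has to wait in the send queue.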
Receive from buffered channel
Until now when we received from a channel, all we did was check the send queue for a waiting sender and receive from it, or push a receiver in the receive queue.
Now we must check if the buffer contains any values beforehand:
if (operation[RECV]) {
  const chanState = ctx[CHANS].get(operation.chanKey)
  if (chanState.bufferLength !== 0) {
    // Receive from buffer
  }
  const sender = chanState.sendQ.shift()
  if (sender) return sender()
  return new Promise(resolve => {
    chanState.recvQ.push(resolve)
  })
}
The buffer being a FIFO queue, we have to take values from the head of the buffer.
And just like when sending, we cannot use buffer.shift() or we would accidentally change the buffer's capacity.
What we should do is read index 0 and then move all the buffer's values one index to the left without changing its length.
Arrays have a method for this called copyWithin:
if (chanState.bufferLength !== 0) {
  const value = chanState.buffer[0]
  chanState.buffer.copyWithin(0, 1)
  chanState.bufferLength--
  return value
}
We also decrement chanState.bufferLength to reflect the buffer's new content.
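The difference between shift() and copyWithin() is easy to see on a plain array. A small sketch, where buffer and shifted are hypothetical arrays used only for the comparison:

```javascript
// copyWithin(0, 1) copies everything from index 1 onward to index 0.
const buffer = [1, 2, 3]
buffer.copyWithin(0, 1)

console.log(buffer)        // [ 2, 3, 3 ]
console.log(buffer.length) // 3: the capacity is preserved

// shift() also removes the head, but it shrinks the array.
const shifted = [1, 2, 3]
shifted.shift()

console.log(shifted)        // [ 2, 3 ]
console.log(shifted.length) // 2
```

After copyWithin() the last slot still holds a stale copy of the old last value. This is harmless as long as bufferLength, and not the array's content, decides which slots hold live values.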
But there is still a problem: when we free some space in the buffer, we should check if there are senders in the send queue.
Having a non-full buffer and senders in the send queue would be an invalid state for the channel.
So let's check the send queue when we take a value from the buffer:
if (chanState.bufferLength !== 0) {
  const value = chanState.buffer[0]
  chanState.buffer.copyWithin(0, 1)
  const sender = chanState.sendQ.shift()
  if (sender) {
    chanState.buffer[chanState.bufferLength - 1] = sender()
  } else {
    chanState.bufferLength--
  }
  return value
}
If there is a sender in the send queue, we receive from it and put the value at the end of the buffer.
As a consequence we decrement chanState.bufferLength only if there is no sender in the send queue.
And this is it! We now have fully working buffered channels.
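As a quick sanity check, the buffer part of the receive logic can be run against a hand-built state. This is only a sketch under assumptions: recvFromBuffer is a hypothetical standalone function (no middleware, no ctx), the state is written by hand as if values 1 and 2 had been buffered and a third sender were waiting, and the empty-buffer case is deliberately left out:

```javascript
// Hypothetical standalone version of the "receive from buffer" branch.
const recvFromBuffer = chanState => {
  if (chanState.bufferLength === 0) return undefined // not covered by this sketch
  const value = chanState.buffer[0]
  chanState.buffer.copyWithin(0, 1) // shift values left, keep the capacity
  const sender = chanState.sendQ.shift()
  if (sender) {
    // A sender was waiting: its value takes the slot that was just freed
    chanState.buffer[chanState.bufferLength - 1] = sender()
  } else {
    chanState.bufferLength--
  }
  return value
}

// Hand-built state: a full buffer of size 2, plus one sender waiting with 3.
const state = { sendQ: [() => 3], recvQ: [], buffer: [1, 2], bufferLength: 2 }

console.log(recvFromBuffer(state)) // 1
console.log(state.buffer, state.bufferLength, state.sendQ.length) // [ 2, 3 ] 2 0
console.log(recvFromBuffer(state)) // 2
console.log(recvFromBuffer(state)) // 3
console.log(state.bufferLength)    // 0
```

Values come out in the order they were sent, and after the first receive the buffer is full again while the send queue is empty, so the channel never ends up with a non-full buffer and a waiting sender.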
What next
Next time we are going to see how channels may be closed. It may not be the most fun part, but it is definitely a key feature of channels.
So I have three more posts coming:
- Go channels in JS (3/5): Closing
- Go channels in JS (4/5): Ranging
- Go channels in JS (5/5): Selecting
I hope you enjoyed this second post. Give it a ❤️, leave a 💬 comment, or share it with others, and follow me to get notified of my next posts.