
Introducing Nexus.js: A multi-threaded JavaScript run-time

Abdullah Ali (voodooattack) ・ 9 min read

First of all, I recommend reading the old series if you're not familiar with this project. If you don't want to read all of that, no worries! I've got you covered!

Now that that's out of the way, let us begin.

Last year I started implementing Nexus.js, a multi-threaded server-side JavaScript run-time based on WebKit/JavaScriptCore. I dropped it for a while, due to circumstances out of my control that I'm not going to discuss here, but here we are again: I couldn't keep myself from working on it for long.


So let us start by discussing the architecture of Nexus and how things work:

Nexus.js Runtime

Event Loop

  • There is no event loop.
  • There is a thread-pool with a (lock-free) task queue.
  • Every time you call setTimeout or setImmediate or create a promise, a task is queued to the task queue.
  • Every time a task is scheduled, the first available thread will pick the task and execute it.
  • Promises resolve on all CPU cores. A call to Promise.all() will resolve the promises in parallel.
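
To make the task-queue model above concrete, here is a minimal sketch that splits a CPU-bound job into several promises and joins them with Promise.all(). It uses only standard JavaScript; sumRange and parallelSum are hypothetical helper names, not part of Nexus. On a single-threaded runtime the chunks simply run one after another; the assumption is that on Nexus each queued task could be picked up by a different thread.

```javascript
// Sum the integers in [from, to); stands in for any CPU-bound work.
function sumRange(from, to) {
  let total = 0;
  for (let i = from; i < to; i++) total += i;
  return total;
}

// Split [0, n) into `parts` chunks and queue one task per chunk.
function parallelSum(n, parts) {
  const size = Math.ceil(n / parts);
  const tasks = [];
  for (let p = 0; p < parts; p++) {
    const from = p * size;
    const to = Math.min(n, from + size);
    // Each setTimeout call queues a separate task.
    tasks.push(new Promise(resolve =>
      setTimeout(() => resolve(sumRange(from, to)), 0)));
  }
  // Promise.all() joins the partial sums once every task has finished.
  return Promise.all(tasks).then(sums => sums.reduce((a, b) => a + b, 0));
}

parallelSum(1000000, 4).then(total => console.log(`total: ${total}`));
```

The chunks do not share any mutable state, so the result does not depend on which thread runs which chunk or in what order they finish.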

ES6

  • async/await is supported, and encouraged.
  • for await(...) is supported.
  • Destructuring is supported.
  • async try/catch/finally is supported.
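
As a quick illustration of the features listed above, the following sketch combines an async generator, for await, and try/catch/finally inside an async function. It is plain standard JavaScript and makes no assumptions about Nexus beyond the support claimed in the list; delay, countTo and collect are hypothetical names chosen for the example.

```javascript
// Resolve with `value` after `ms` milliseconds.
function delay(ms, value) {
  return new Promise(resolve => setTimeout(() => resolve(value), ms));
}

// Async generator: yields 1..count, pausing briefly before each value.
async function* countTo(count) {
  for (let i = 1; i <= count; i++)
    yield await delay(1, i);
}

// Consume the generator with `for await` inside try/catch/finally.
async function collect(count) {
  const seen = [];
  try {
    for await (const value of countTo(count))
      seen.push(value);
  } catch (e) {
    console.error('iteration failed:', e);
  } finally {
    // Runs whether or not the loop threw.
    seen.push('done');
  }
  return seen;
}

collect(3).then(seen => console.log(seen));
```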

Modules

  • CommonJS is not supported. (require(...) and module.exports)
  • All modules use the ES6 import/export syntax.
  • Dynamic importing is possible through import('file-or-package').then(...).
  • import.meta is supported. For example: import.meta.filename and import.meta.dirname to name a few.
  • Bonus feature: you can import URLs directly from the source code.

Example:

import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';
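
Dynamic import() pairs naturally with URL imports when a module should only be fetched on first use. The sketch below wraps a loader so the import runs at most once; lazy and loadPreact are hypothetical names, and whether Nexus caches URL imports on its own is not something this article states.

```javascript
// Wrap a loader so the underlying import() runs at most once.
function lazy(load) {
  let pending = null;
  return () => {
    if (pending === null) pending = load();
    return pending;
  };
}

// Same URL as the example above; nothing is fetched until loadPreact() is called.
const loadPreact = lazy(() => import('https://unpkg.com/preact/dist/preact.esm.js'));

// Usage, inside an async function on a runtime that supports URL imports:
//   const { h } = await loadPreact();
```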

EventEmitter

  • Nexus implements a promise-based EventEmitter class.
  • Event handlers will be queued on all threads, and will execute in parallel.
  • The result of EventEmitter.emit(...) is a promise that will resolve to an array of all values returned by the event handlers.

Example:

class EmitterTest extends Nexus.EventEmitter {
  constructor() {
    super();
    for(let i = 0; i < 4; i++)
      this.on('test', value => { console.log(`fired test ${i}!`); console.inspect(value); });
    for(let i = 0; i < 4; i++)
      this.on('returns-a-value', v => `${v + i}`);
  }
}

const test = new EmitterTest();

async function start() {
  await test.emit('test', { payload: 'test 1' });
  console.log('first test done!');
  await test.emit('test', { payload: 'test 2' });
  console.log('second test done!');
  const values = await test.emit('returns-a-value', 10);
  console.log('third test done, returned values are:'); console.inspect(values);
}

start().catch(console.error);
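
To show how a promise-based emit() can behave, here is a minimal sketch in plain JavaScript. It is not the Nexus implementation: TinyEmitter is a hypothetical class, and it schedules handlers with setTimeout, so on a single-threaded runtime they interleave instead of running in parallel. What it does share with the description above is the contract: emit() resolves to an array of the handlers' return values.

```javascript
class TinyEmitter {
  constructor() {
    this.handlers = new Map(); // event name -> array of handlers
  }

  on(event, handler) {
    if (!this.handlers.has(event)) this.handlers.set(event, []);
    this.handlers.get(event).push(handler);
    return this;
  }

  // Queue every handler as its own task and collect the results in order.
  emit(event, ...args) {
    const list = this.handlers.get(event) || [];
    return Promise.all(list.map(handler => new Promise((resolve, reject) => {
      setTimeout(() => {
        try { resolve(handler(...args)); } catch (e) { reject(e); }
      }, 0);
    })));
  }
}

const emitter = new TinyEmitter();
for (let i = 0; i < 4; i++)
  emitter.on('returns-a-value', v => `${v + i}`);

emitter.emit('returns-a-value', 10).then(values => console.log(values));
// prints [ '10', '11', '12', '13' ]
```

Because Promise.all() preserves the order of its inputs, the returned array follows registration order even if the handlers finish in a different order.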

I/O

  • All input/output is done through three primitives: the Device, the Filter and the Stream.
  • All input/output primitives implement the EventEmitter class.
  • To use a device, you construct a ReadableStream or WritableStream on top of it.
  • To manipulate data, you add filters to your ReadableStream or WritableStream.
  • Lastly, you use source.pipe(...destinationStreams) and then await source.resume() to process the data.
  • All input/output operations are done using ArrayBuffer objects.
  • Filters implement process(buffer) to handle the data.

Example: (Converts UTF-8 to UTF-16 with 4 separate output files)

async function start() {
  const startTime = Date.now();
  try {
    const device = new Nexus.IO.FilePushDevice('enwik8');
    const stream = new Nexus.IO.ReadableStream(device);

    stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));

    const wstreams = [0,1,2,3]
      .map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));

    console.log('piping...');

    stream.pipe(...wstreams);

    console.log('streaming...');

    await stream.resume();

    await stream.close();

    await Promise.all(wstreams.map(stream => stream.close()));

    console.log(`finished in ${(Date.now() - startTime) / 1000} seconds!`);
  } catch (e) {
    console.error('An error occurred: ', e);
  }
}

start().catch(console.error);
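
The list above says filters implement process(buffer). As a rough sketch of what a custom filter might look like, the class below upper-cases ASCII letters in an ArrayBuffer. UpperCaseFilter is a hypothetical name, and two things are assumptions not confirmed by this article: that process() returns the transformed ArrayBuffer, and that a plain class (with no Nexus base class) would be accepted by pushFilter().

```javascript
// Hypothetical filter: upper-cases ASCII letters, leaves other bytes untouched.
class UpperCaseFilter {
  // Assumed contract: receives an ArrayBuffer, returns a new ArrayBuffer.
  process(buffer) {
    const input = new Uint8Array(buffer);
    const output = new Uint8Array(input.length);
    for (let i = 0; i < input.length; i++) {
      const c = input[i];
      // 0x61..0x7a is 'a'..'z'; subtracting 0x20 gives 'A'..'Z'.
      output[i] = (c >= 0x61 && c <= 0x7a) ? c - 0x20 : c;
    }
    return output.buffer;
  }
}

// Stand-alone usage, without any stream:
const input = Uint8Array.from([0x68, 0x69, 0x21]).buffer; // "hi!"
const result = new Uint8Array(new UpperCaseFilter().process(input));
console.log(String.fromCharCode(...result)); // prints HI!
```

The filter allocates a new buffer instead of modifying its input, which matters when the same source buffer is piped to several destinations.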

TCP/UDP

  • Nexus.js provides an Acceptor class, responsible for binding addresses/ports and listening for connections.
  • Every time a connection is received, the connection event is fired and provided with a Socket device.
  • Each Socket instance is a bidirectional I/O device.
  • You may use ReadableStream and WritableStream to manipulate the Socket.

Very basic example: (Writes 'Hello World!' to clients)

const acceptor = new Nexus.Net.TCP.Acceptor();
let count = 0;

acceptor.on('connection', (socket, endpoint) => {
  const connId = count++;
  console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`);
  const rstream = new Nexus.IO.ReadableStream(socket);
  const wstream = new Nexus.IO.WritableStream(socket);
  const buffer = new Uint8Array(13);
  const message = 'Hello World!\n';
  for(let i = 0; i < 13; i++)
    buffer[i] = message.charCodeAt(i);
  rstream.pushFilter(new Nexus.IO.UTF8StringFilter());
  rstream.on('data', buffer => console.log(`got message: ${buffer}`));
  rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`));
  console.log(`sending greeting to #${connId}!`);
  wstream.write(buffer);
});

acceptor.bind('127.0.0.1', 10000);
acceptor.listen();

console.log('server ready');
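
The handler above sizes its greeting buffer by hand (13 bytes for 'Hello World!\n'), which is easy to get wrong when the message changes. A small helper that derives the length from the string avoids that. asciiBytes is a hypothetical name, and the helper assumes ASCII-only text, as in the example.

```javascript
// Convert an ASCII string to a Uint8Array sized to fit it exactly.
function asciiBytes(message) {
  const bytes = new Uint8Array(message.length);
  for (let i = 0; i < message.length; i++) {
    const code = message.charCodeAt(i);
    // Refuse anything outside 7-bit ASCII instead of silently truncating it.
    if (code > 0x7f)
      throw new RangeError(`non-ASCII character at index ${i}`);
    bytes[i] = code;
  }
  return bytes;
}

const greeting = asciiBytes('Hello World!\n');
console.log(greeting.length); // prints 13
```

In the connection handler, the result could then be passed to wstream.write() in place of the hand-built buffer.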

HTTP

  • Nexus provides a Nexus.Net.HTTP.Server class that essentially inherits from the TCP Acceptor class (Nexus.Net.TCP.Acceptor).
  • It exposes the same basic interface.
  • When the server finishes parsing/validating the basic HTTP headers of an incoming connection, the connection event is fired with the connection and peer information.
  • Every Connection instance has a request and a response. Those are input/output devices.
  • You may construct ReadableStream and WritableStream to manipulate the request/response.
  • If you pipe to a Response object, the streams enter chunked-encoding mode. Otherwise, you may use response.write() to write a regular string payload.

Complex example: (Baseline HTTP server with Chunked encoding, details omitted)


....


/**
 * Creates an input stream from a path.
 * @param path
 * @returns {Promise<ReadableStream>}
 */
async function createInputStream(path) {
  if (path.startsWith('/')) // If it starts with '/', omit it.
    path = path.substr(1);
  if (path.startsWith('.')) // If it starts with '.', reject it.
    throw new NotFoundError(path);
  if (path === '/' || !path) // If it's empty, set to index.html.
    path = 'index.html';
  /**
   * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`.
   */
  const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path);
  try {
    // Stat the target path.
    const {type} = await Nexus.FileSystem.stat(filePath);
    if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html'
      return createInputStream(Nexus.FileSystem.join(filePath, 'index.html'));
    else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound)
      // If it's not found, throw NotFound.
      throw new NotFoundError(path);
  } catch(e) {
    if (e.code)
      throw e;
    throw new NotFoundError(path);
  }
  try {
    // First, we create a device.
    const fileDevice = new Nexus.IO.FilePushDevice(filePath);
    // Then we return a new ReadableStream created using our source device.
    return new Nexus.IO.ReadableStream(fileDevice);
  } catch(e) {
    throw new InternalServerError(e.message);
  }
}

/**
 * Connections counter.
 */
let connections = 0;

/**
 * Create a new HTTP server.
 * @type {Nexus.Net.HTTP.Server}
 */
const server = new Nexus.Net.HTTP.Server();

// A server error means an error occurred while the server was listening to connections.
// We can mostly ignore such errors, but we display them anyway.
server.on('error', e => {
  console.error(FgRed + Bright + 'Server Error: ' + e.message + '\n' + e.stack, Reset);
});

/**
 * Listen to connections.
 */
server.on('connection', async (connection, peer) => {
  // Start with a connection ID of 0, increment with every new connection.
  const connId = connections++;
  // Record the start time for this connection.
  const startTime = Date.now();
  // Destructuring is supported, why not use it?
  const { request, response } = connection;
  // Parse the URL parts.
  const { path } = parseURL(request.url);
  // Here we'll store any errors that occur during the connection.
  const errors = [];
  // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream.
  let inStream, outStream;
  try {
    // Log the request.
    console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
      FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset);
    // Set the 'Server' header.
    response.set('Server', `nexus.js/0.1.1`);
    // Create our input stream.
    inStream = await createInputStream(path);
    // Create our output stream.
    outStream = new Nexus.IO.WritableStream(response);
    // Hook all `error` events, add any errors to our `errors` array.
    inStream.on('error', e => { errors.push(e); });
    request.on('error', e => { errors.push(e); });
    response.on('error', e => { errors.push(e); });
    outStream.on('error', e => { errors.push(e); });
    // Set the content type and response status.
    response
      .set('Content-Type', mimeType(path))
      .status(200);
    // Hook input to output(s).
    const disconnect = inStream.pipe(outStream);
    try {
      // Resume our file stream, this causes the stream to switch to HTTP chunked encoding.
      // This will return a promise that will only resolve after the last byte (HTTP chunk) is written.
      await inStream.resume();
    } catch (e) {
      // Capture any errors that happen during the streaming.
      errors.push(e);
    }
    // Disconnect all the callbacks created by `.pipe()`.
    return disconnect();
  } catch(e) {
    // If an error occurred, push it to the array.
    errors.push(e);
    // Set the content type, status, and write a basic message.
    response
      .set('Content-Type', 'text/plain')
      .status(e.code || 500)
      .send(e.message || 'An error has occurred.');
  } finally {
    // Close the streams manually. This is important because we may run out of file handles otherwise.
    if (inStream)
      await inStream.close();
    if (outStream)
      await outStream.close();
    // Close the connection, has no real effect with keep-alive connections.
    await connection.close();
    // Grab the response's status.
    let status = response.status();
    // Determine what colour to output to the terminal.
    const statusColors = {
      '200': Bright + FgGreen, // Green for 200 (OK),
      '404': Bright + FgYellow, // Yellow for 404 (Not Found)
      '500': Bright + FgRed // Red for 500 (Internal Server Error)
    };
    let statusColor = statusColors[status];
    if (statusColor)
      status = statusColor + status + Reset;
    // Log the connection (and time to complete) to the console.
    console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${
      FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() - startTime)}ms` +
      (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset));
  }
});

/**
 * IP and port to listen on.
 */
const ip = '0.0.0.0', port = 3000;
/**
 * Whether or not to set the `reuse` flag. (optional, default=false)
 */
const portReuse = true;
/**
 * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific)
 * @type {number}
 */
const maxConcurrentConnections = 1000;
/**
 * Bind the selected address and port.
 */
server.bind(ip, port, portReuse);
/**
 * Start listening to requests.
 */
server.listen(maxConcurrentConnections);
/**
 * Happy streaming!
 */
console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);
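
The listing above omits the definitions of several helpers it relies on (the colour constants, NotFoundError, InternalServerError, parseURL and mimeType). The sketch below shows one possible shape for them so the example can be read end to end. These are assumptions written for illustration, not the author's code: the HttpError base class, the MIME table, and the error messages are all hypothetical.

```javascript
// ANSI SGR escape sequences for terminal colours.
const Reset = '\x1b[0m', Bright = '\x1b[1m';
const FgRed = '\x1b[31m', FgGreen = '\x1b[32m';
const FgYellow = '\x1b[33m', FgCyan = '\x1b[36m';

// Errors carry an HTTP status in `code`, which the handler passes to response.status().
class HttpError extends Error {
  constructor(code, message) {
    super(message);
    this.code = code;
  }
}
class NotFoundError extends HttpError {
  constructor(path) { super(404, `Not found: ${path}`); }
}
class InternalServerError extends HttpError {
  constructor(message) { super(500, message); }
}

// Split a request target such as '/a/b.css?x=1' into its path and query string.
function parseURL(url) {
  const queryStart = url.indexOf('?');
  if (queryStart === -1) return { path: url, query: '' };
  return { path: url.slice(0, queryStart), query: url.slice(queryStart + 1) };
}

// A deliberately small extension-to-type table.
const MIME_TYPES = {
  html: 'text/html',
  css: 'text/css',
  js: 'text/javascript',
  json: 'application/json',
  png: 'image/png',
  txt: 'text/plain',
};

// Directory-style paths are served as index.html, so report them as HTML.
function mimeType(path) {
  if (!path || path.endsWith('/')) return MIME_TYPES.html;
  const dot = path.lastIndexOf('.');
  const ext = dot === -1 ? '' : path.slice(dot + 1).toLowerCase();
  return MIME_TYPES[ext] || 'application/octet-stream';
}
```

Giving the error classes a numeric code is what lets the catch block in the connection handler fall back to 500 only for errors that did not originate from these helpers.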

Benchmark

I think I've covered almost everything I've implemented so far. So now, let's talk about performance.

Here's the current benchmark for the above HTTP server, with 100 concurrent connections and a total of 10,000 requests:

This is ApacheBench, Version 2.3 <$Revision: 1796539 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient).....done


Server Software:        nexus.js/0.1.1
Server Hostname:        localhost
Server Port:            3000

Document Path:          /
Document Length:        8673 bytes

Concurrency Level:      100
Time taken for tests:   9.991 seconds
Complete requests:      10000
Failed requests:        0
Total transferred:      87880000 bytes
HTML transferred:       86730000 bytes
Requests per second:    1000.94 [#/sec] (mean)
Time per request:       99.906 [ms] (mean)
Time per request:       0.999 [ms] (mean, across all concurrent requests)
Transfer rate:          8590.14 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:     6   99  36.6     84     464
Waiting:        5   99  36.4     84     463
Total:          6  100  36.6     84     464

Percentage of the requests served within a certain time (ms)
  50%     84
  66%     97
  75%    105
  80%    112
  90%    134
  95%    188
  98%    233
  99%    238
 100%    464 (longest request)

That's 1,000 requests per second! On an old i7 that's running the benchmark software, an IDE consuming 5GB of RAM, and the server itself, all at the same time!

voodooattack@voodooattack:~$ cat /proc/cpuinfo 
processor   : 0
vendor_id   : GenuineIntel
cpu family  : 6
model       : 60
model name  : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz
stepping    : 3
microcode   : 0x22
cpu MHz     : 3392.093
cache size  : 8192 KB
physical id : 0
siblings    : 8
core id     : 0
cpu cores   : 4
apicid      : 0
initial apicid  : 0
fpu     : yes
fpu_exception   : yes
cpuid level : 13
wp      : yes
flags       : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts
bugs        :
bogomips    : 6784.18
clflush size    : 64
cache_alignment : 64
address sizes   : 39 bits physical, 48 bits virtual
power management:
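
For readers who want to see where the headline numbers in the ApacheBench report come from, the sketch below recomputes them from the raw counts. The input figures are copied from the report above; summarize is a hypothetical helper name. The results differ slightly from the printed values, presumably because ApacheBench rounds the elapsed time to three decimals for display.

```javascript
// Figures copied from the ApacheBench report above.
const report = {
  concurrency: 100,
  seconds: 9.991,
  completeRequests: 10000,
  totalBytes: 87880000,
};

function summarize({ concurrency, seconds, completeRequests, totalBytes }) {
  return {
    // Throughput: completed requests divided by wall-clock time.
    requestsPerSecond: completeRequests / seconds,
    // Mean time per request as seen by one of the concurrent clients.
    msPerRequest: (concurrency * seconds * 1000) / completeRequests,
    // Mean time per request across all concurrent clients.
    msPerRequestOverall: (seconds * 1000) / completeRequests,
    // Transfer rate in Kbytes (1024 bytes) per second.
    kbytesPerSecond: totalBytes / 1024 / seconds,
  };
}

console.log(summarize(report));
```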

And here are the results in graph form:

[Figure: Benchmark results]

I tried testing with 1,000 concurrent requests, but ApacheBench timed out with that many sockets open. I then tried httperf, and here are the results:

voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000
httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1
httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE
Maximum connect burst length: 262

Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s

Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections)
Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1
Connection time [ms]: connect 207.3
Connection length [replies/conn]: 1.000

Request rate: 975.1 req/s (1.0 ms/req)
Request size [B]: 62.0

Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples)
Reply time [ms]: response 129.5 transfer 1.1
Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0)
Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0

CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%)
Net I/O: 8389.9 KB/s (68.7*10^6 bps)

Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0
Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0

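As you can see, it still works, although 221 of the 10,000 connection attempts failed under the stress. httperf reports all of them as fd-unavail errors (the benchmark client running out of file descriptors) and none as timeouts. I'm still working on figuring out what's causing this issue.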

The source code for the project is available on GitHub; feel free to check it out.

Happy hacking, and until next time!

Edit:

Here is a pre-alpha release in case you wish to do your own benchmarking. Please note that it has only been tested on two machines, both running Ubuntu 17.10:

https://github.com/voodooattack/nexusjs/releases/tag/4dd3419

Discussion

Anton Alexandrenok
let count = 0;

acceptor.on('connection', (socket, endpoint) => {
  const connId = count++;

How does it work? If event listeners run on parallel threads, there will be a race condition in the increment without synchronization.

Abdullah Ali Author

Hi! Concurrent access to variables does not corrupt them or cause a race condition. All variables behave atomically in Nexus.

Read more here: nexusjs.com/architecture/

Anton Alexandrenok

So, in this particular example

When two different execution contexts that share a closure variable are entered in parallel, one will acquire a lock

means that a listener will lock the parent closure from the start of the function until its end? Or will it lock only from the time the identifier count is resolved until its value has been read?

Abdullah Ali Author

It will only lock upon access to the variable and then it will release the lock as soon as it is done.

Abdullah Ali Author

You’re welcome :)

Brandon Ros

Post a graph comparing a similar node.js application (and maybe even Java/Haskell/whatever people compare these days) and show how this handles concurrent requests in a more performant fashion?

Abdullah Ali Author

That’s all planned. :)

I’m just waiting till Nexus is a little more mature before making big comparisons.

Joe Pea

A simple test comparing simple things like many setTimeout operations compared to Node.js would be nice, before the more complicated tests.

jtenner

Is there a place for chat? Like Discord or Slack, where people can talk about it?

Sreejith

A few newbie questions.
Where can I use this? Inside a Node server? Or can I use it on the client side?
From the docs it says multi-threaded JavaScript run-time, so is it like V8?
Can we make use of WebAssembly and create a library to support multi-threading much more effectively?

Abdullah Ali Author

Hi! You can only use Nexus on the server right now. It's a stand-alone environment and doesn't require a Node.js installation.

It's not based on V8, it's based on JSC.

I just introduced WebAssembly support to the repo. So yes, you can use multi-threading with WebAssembly now. Although I'm still ironing out the details.

Vaibhav Bansal

This sounds amazing. Can you do an article comparing it with napajs? That would be great.

Abdullah Ali Author

I certainly plan to do that at some point, once I get the code to a more stable state. :)

Joe Pea

This is a nice project! Any news on this?

leecen

You can support CommonJS, like the Node module source code.

Abdullah Ali Author

Even Node itself is moving away from CommonJS, so why would I want to repeat its mistakes?