DEV Community

Jeff Riggle

How deep does the callback go?

Building a scheduler

JavaScript runs on a single thread, yet concurrency is a first-class citizen in the language. That means I can offload work with a small scheduler of my own, which queues tasks and hands them to the JavaScript event loop one at a time.

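class Scheduler {
  constructor() {
    // Pending tasks, run in first-in, first-out order.
    this.tasks = [];
    // Handle for the pending timer, or null when the queue is idle.
    this.activeTimeout = null;
  }

  defer(task) {
    this.tasks.push(task);

    // A timer is already scheduled; it will pick this task up.
    if (this.activeTimeout) return;

    this.activeTimeout = setTimeout(() => this.runNextTask(), 0);
  }

  runNextTask() {
    const task = this.tasks.shift();
    try {
      task();
    } catch (e) {
      // Swallow the exception so one failing task does not stall the queue.
    }

    // Yield back to the event loop between tasks so other work can run.
    if (this.tasks.length > 0) {
      this.activeTimeout = setTimeout(() => this.runNextTask(), 0);
    } else {
      this.activeTimeout = null;
    }
  }
}

module.exports = { Scheduler };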

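This scheduler does not give me parallelism: every task still runs on the same thread. What it does give me is deferred execution. Each task runs in its own turn of the event loop, so the code that called defer() finishes first, and other pending events get a chance to run between tasks.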
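To make those queue semantics concrete, here is a minimal sketch of the same idea in C, the language the rest of this post drops into. It is not a port of the Scheduler class: there are no timers, and a plain while loop stands in for the event loop's turns. The names (task_queue, defer_task, run_next_task, run_demo) are illustrative. Tasks run in first-in, first-out order, and a task that defers another task only appends it to the back of the queue.

```c
#include <assert.h>
#include <stddef.h>

/* A task is a function pointer plus a context pointer for its arguments. */
typedef void (*task_fn)(void *ctx);

#define MAX_TASKS 64

/* Fixed-size ring buffer of pending tasks (illustrative, not from the post). */
typedef struct {
  task_fn fns[MAX_TASKS];
  void *ctxs[MAX_TASKS];
  size_t head;  /* index of the next task to run */
  size_t count; /* number of queued tasks */
} task_queue;

void queue_init(task_queue *q) {
  q->head = 0;
  q->count = 0;
}

/* Like Scheduler.defer: append to the tail and return immediately. */
void defer_task(task_queue *q, task_fn fn, void *ctx) {
  assert(q->count < MAX_TASKS); /* fixed capacity keeps the sketch short */
  size_t tail = (q->head + q->count) % MAX_TASKS;
  q->fns[tail] = fn;
  q->ctxs[tail] = ctx;
  q->count++;
}

/* Like Scheduler.runNextTask: pop the head and run it.
 * Returns 0 when there was nothing to run. */
int run_next_task(task_queue *q) {
  if (q->count == 0) return 0;
  task_fn fn = q->fns[q->head];
  void *ctx = q->ctxs[q->head];
  q->head = (q->head + 1) % MAX_TASKS;
  q->count--;
  fn(ctx); /* a task may call defer_task() again; that only appends */
  return 1;
}

/* Demo: record the order in which tasks run. */
static task_queue demo_q;
static char run_log[16];
static size_t run_len = 0;

static void log_task(void *ctx) {
  run_log[run_len++] = *(const char *)ctx;
}

/* Logs itself, then defers another task from inside the running task. */
static void task_with_nested(void *ctx) {
  static char nested = 'N';
  log_task(ctx);
  defer_task(&demo_q, log_task, &nested);
}

const char *run_demo(void) {
  static char a = 'A', b = 'B', c = 'C';
  queue_init(&demo_q);
  run_len = 0;
  defer_task(&demo_q, task_with_nested, &a);
  defer_task(&demo_q, log_task, &b);
  defer_task(&demo_q, log_task, &c);
  while (run_next_task(&demo_q)) {
    /* one task per iteration, much like one setTimeout callback per task */
  }
  run_log[run_len] = '\0';
  return run_log;
}
```

Calling run_demo() returns "ABCN": the task deferred from inside A waits behind B and C, which were already queued, and nothing ever runs recursively.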

Putting it to "good" use

Now, a scheduler without a purpose isn’t particularly interesting, so let’s try it out in a toy HTTP server. Why not build a simple HTTP server that transmits dynamic HTML and gets XSS comically wrong?

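const express = require('express');
const fs = require('fs');
// Assumes the Scheduler class above is saved as scheduler.js.
const { Scheduler } = require('./scheduler');

const app = express();
// Parse URL-encoded form posts so req.body.fscript is populated.
app.use(express.urlencoded({ extended: false }));
const scheduler = new Scheduler();

// buildStats(fileName, script) is not shown; it is assumed to return an
// object whose `evil` field counts uses of alert() and eval().

app.get('/', (req, res) => {
  res.send('<html><body><form action="file" method="post"><label for="fscript">Create a script</label><input type="text" name="fscript" required /><input type="submit" value="send"/></form></body></html>');
});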

app.post('/file', (req, res) => {
  // Read script off of POST body.
  const script = req.body.fscript;
  const fileName = `./submission-${Date.now()}.js`;
  scheduler.defer(() => {
    // Store script on file system
    fs.writeFile(fileName, script, 'utf8', err => {
      if (err) {
        console.error('Failed to write file', err);
      }
    });
  });
  scheduler.defer(() => {
    const stats = buildStats(fileName, script);

    // After parsing file store statistics on file system
    fs.writeFile(fileName + '.stats', JSON.stringify(stats), 'utf8', err => {
      if (err) {
        console.error('Failed to write stats file', err);
      }
    });

    // "alert()" and "eval()" are clearly the only way we can
    // have a problem here, let's remove that.
    if (stats.evil > 0) {
      scheduler.defer(() => {
        fs.unlink(fileName, err => {
          if (err) {
            console.error('Failed to delete file', err);
          }
        });
      });
    }
  });
  // Send the input back as a script to a new page
  // about:blank is too much of a hassle
  res.send(`<html><script>${script}</script></html>`);
});

app.listen(3000, () => console.log('app started'));

This server takes user input as JavaScript and echoes it straight back inside a script tag on a new page. In the background, it saves the script to disk, records some statistics about it, and deletes the script only if it is flagged as “malicious.”

However, let’s take a closer look at that. Did we just put a scheduler in a scheduler?

scheduler.defer(() => {
  const stats = buildStats(fileName, script);

  fs.writeFile(fileName + '.stats', JSON.stringify(stats), 'utf8', err => {
    if (err) {
      console.error('Failed to write stats file', err);
    }
  });

  if (stats.evil > 0) {
    scheduler.defer(() => {
      fs.unlink(fileName, err => {
        if (err) {
          console.error('Failed to delete file', err);
        }
      });
    });
  }
});

No, not this!

scheduler.defer(() => {
    fs.unlink(fileName, err => {
        if (err) {
            console.error('Failed to delete file', err);
        }
    });
});

Let's take a closer look at the callbacks passed to fs.writeFile and fs.unlink.

fs.writeFile(fileName + '.stats', JSON.stringify(stats), 'utf8', err => {
    if (err) {
        console.error('Failed to write stats file', err);
    }
});

What is libuv?

Node.js has a scheduler of its own, hidden behind callback functions, promises, and async/await. This scheduler manages access to the file system, the network interface, and various other OS features.

Under the hood, Node.js is built on a C library called libuv, which lets C and C++ programs run an event loop. If we are willing to own more complexity, we can use the same library Node.js relies on internally to build our own HTTP server. To do that, though, we have to drop down into C.

First, we use libuv to set up a TCP server.

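#include <stdio.h>
#include <uv.h>

// Called by libuv for each incoming connection (not shown in this post).
void on_new_connection(uv_stream_t *server, int status);

int main() {
    uv_loop_t *loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    struct sockaddr_in addr;
    uv_ip4_addr("0.0.0.0", 3000, &addr);

    int r = uv_tcp_bind(&server, (const struct sockaddr *)&addr, 0);
    if (r) {
        fprintf(stderr, "Error while binding: %s\n", uv_strerror(r));
        return 1;
    }

    // Queue up to 128 pending connections; on_new_connection runs for each one.
    r = uv_listen((uv_stream_t *)&server, 128, on_new_connection);
    if (r) {
        fprintf(stderr, "Error while listening: %s\n", uv_strerror(r));
        return 1;
    }

    printf("Started listening...\n");

    // Hand control to the event loop; this returns only when no work remains.
    return uv_run(loop, UV_RUN_DEFAULT);
}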

It can be easy to forget that HTTP/1.1 is just structured text sent over TCP, probably because the layers of abstraction are so baked in at this point. That means we can read incoming requests off a TCP stream, parse the HTTP request ourselves, and send back the correct response.
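In HTTP/1.1, the first line of a request (the request line) is plain text: a method, a path, and a protocol version separated by single spaces and ending in CRLF, for example "POST /file HTTP/1.1". The post does not show get_header_details or HttpHeaderDetails, so the sketch below uses hypothetical stand-ins (parse_request_line, request_line_t, is_post_file). It only handles the request line, not the headers that follow it.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the post's HttpHeaderDetails. */
typedef struct {
  char method[8];
  char path[256];
} request_line_t;

/* Parse "METHOD PATH HTTP/1.x\r\n" from the start of raw.
 * Returns 0 on success, -1 if the line is incomplete or malformed. */
int parse_request_line(const char *raw, size_t len, request_line_t *out) {
  assert(raw != NULL && out != NULL);

  /* Only look at the first line; headers follow it. */
  const char *eol = memchr(raw, '\n', len);
  if (eol == NULL) return -1; /* request line not fully received yet */
  size_t line_len = (size_t)(eol - raw);

  const char *sp1 = memchr(raw, ' ', line_len);
  if (sp1 == NULL) return -1;
  size_t method_len = (size_t)(sp1 - raw);

  const char *path = sp1 + 1;
  const char *sp2 = memchr(path, ' ', (size_t)(eol - path));
  if (sp2 == NULL) return -1;
  size_t path_len = (size_t)(sp2 - path);

  /* Reject empty fields and anything too large for the fixed buffers. */
  if (method_len == 0 || method_len >= sizeof out->method) return -1;
  if (path_len == 0 || path_len >= sizeof out->path) return -1;

  /* The third field must be an HTTP/1.x version. */
  if ((size_t)(eol - sp2 - 1) < 8 || strncmp(sp2 + 1, "HTTP/1.", 7) != 0) return -1;

  memcpy(out->method, raw, method_len);
  out->method[method_len] = '\0';
  memcpy(out->path, path, path_len);
  out->path[path_len] = '\0';
  return 0;
}

/* Route check in the spirit of the post's is_post_file_request(). */
int is_post_file(const request_line_t *req) {
  return strcmp(req->method, "POST") == 0 && strcmp(req->path, "/file") == 0;
}
```

Routing then becomes a string comparison on the parsed fields, which is all the "router" in this hand-rolled server needs to do.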

void on_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  if (nread > 0) {
    // Parse the HTTP string.
    HttpHeaderDetails details = get_header_details(buf);
    // Determine "route" and handle response.
    if (is_home_page_request(details)) {
      handle_home_request(client);
    } else if (is_post_file_request(details)) {
      handle_post_file_request(client, buf);
    } else {
      fprintf(stderr, "Invalid method %s or path %s\n", details.method, details.path);
      free(buf->base);
      return;
    }
  }

  // Omitted edge cases: nread < 0 means UV_EOF or a read error (close the
  // client with uv_close), and nread == 0 just means nothing was read this time.
}

As before, our POST handler writes the submitted script to a file, this time through libuv's asynchronous file system API.

void handle_post_file_request(uv_stream_t *client, const uv_buf_t *buf) {
  ScriptData script = extract_script(buf);
  char* res = generate_post_response(script);
  uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
  uv_buf_t response_buf = uv_buf_init(res, strlen(res));

  // Respond on TCP connection. `res` must stay alive until on_write_end runs.
  uv_write(write_req, client, &response_buf, 1, on_write_end);

  uv_fs_t* open_req = malloc(sizeof(uv_fs_t));
  // Copy the script to the heap: `script` lives on this stack frame, which is
  // gone by the time on_file_open runs, so &script would dangle.
  ScriptData *script_copy = malloc(sizeof(ScriptData));
  *script_copy = script;
  open_req->data = script_copy;

  char file_name[256];
  snprintf(file_name, sizeof(file_name), "./submission-%lld.js", current_time_ms());
  // Queue up writing data to file system.
  uv_fs_open(uv_default_loop(), open_req, file_name, O_CREAT | O_WRONLY, 0666, on_file_open);
}

But what about system libraries?

Now we may feel compelled to stop here, but where is the fun in that? We must go deeper! You see, libuv is a scheduler that provides a clean abstraction over the kernel.

Part of what libuv brings to the table is keeping blocking I/O off the event loop. For network I/O, it builds on event-notification APIs provided by the operating system kernel, and these differ by platform: Linux-based operating systems provide epoll, macOS provides kqueue, and Windows provides IOCP. (File system operations are a different story: libuv runs those on a small thread pool.)
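epoll and kqueue follow the same readiness model (IOCP instead reports completed operations): register interest in some descriptors, block until the kernel says at least one is ready, then handle only those. To show that shape portably, the sketch below swaps in POSIX poll(), an older interface that both Linux and macOS support. Unlike epoll and kqueue, poll() hands the full descriptor list to the kernel on every call, which is one reason the newer APIs scale better. wait_readable() is a hypothetical helper, not part of libuv.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

#define MAX_WATCHED 16

/* Wait up to timeout_ms for any of fds[0..n) to become readable.
 * Writes the ready descriptors into ready[] and returns how many there are,
 * 0 on timeout, or -1 on error. */
int wait_readable(const int *fds, int n, int *ready, int timeout_ms) {
  struct pollfd pfds[MAX_WATCHED];
  assert(n > 0 && n <= MAX_WATCHED); /* fixed capacity keeps the sketch short */

  /* Register interest: "tell me when each of these is readable". */
  for (int i = 0; i < n; i++) {
    pfds[i].fd = fds[i];
    pfds[i].events = POLLIN;
    pfds[i].revents = 0;
  }

  /* Block in the kernel until something is ready or the timeout expires. */
  int rc = poll(pfds, (nfds_t)n, timeout_ms);
  if (rc <= 0) return rc; /* 0 = timed out, -1 = error */

  /* Collect only the descriptors the kernel flagged. */
  int count = 0;
  for (int i = 0; i < n; i++) {
    if (pfds[i].revents & (POLLIN | POLLHUP)) ready[count++] = pfds[i].fd;
  }
  return count;
}
```

A server would pass its listening socket and client sockets in fds, then accept() or read() only the descriptors that come back in ready, so a single thread never blocks on an idle connection.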

Since nothing stops us from calling those kernel APIs directly, and I happen to be playing around on a Mac, I will use kqueue to build a non-blocking event loop over a TCP socket.

#include <stdio.h>
#include <sys/event.h>
#include <sys/types.h>
#include <unistd.h>

// connect_server is not shown in this post; process_event is defined below.
int connect_server(void);
void process_event(struct kevent evt, int server_fd, struct kevent* change_event, int kqueue_fd);

int main() {
  int kqueue_fd;
  struct kevent change_event, event_list[32];

  int server_fd = connect_server();
  if (server_fd == -1) {
    fprintf(stderr, "Failed to set up server, aborting\n");
    return -1;
  }
  printf("Server is listening on port 3000...\n");

  kqueue_fd = kqueue();
  if (kqueue_fd == -1) {
    fprintf(stderr, "Failed to create queue. Closing server socket!\n");
    close(server_fd);
    return -1;
  }

  // Register interest: report when the listening socket is readable,
  // which means a new connection is waiting.
  EV_SET(&change_event, server_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, NULL);
  if (kevent(kqueue_fd, &change_event, 1, NULL, 0, NULL) == -1) {
    fprintf(stderr, "Failed to register socket. Closing server socket!\n");
    close(server_fd);
    return -1;
  }

  // Start the event loop
  while (1) {
    // Block until at least one registered descriptor is ready (up to 32 per call).
    int event_count = kevent(kqueue_fd, NULL, 0, event_list, 32, NULL);
    if (event_count == -1) {
      fprintf(stderr, "Failed to wait on kqueue\n");
      break;
    }

    for (int i = 0; i < event_count; i++) {
      // Handle incoming TCP messages
      struct kevent target_event = event_list[i];
      process_event(target_event, server_fd, &change_event, kqueue_fd);
    }
  }

  close(server_fd);
  close(kqueue_fd);
  return 0;
}

Once again, we process HTTP messages the same way as in the prior versions of the server.

void process_event(struct kevent evt, int server_fd, struct kevent* change_event, int kqueue_fd) {
  int target_fd = evt.ident;
  // omitted handling of connect and disconnect

  if (evt.filter == EVFILT_READ) {
    char* buffer = malloc(1024);
    ssize_t bytes_read = read(target_fd, buffer, 1023);
    if (bytes_read > 0) {
      // Terminate the data so string helpers stop at what was actually read.
      buffer[bytes_read] = '\0';

      tcp_buf_t tcp_buffer;
      tcp_buffer.body = buffer;
      // Use the number of bytes read, not the buffer's capacity.
      tcp_buffer.len = bytes_read;

      http_header_details_t header_details = get_header_details(&tcp_buffer);
      if (is_home_page_request(header_details)) {
        handle_home_request(target_fd);
      } else if (is_post_file_request(header_details)) {
        handle_post_file_request(target_fd, &tcp_buffer);
      }
      // Anything else is an unhandled request and is ignored.

      // The handlers are assumed to be done with the buffer when they return
      // (tcp_buffer itself is a stack local), so release it on every path.
      free(buffer);
      return;
    }
    free(buffer);
    // Omitted error handling
  }
}
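process_event assumes a whole request arrives in a single read() of at most 1023 bytes. TCP delivers a byte stream, not messages, so a larger request (or an unlucky network) can be split across several reads. A minimal sketch of per-connection buffering, using hypothetical names (request_buf_t, request_buf_append, request_headers_end), keeps appending chunks until the blank line that ends the headers has arrived.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define REQ_CAP 4096

/* Hypothetical per-connection buffer for one in-flight request. */
typedef struct {
  char data[REQ_CAP + 1]; /* +1 for a terminating '\0' */
  size_t len;
} request_buf_t;

void request_buf_init(request_buf_t *rb) {
  rb->len = 0;
  rb->data[0] = '\0';
}

/* Append a chunk returned by read(). Returns -1 if the request is too large. */
int request_buf_append(request_buf_t *rb, const char *chunk, size_t n) {
  assert(rb->len <= REQ_CAP);
  if (n > REQ_CAP - rb->len) return -1;
  memcpy(rb->data + rb->len, chunk, n);
  rb->len += n;
  rb->data[rb->len] = '\0';
  return 0;
}

/* Returns the offset where the body starts, or -1 if the headers are not
 * complete yet. memcmp is used so stray '\0' bytes from a client are harmless. */
long request_headers_end(const request_buf_t *rb) {
  for (size_t i = 0; i + 4 <= rb->len; i++) {
    if (memcmp(rb->data + i, "\r\n\r\n", 4) == 0) return (long)(i + 4);
  }
  return -1;
}
```

A full server would keep one such buffer per client descriptor and, once the headers are complete, continue reading until the number of body bytes given by the Content-Length header has arrived.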

Breaking down the architecture

At this point, you must be thinking: you've got me reading C, so surely it can't go any deeper than this, right? Of course it can. The operating system you run on is, in effect, a scheduler as well. I will not give you a code example for this one, but the operating system has the difficult job of scheduling many userland applications onto the underlying resources (CPU, disk, and so on).

Figure: OS Architecture

As the diagram shows, the operating system must manage access between many userland applications and shared resources such as the CPU, the file system, and the network interface.

If you are interested in any of the code presented, you can find it here. Alternatively, if you are interested in a similar dive into event loops that focuses more on file system access, you can check out this blog post I wrote.

Wherever you are writing your scheduler, remember that many other layers of schedulers are propping up your application. Performance isn't strictly a choice between concurrency and parallelism. Failing to recognize this might lead you to credit parallelism for a performance gain that non-blocking I/O could have delivered just as well. If you are willing to look a bit deeper, you might find that the problem is in how you access data: the abstraction you are using may be built on operating system features that make parallel execution seem like the only valid option. As it turns out, with the right set of APIs, a single thread can get you further than you might think.
