DEV Community

Cover image for Building simple event-driven applications with Pub/Sub
Marcus Kohlberg for Encore

Posted on

Building simple event-driven applications with Pub/Sub

In this post, I’ll walk you through how to create an event-driven Node.js app in TypeScript. We will start with a traditional application and then take the steps needed make the services loosely coupled by making them communicate through Pub/Sub.

We will look at how run the application locally but also how to go about getting an event-driven app deployed to the cloud.

Video version:

The application

The application we will be looking at is an uptime monitoring system. We have a list of websites to monitor and a CronJob for checking every site and seeing if they are reachable or not, our application will send a notification if any of the statuses changes. The status of a newly added site will be unknown until the CronJob has checked the status of the site. This is one of the things we want to change when making our application event-driven.

uptime app

Here is the full code of the finished application: https://github.com/encoredev/examples/tree/main/ts/uptime

Architecture

Here we have two architectural digrams, on the left side is our current system and on the right is how we want it to look when we are done.

architectural digrams

In our current state, you can see that we have four services: frontend, monitor, site and slack. The filled in arrows indicate that the monitor service are calling endpoints in both the site and slack services, the monitor service has a hard dependency on those two services. We can also see that both the monitor and the site services has databases. The monitor service is the one that has the CronJob which checks the status of each site once every hour.

So, we want to introduce a site.added topic that we can publish to from the site service whenever we add a new site. We will subscribe to the site.added topic in the monitor service and then ping the website to check the status when a new site is added.

We will also want to remove the hard dependency between the monitor and the slack service by introducing a uptime-transition topic.

Benefits

Before we continue, let’s talk about why it’s a good idea to make these changes to our system.

  • By making these changes we make our services more loosely coupled, which almost always is a good thing. The monitor service does no longer need to know that the slack service even exists and the site service can remain independent of the monitor service.

  • The slack service can now be offline without affecting the monitor service. And then when it gets back online it will just read from the event queue and pick up where it left off. Are system is therefore more robust.

  • Both the monitor and slack service are now dependent on an abstraction rather than each other, this is called Dependency Inversion. This allows us to replace or add other notification channels like discord or email without needing to make changes to to the monitor service.

Pitfalls

But making a system event-driven is not always a good idea. Here are some things to watch out for:

  • It’s not an all or nothing approach! Just use Pub/Sub in places where it’s a great fit. There is no need to be dogmatic about this. Please don’t be dogmatic… WOFF!🐶

  • Simply replacing traditional API calls with events isn’t enough. There are essential concepts to understand when working with asynchronous queues, particularly Eventual Consistency and Idempotency. Take the time to learn about these ideas. Trust me, you will thank me later.

  • An event-driven system is more complex. You have more components you need to build and there are additional infrastructure requirements. Using the right tools for the job is super duper important. Debugging problems and managing your environments will be really frustrating if you don’t have the right tools.

And that is why we will be using Encore.ts to build our event-driven application. Encore.ts is an Open Source framework that is specifically designed to make it easier to build robust and type-safe distributed systems with TypeScript, exactly like the event-driven backend we’re going to build today. And it has a lot of useful built-in tools to make the development experience smoother, like a local development dashboard which we’ll look at a little later.

Now, let’s look at some code.

Adding our Pub/Sub topic

From a code perspective, a service is just another folder in your repo when working with Encore. You will most likely end up with a lot of services when building an event-driven application, so creating new services needs to be easy. This is one of the reasons why Encore is a great fit for this kind of application.

Let’s start by looking at the site service.

import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import knex from "knex";

// Site describes a monitored site.
export interface Site {
  id: number;
  url: string;
}

export interface AddParams {
  url: string;
}

// Add a new site to the list of monitored websites.
export const add = api(
  { expose: true, method: "POST", path: "/site" },
  async (params: AddParams): Promise<Site> => {
    const site = (await Sites().insert({ url: params.url }, "*"))[0];
    return site;
  },
);

// Get a site by id.
export const get = api(
  { expose: true, method: "GET", path: "/site/:id", auth: false },
  async ({ id }: { id: number }): Promise<Site> => {
    const site = await Sites().where("id", id).first();
    return site ?? Promise.reject(new Error("site not found"));
  },
);

// Delete a site by id.
export const del = api(
  { expose: true, method: "DELETE", path: "/site/:id" },
  async ({ id }: { id: number }): Promise<void> => {
    await Sites().where("id", id).delete();
  },
);

export interface ListResponse {
  sites: Site[]; // Sites is the list of monitored sites
}

// Lists the monitored websites.
export const list = api(
  { expose: true, method: "GET", path: "/site" },
  async (): Promise<ListResponse> => {
    const sites = await Sites().select();
    return { sites };
  },
);

// Define a database named 'site', using the database migrations
// in the "./migrations" folder. Encore automatically provisions,
// migrates, and connects to the database.
const SiteDB = new SQLDatabase("site", {
  migrations: "./migrations",
});

const orm = knex({
  client: "pg",
  connection: SiteDB.connectionString,
});

const Sites = () => orm<Site>("site");
Enter fullscreen mode Exit fullscreen mode

This service has a few CRUD endpoints like add, get, delete and list. We are interested in the add endpoint because we want to publish an event when a new site is added. Let’s start to make our application event-driven by adding our site.added Topic. We do this by calling the Topic class, specifying the type that will be published on this topic (in this case the Site type) and we specify the delivery guarantee.

import { Topic } from "encore.dev/pubsub";

export const SiteAddedTopic = new Topic<Site>("site.added", {
  deliveryGuarantee: "at-least-once",
});
Enter fullscreen mode Exit fullscreen mode

Now, in the add endpoint we can now call the .publish method on the SiteAddedTopic object.

export const add = api(
  { expose: true, method: "POST", path: "/site" },
  async (params: AddParams): Promise<Site> => {
    const site = (await Sites().insert({ url: params.url }, "*"))[0];
    await SiteAddedTopic.publish(site);
    return site;
  },
);
Enter fullscreen mode Exit fullscreen mode

Using Pub/Sub with Encore type-safe so you will get compile time errors if you publish to a Topic with the incorrect parameters 🤯

Our architectural diagram now looks like this:

architectural digram

We are publishing to the site.added topic but we are not yet subscribing to it from the monitor service, so let’s fix that now.

Adding the subscriber

So, let’s open the monitor service.

import { api } from "encore.dev/api";
import { SQLDatabase } from "encore.dev/storage/sqldb";
import { Site } from "../site/site";
import { ping } from "./ping";
import { site, slack } from "~encore/clients";
import { CronJob } from "encore.dev/cron"; // Check checks a single site.

// Check checks a single site.
export const check = api(
  { expose: true, method: "POST", path: "/check/:siteID" },
  async (p: { siteID: number }): Promise<{ up: boolean }> => {
    const s = await site.get({ id: p.siteID });
    return doCheck(s);
  },
);

// CheckAll checks all sites.
export const checkAll = api(
  { expose: true, method: "POST", path: "/check-all" },
  async (): Promise<void> => {
    const sites = await site.list();
    await Promise.all(sites.sites.map(doCheck));
  },
);

async function doCheck(site: Site): Promise<{ up: boolean }> {
  const { up } = await ping({ url: site.url });

  const wasUp = await getPreviousMeasurement(site.id);
  if (up !== wasUp) {
    const text = `*${site.url} is ${up ? "back up." : "down!"}*`;
    await slack.notify({ text });
  }

  await MonitorDB.exec`
      INSERT INTO checks (site_id, up, checked_at)
      VALUES (${site.id}, ${up}, NOW())
  `;
  return { up };
}

async function getPreviousMeasurement(siteID: number): Promise<boolean> {
  const row = await MonitorDB.queryRow`
      SELECT up
      FROM checks
      WHERE site_id = ${siteID}
      ORDER BY checked_at DESC
      LIMIT 1
  `;
  return row?.up ?? true;
}

const cronJob = new CronJob("check-all", {
  title: "Check all sites",
  every: "1h",
  endpoint: checkAll,
});

export const MonitorDB = new SQLDatabase("monitor", {
  migrations: "./migrations",
});
Enter fullscreen mode Exit fullscreen mode

Here we have a check endpoint that checks and updates the status of a single site. Inside the API handler we are calling the get endpoint in the site service. We import the service through the encore/clients folder and then we can call the endpoint just like calling a regular function, with complete type-safety. But the cool part is that under the hood, those function calls gets converted to actual HTTP calls, resulting in traces and logs.

The doCheck function looks like the function we want to call whenever a new site is added, so let’s add our Pub/Sub subscriber.

import { Subscription } from "encore.dev/pubsub";

const _ = new Subscription(SiteAddedTopic, "check-site", {
  handler: doCheck,
});
Enter fullscreen mode Exit fullscreen mode

To do that we call the Subscription class, passing in the the Topic we want to subscribe to, the name of the subscription and an options object. In the options object we only need to specify the handler, the function that gets called for every new event. The doCheck function accepts a Site so there is no need to do anything more than this.

Local infrastructure

So, how does this work? Encore comes with automatic local infrastructure. When you start your Encore application locally using encore run, Encore automatically spinns upp all the infrastructure your app needs on your computer, including databases and Pub/Sub. So you don’t need to deal with writing YAML, or setting up Docker Compose, and other tools like LocalStack to run your environment.

Local Development Dashboard

Encore also comes with a built-in development dashboard. When you start your Encore app, the development dashboard is available on port localhost:9400. From here you can call your endpoints, a bit like Postman. Each call to your application results in a trace that you can inspect to see the API requests, database calls, and Pub/Sub messages.

Getting local tracing out of the box and being able to easily debug your application like this is another reason why Encore is a great choice when building event-driven apps.

The local development dashboard also includes a Service Catalog with automatic API documentation. Oh, and by the way. The pretty architectural digram from earlier in this post, that is Encore Flow which is also built into the Development Dashboard. It’s an always up-to-date representation of your system that changes in real-time as you develop. 😎

Deployment

So, how do we deploy this application? Well you can build your application using encore build, and you get it as a docker image you can deploy anywhere you want. You will need to supply a runtime configuration where you can specify how the application should connect to the infrastructure, like Pub/Sub and databases. If you don’t feel like managing this stuff manually, you can use Encore's Cloud Platform which automates setting up the needed infrastructure in your cloud account on AWS or GCP, and it comes with built-in CI/CD so you just need to push to deploy. The Platform also comes with monitoring, tracing, and automatic preview environments so you can test each pull request in a dedicated temporary environment.

Running the Uptime application yourself

If you want to play around with the Uptime application yourself you can easily do so by installing Encore and then run encore app create in your terminal. Select TypeScript and then the Uptime application in the list of starter templates. You will need to have Docker desktop installed as that is needed to create databases locally. When you have the code checked out then you can also take a look at the uptime-transition topic used by the slack service.

Wrapping up

  • ⭐️ Support Encore by giving the project a star on GitHub.

  • Check out Encore’s example repo where you can find a bunch of deployable applications.

  • If you have questions or want to share your work, join the developers hangout in Encore's community on Discord.

Other related posts

Top comments (0)