Publishers & Subscribers (Pub/Sub) is a powerful building block for backend applications. It lets you build systems that communicate by broadcasting events asynchronously, in an event-driven architecture. This is a great way to decouple services for better reliability and responsiveness.
In this guide we'll compare the Pub/Sub approach to the more traditional API-only alternative. We'll also introduce Encore as a productive way of building an event-driven application without manually dealing with infrastructure.
✨ The benefits of Pub/Sub
- Pub/Sub can be used to improve app reliability by reducing the blast radius of faulty components and bottlenecks.
- Pub/Sub can be used to increase the speed of response to the user.
- Pub/Sub even helps reduce cognitive overhead for developers by inverting the dependencies between services.
How does it differ from a normal API implementation?
Let's take a look at an example API in a user registration service.
The behavior we want to implement is that upon registration, we send a welcome email to the user and create a record of the signup in our analytics system. Now let's see how we could implement this only using APIs, compared to how a Pub/Sub implementation might look.
An API-only approach
Using API calls between services, we might design a system that looks like this when the user registers:
- The user service starts a database transaction and records the user in its database.
- The user service makes a call to the email service to send a welcome email.
- The email service then calls an email provider to actually send the email.
- Upon success, the email service replies to the user service that the request was processed.
- The user service then calls the analytics service to record the signup.
- The analytics service then writes to the data warehouse to record the information.
- The analytics service then replies to the user service that the request was processed.
- The user service commits the database transaction.
- The user service can then reply to the user to say the registration was successful.
Drawbacks of the API-only approach
Notice how we have to wait for everything to complete before we can reply to the user to tell them we've registered them.
- If our email provider takes 3 seconds to send the email, we've now taken 3 seconds to respond to the user. In reality, once the user was written to the database, we could have responded instantly to confirm the registration.
- If our data warehouse is currently broken and reporting errors, our system will also report errors whenever anybody tries to sign up! Given that analytics is purely internal and doesn't impact users, why should the analytics system being down impact user signup?
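To make the two drawbacks concrete, here is a minimal sketch of the synchronous flow in plain Go. The `downstream` type, `registerSync`, `emailLatency` and `errWarehouseDown` are hypothetical names invented for this illustration; the sleeps and errors only simulate a slow email provider and a broken data warehouse.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// emailLatency and errWarehouseDown are made-up values for this sketch.
const emailLatency = 30 * time.Millisecond

var errWarehouseDown = errors.New("data warehouse unavailable")

// downstream is a hypothetical stand-in for the email or analytics service.
type downstream struct {
	latency time.Duration // how long the call takes
	err     error         // non-nil simulates a broken dependency
}

func (d downstream) call() error {
	time.Sleep(d.latency)
	return d.err
}

// registerSync mirrors the API-only flow: the caller is only answered after
// every downstream call has finished, and any failure fails the signup.
func registerSync(email, analytics downstream) (time.Duration, error) {
	start := time.Now()
	// Recording the user in the database is omitted in this sketch.
	if err := email.call(); err != nil {
		return time.Since(start), fmt.Errorf("send welcome email: %w", err)
	}
	if err := analytics.call(); err != nil {
		return time.Since(start), fmt.Errorf("record signup: %w", err)
	}
	return time.Since(start), nil
}

func main() {
	elapsed, err := registerSync(downstream{latency: emailLatency}, downstream{})
	fmt.Println("slow email delayed the reply:", elapsed >= emailLatency, "error:", err)

	_, err = registerSync(downstream{}, downstream{err: errWarehouseDown})
	fmt.Println("broken analytics failed the signup:", err)
}
```

The registration latency is the sum of every downstream call, and the registration fails whenever any one of them fails.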
Comparison: A Pub/Sub approach
A preferable solution would be if we could decouple the behaviour of emailing the user and recording our analytics, such that the user service only has to record the user in its own database and let the user know they are registered - without worrying about the downstream impacts.
Thankfully, this is exactly what Pub/Sub topics allow us to do.
In this example, this is what happens when a user registers:
- The user service starts a database transaction and records the user in its database.
- Publish a signup event to the signups topic.
- Commit the transaction and reply to the user to say the registration was successful.
At this point the user is free to continue interacting with the application and we've isolated the registration behaviour
from the rest of the application.
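The decoupling can be sketched with a small in-memory topic. This is an illustration only, not how Encore or any real broker is implemented: `Topic`, `SignupEvent`, `register` and `runDemo` are hypothetical names, and a real broker would persist events and deliver them across processes.

```go
package main

import (
	"fmt"
	"sync"
)

// SignupEvent is a hypothetical payload published when a user registers.
type SignupEvent struct {
	UserID int
	Email  string
}

// Topic is a minimal in-memory stand-in for a Pub/Sub topic.
type Topic[T any] struct {
	mu   sync.Mutex
	subs []chan T
}

// Subscribe starts a goroutine that runs handler for every event published
// afterwards. The returned function blocks until the handler has drained
// its queue, which happens once Close has been called.
func (t *Topic[T]) Subscribe(handler func(T)) (wait func()) {
	ch := make(chan T, 16)
	done := make(chan struct{})
	t.mu.Lock()
	t.subs = append(t.subs, ch)
	t.mu.Unlock()
	go func() {
		defer close(done)
		for ev := range ch {
			handler(ev)
		}
	}()
	return func() { <-done }
}

// Publish queues the event for each subscriber and returns without waiting
// for any handler to run (as long as the subscriber's buffer has room).
func (t *Topic[T]) Publish(ev T) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		ch <- ev
	}
}

// Close signals that no more events will be published.
func (t *Topic[T]) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		close(ch)
	}
}

// register records the user (omitted), publishes the event, and returns.
// It has no reference to the email or analytics services.
func register(signups *Topic[SignupEvent], id int, email string) {
	signups.Publish(SignupEvent{UserID: id, Email: email})
}

// runDemo wires up two independent subscribers and returns what each one saw.
func runDemo() (emailed, recorded []int) {
	signups := &Topic[SignupEvent]{}
	waitEmail := signups.Subscribe(func(ev SignupEvent) { emailed = append(emailed, ev.UserID) })
	waitAnalytics := signups.Subscribe(func(ev SignupEvent) { recorded = append(recorded, ev.UserID) })

	register(signups, 1, "a@example.com")
	register(signups, 2, "b@example.com")

	signups.Close()
	waitEmail()
	waitAnalytics()
	return emailed, recorded
}

func main() {
	emailed, recorded := runDemo()
	fmt.Println("email service saw users:", emailed)
	fmt.Println("analytics service saw users:", recorded)
}
```

Adding a third subscriber only requires another `Subscribe` call; `register` stays unchanged.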
In parallel, the email and analytics services will receive the signup event from the signups topic and will then perform their respective tasks. If either service returns an error, the event will automatically be backed off and retried until the service is able to process the event successfully, or reaches the maximum number of attempts and is placed into the dead-letter queue (DLQ).
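The retry behaviour described above can be sketched as a loop with exponential backoff. This is a simplified illustration, not the policy of any particular broker: `deliver`, `flakyHandler`, `errUnavailable` and `tinyDelay` are hypothetical names, and the doubling delay is an assumed backoff strategy.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical transient failure returned by a subscriber.
var errUnavailable = errors.New("service unavailable")

// tinyDelay keeps the demo fast; real systems usually back off for much longer.
const tinyDelay = time.Millisecond

// deliver calls handler until it succeeds or maxAttempts is reached, doubling
// the wait between attempts. It reports how many attempts were made and
// whether the event should be moved to the dead-letter queue.
func deliver(event string, handler func(string) error, maxAttempts int, baseDelay time.Duration) (attempts int, deadLettered bool) {
	delay := baseDelay
	for attempts = 1; attempts <= maxAttempts; attempts++ {
		if err := handler(event); err == nil {
			return attempts, false
		}
		if attempts < maxAttempts {
			time.Sleep(delay)
			delay *= 2 // exponential backoff
		}
	}
	return maxAttempts, true
}

// flakyHandler returns a handler that fails the first n calls, then succeeds.
func flakyHandler(failures int) func(string) error {
	calls := 0
	return func(string) error {
		calls++
		if calls <= failures {
			return errUnavailable
		}
		return nil
	}
}

func main() {
	// Recovers on the third attempt.
	attempts, dlq := deliver("signup:42", flakyHandler(2), 5, tinyDelay)
	fmt.Println(attempts, dlq) // 3 false

	// Never recovers within three attempts, so the event is dead-lettered.
	attempts, dlq = deliver("signup:43", flakyHandler(10), 3, tinyDelay)
	fmt.Println(attempts, dlq) // 3 true
}
```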
The benefits of this version:
- The processing time of the two other services did not impact the end user, and in fact the user service is not even aware of the email and analytics services.
- New systems that need to know about new users signing up can be added to the application without changing the user service or impacting its performance.
Building an event-driven application with Go and Pub/Sub
Now that we've shown the benefits of using Pub/Sub, it's time to build our own event-driven Go backend! We'll be using Pub/Sub of course, and we'll use Encore, a backend development platform, to automate infrastructure provisioning for us. It works both in local environments and in your own cloud on AWS/GCP.
Let's take a look at how you can build a fully type-safe event-driven backend in Go, implementing an Uptime Monitoring system as an example.
What we'll do:
- Install Encore
- Create your app from a starter branch
- Run locally to try the frontend
- Build the backend
- Deploy to Encore's free development cloud
Don't feel like building right now? Watch a video instead:
Building an event-driven backend application in Go
✨ Final result:
Demo app: Try the app
When we're done, we'll have a backend with this type-safe event-driven architecture:
In this diagram (automatically generated by Encore) you can see individual services as white boxes, and Pub/Sub topics as black boxes.
Let's go!
To make it easier to follow along, we've laid out a trail of croissants to guide your way.
Whenever you see a 🥐 it means there's something for you to do!
Install Encore
Install the Encore CLI to run your local environment:
- macOS: brew install encoredev/tap/encore
- Linux: curl -L https://encore.dev/install.sh | bash
- Windows: iwr https://encore.dev/install.ps1 | iex
Create your Encore application
🥐 Create your new app from this starter branch, which includes a ready-to-use frontend:
encore app create uptime --example=github.com/encoredev/example-app-uptime/tree/starting-point
Run your app locally
🥐 Check that your frontend works by running your app locally.
cd uptime
encore run
You should see this:
This means Encore has started your local environment and created local infrastructure for Pub/Sub and Databases.
Then visit http://localhost:4000/frontend/ to see the frontend.
The functionality won't work yet, since we haven't built the backend.
Let's do that now!
Create the monitor service
Let's start by creating the functionality to check if a website is currently up or down.
Later we'll store this result in a database so we can detect when the status changes and
send alerts.
🥐 Create a service named monitor containing a file named ping.go. With Encore, you do this by creating a Go package:
mkdir monitor
touch monitor/ping.go
🥐 Add an API endpoint named Ping that takes a URL as input and returns a response indicating whether the site is up or down.
With Encore you do this by creating a function and adding the //encore:api annotation to it.
Paste this into the ping.go file:
package monitor
import (
"context"
"net/http"
"strings"
)
// PingResponse is the response from the Ping endpoint.
type PingResponse struct {
Up bool `json:"up"`
}
// Ping pings a specific site and determines whether it's up or down right now.
//
//encore:api public path=/ping/*url
func Ping(ctx context.Context, url string) (*PingResponse, error) {
// If the url does not start with "http:" or "https:", default to "https:".
if !strings.HasPrefix(url, "http:") && !strings.HasPrefix(url, "https:") {
url = "https://" + url
}
// Make an HTTP request to check if it's up.
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return &PingResponse{Up: false}, nil
}
resp.Body.Close()
// 2xx and 3xx status codes are considered up
up := resp.StatusCode < 400
return &PingResponse{Up: up}, nil
}
🥐 Let's try it! Make sure you have Docker installed and running, then run encore run in your terminal and you should see the service start up.
🥐 Now open up the Local Dev Dashboard running at http://localhost:9400 and try calling the monitor.Ping endpoint, passing in google.com as the URL.
If you prefer to use the terminal, run curl http://localhost:4000/ping/google.com in a new terminal instead. Either way, you should see the response:
{"up": true}
You can also try with httpstat.us/400 and some-non-existing-url.com, and it should respond with {"up": false}.
(It's always a good idea to test the negative case as well.)
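The two decisions inside Ping (defaulting the scheme and mapping status codes to up/down) can also be looked at in isolation, without any network access. The sketch below restates them as pure functions; `withDefaultScheme` and `isUp` are hypothetical helper names that do not appear in the tutorial code.

```go
package main

import (
	"fmt"
	"strings"
)

// withDefaultScheme prepends "https://" unless the URL already starts
// with "http:" or "https:", matching the check at the top of Ping.
func withDefaultScheme(url string) string {
	if !strings.HasPrefix(url, "http:") && !strings.HasPrefix(url, "https:") {
		return "https://" + url
	}
	return url
}

// isUp treats any status code below 400 (2xx and 3xx) as up.
func isUp(statusCode int) bool {
	return statusCode < 400
}

func main() {
	fmt.Println(withDefaultScheme("google.com"))         // https://google.com
	fmt.Println(withDefaultScheme("http://example.org")) // http://example.org
	fmt.Println(isUp(200), isUp(301), isUp(400), isUp(503)) // true true false false
}
```

This is why httpstat.us/400 reports as down even though the server itself answered: the request succeeded, but the status code is 400 or above.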
🧪 Add a test
🥐 Let's write an automated test so we don't break this endpoint over time. Create the file monitor/ping_test.go and add this code:
package monitor
import (
"context"
"testing"
)
func TestPing(t *testing.T) {
ctx := context.Background()
tests := []struct {
URL string
Up bool
}{
{"encore.dev", true},
{"google.com", true},
// Test both with and without "https://"
{"httpbin.org/status/200", true},
{"https://httpbin.org/status/200", true},
// 4xx and 5xx should be considered down.
{"httpbin.org/status/400", false},
{"https://httpbin.org/status/500", false},
// Invalid URLs should be considered down.
{"invalid://scheme", false},
}
for _, test := range tests {
resp, err := Ping(ctx, test.URL)
if err != nil {
t.Errorf("url %s: unexpected error: %v", test.URL, err)
} else if resp.Up != test.Up {
t.Errorf("url %s: got up=%v, want %v", test.URL, resp.Up, test.Up)
}
}
}
🥐 Run encore test ./... to check that it all works as expected. You should see something like this:
$ encore test ./...
9:38AM INF starting request endpoint=Ping service=monitor test=TestPing
9:38AM INF request completed code=ok duration=71.861792 endpoint=Ping http_code=200 service=monitor test=TestPing
[... lots more lines ...]
PASS
ok encore.app/monitor 1.660
It works. Well done!
Create the site service
Next, we want to keep track of a list of websites to monitor.
Since most of these APIs will be simple CRUD (Create/Read/Update/Delete) endpoints, let's build this service using GORM, an ORM library that makes building CRUD endpoints really simple.
🥐 Create a new service named site with a SQL database. To do so, create a new directory named site in the application root, with a migrations folder inside it:
mkdir site
mkdir site/migrations
🥐 Add a database migration file inside that folder, named 1_create_tables.up.sql. The file name is important (it must look something like 1_<name>.up.sql) as Encore uses the file name to automatically run migrations.
Add the following contents:
CREATE TABLE sites (
id BIGSERIAL PRIMARY KEY,
url TEXT NOT NULL
);
🥐 Next, install the GORM library and PostgreSQL driver:
go get -u gorm.io/gorm gorm.io/driver/postgres
Now let's create the site service itself. To do this we'll use Encore's support for dependency injection to inject the GORM database connection.
🥐 Create site/service.go and add this code:
package site
import (
"encore.dev/storage/sqldb"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
//encore:service
type Service struct {
db *gorm.DB
}
var siteDB = sqldb.Named("site").Stdlib()
// initService initializes the site service.
// It is automatically called by Encore on service startup.
func initService() (*Service, error) {
db, err := gorm.Open(postgres.New(postgres.Config{
Conn: siteDB,
}))
if err != nil {
return nil, err
}
return &Service{db: db}, nil
}
🥐 With that, we're now ready to create our CRUD endpoints.
Create the following files:
site/get.go:
package site
import "context"
// Site describes a monitored site.
type Site struct {
// ID is a unique ID for the site.
ID int `json:"id"`
// URL is the site's URL.
URL string `json:"url"`
}
// Get gets a site by id.
//
//encore:api public method=GET path=/site/:siteID
func (s *Service) Get(ctx context.Context, siteID int) (*Site, error) {
var site Site
if err := s.db.Where("id = ?", siteID).First(&site).Error; err != nil {
return nil, err
}
return &site, nil
}
site/add.go:
package site
import "context"
// AddParams are the parameters for adding a site to be monitored.
type AddParams struct {
// URL is the URL of the site. If it doesn't contain a scheme
// (like "http:" or "https:") it defaults to "https:".
URL string `json:"url"`
}
// Add adds a new site to the list of monitored websites.
//
//encore:api public method=POST path=/site
func (s *Service) Add(ctx context.Context, p *AddParams) (*Site, error) {
site := &Site{URL: p.URL}
if err := s.db.Create(site).Error; err != nil {
return nil, err
}
return site, nil
}
site/list.go:
package site
import "context"
type ListResponse struct {
// Sites is the list of monitored sites.
Sites []*Site `json:"sites"`
}
// List lists the monitored websites.
//
//encore:api public method=GET path=/site
func (s *Service) List(ctx context.Context) (*ListResponse, error) {
var sites []*Site
if err := s.db.Find(&sites).Error; err != nil {
return nil, err
}
return &ListResponse{Sites: sites}, nil
}
site/delete.go:
package site
import "context"
// Delete deletes a site by id.
//
//encore:api public method=DELETE path=/site/:siteID
func (s *Service) Delete(ctx context.Context, siteID int) error {
return s.db.Delete(&Site{ID: siteID}).Error
}
🥐 Restart encore run to cause the site database to be created, and then call the site.Add endpoint:
curl -X POST 'http://localhost:4000/site' -d '{"url": "https://encore.dev"}'
{
"id": 1,
"url": "https://encore.dev"
}
Record uptime checks
In order to notify when a website goes down or comes back up, we need to track the previous state it was in.
To do so, let's add a database to the monitor service as well.
🥐 Create the directory monitor/migrations and the file monitor/migrations/1_create_tables.up.sql:
CREATE TABLE checks (
id BIGSERIAL PRIMARY KEY,
site_id BIGINT NOT NULL,
up BOOLEAN NOT NULL,
checked_at TIMESTAMP WITH TIME ZONE NOT NULL
);
We'll insert a database row every time we check if a site is up.
🥐 Add a new endpoint Check to the monitor service that takes in a Site ID, pings the site, and inserts a database row in the checks table.
For this service we'll use Encore's sqldb package instead of GORM (in order to showcase both approaches).
Add this to monitor/check.go:
package monitor
import (
"context"
"encore.app/site"
"encore.dev/storage/sqldb"
)
// Check checks a single site.
//
//encore:api public method=POST path=/check/:siteID
func Check(ctx context.Context, siteID int) error {
site, err := site.Get(ctx, siteID)
if err != nil {
return err
}
result, err := Ping(ctx, site.URL)
if err != nil {
return err
}
_, err = sqldb.Exec(ctx, `
INSERT INTO checks (site_id, up, checked_at)
VALUES ($1, $2, NOW())
`, site.ID, result.Up)
return err
}
🥐 Restart encore run to cause the monitor database to be created, and then call the new monitor.Check endpoint:
curl -X POST 'http://localhost:4000/check/1'
🥐 Inspect the database to make sure everything worked:
encore db shell monitor
You should see this:
psql (14.4, server 14.2)
Type "help" for help.
monitor=> SELECT * FROM checks;
id | site_id | up | checked_at
----+---------+----+-------------------------------
1 | 1 | t | 2022-10-21 09:58:30.674265+00
If that's what you see, everything's working great!
⏰ Add a cron job to check all sites
We now want to regularly check all the tracked sites so we can
immediately respond in case any of them go down.
We'll create a new CheckAll API endpoint in the monitor service that will list all the tracked sites and check all of them.
🥐 Let's extract some of the functionality we wrote for the Check endpoint into a separate function.
In monitor/check.go it should look like so:
// Check checks a single site.
//
//encore:api public method=POST path=/check/:siteID
func Check(ctx context.Context, siteID int) error {
site, err := site.Get(ctx, siteID)
if err != nil {
return err
}
return check(ctx, site)
}
func check(ctx context.Context, site *site.Site) error {
result, err := Ping(ctx, site.URL)
if err != nil {
return err
}
_, err = sqldb.Exec(ctx, `
INSERT INTO checks (site_id, up, checked_at)
VALUES ($1, $2, NOW())
`, site.ID, result.Up)
return err
}
Now we're ready to create our new CheckAll endpoint.
🥐 Create the new CheckAll endpoint inside monitor/check.go:
import "golang.org/x/sync/errgroup"
// CheckAll checks all sites.
//
//encore:api public method=POST path=/checkall
func CheckAll(ctx context.Context) error {
// Get all the tracked sites.
resp, err := site.List(ctx)
if err != nil {
return err
}
// Check up to 8 sites concurrently.
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(8)
for _, site := range resp.Sites {
site := site // capture for closure
g.Go(func() error {
return check(ctx, site)
})
}
return g.Wait()
}
This uses an errgroup to check up to 8 sites concurrently, aborting early if we encounter any error. (Note that a website being down is not treated as an error.)
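To show what the errgroup is doing for us, here is a rough standard-library approximation of the same pattern: a buffered channel as a counting semaphore, a WaitGroup, and a context that is cancelled on the first error. This is a sketch for illustration, not errgroup's actual implementation; `checkAll`, `maxInFlight`, `firstFailure` and `errBoom` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errBoom is a hypothetical failure used by the demo.
var errBoom = errors.New("boom")

// checkAll runs check for every site with at most limit checks in flight
// (limit must be at least 1). The first error cancels the shared context
// and is returned once all started checks have finished.
func checkAll(ctx context.Context, sites []string, limit int, check func(context.Context, string) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
		sem      = make(chan struct{}, limit) // counting semaphore
	)
	for _, site := range sites {
		sem <- struct{}{} // blocks while limit checks are already running
		wg.Add(1)
		go func(site string) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := check(ctx, site); err != nil {
				once.Do(func() {
					firstErr = err
					cancel()
				})
			}
		}(site)
	}
	wg.Wait()
	return firstErr
}

// maxInFlight runs n fake checks and reports the highest concurrency observed.
func maxInFlight(n, limit int) int {
	var (
		mu            sync.Mutex
		current, peak int
	)
	_ = checkAll(context.Background(), make([]string, n), limit, func(context.Context, string) error {
		mu.Lock()
		current++
		if current > peak {
			peak = current
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // simulate a slow site
		mu.Lock()
		current--
		mu.Unlock()
		return nil
	})
	return peak
}

// firstFailure shows that an error from any single check is returned.
func firstFailure() error {
	return checkAll(context.Background(), []string{"a", "b", "c"}, 2, func(_ context.Context, site string) error {
		if site == "b" {
			return errBoom
		}
		return nil
	})
}

func main() {
	fmt.Println("peak concurrency within limit:", maxInFlight(20, 8) <= 8)
	fmt.Println("first error:", firstFailure())
}
```

As in the errgroup version, "aborting early" here means that the shared context is cancelled, so checks that honour the context (such as an HTTP request created with it) stop promptly.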
🥐 Run go get golang.org/x/sync/errgroup to install that dependency.
🥐 Now that we have a CheckAll endpoint, define a cron job to automatically call it every 5 minutes.
Add this to monitor/check.go:
import "encore.dev/cron"
// Check all tracked sites every 5 minutes.
var _ = cron.NewJob("check-all", cron.JobConfig{
Title: "Check all sites",
Endpoint: CheckAll,
Every: 5 * cron.Minute,
})
Note: For ease of development, cron jobs are not triggered when running the application locally, but work when deploying the application to your cloud.
Deploy to Encore's free development cloud
To try out your uptime monitor for real, let's deploy it to Encore's development cloud.
Encore comes with built-in CI/CD, and the deployment process is as simple as a git push encore.
(You can also integrate with GitHub to activate per Pull Request Preview Environments. Learn more in the CI/CD docs.)
🥐 Deploy your app by running:
git add -A .
git commit -m 'Initial commit'
git push encore
Encore will now build and test your app, provision the needed infrastructure, and deploy your application to the cloud.
After triggering the deployment, you will see a URL where you can view its progress in Encore's Cloud Dashboard. It will look something like: https://app.encore.dev/$APP_ID/deploys/...
From there you can also see metrics, traces, link your app to a GitHub repo to get automatic deploys on new commits, and connect your own AWS or GCP account to use for production deployment.
🥐 When the deploy has finished, you can try out your uptime monitor by going to https://staging-$APP_ID.encr.app/frontend.
You now have an Uptime Monitor running in the cloud, well done! ✨
Publish Pub/Sub events when a site goes down
An uptime monitoring system isn't very useful if it doesn't
actually notify you when a site goes down.
To do so let's add a Pub/Sub topic
on which we'll publish a message every time a site transitions from being up to being down, or vice versa.
Type-Safe Infrastructure: Practical example
Normally, Pub/Sub mechanisms are blind to the data structures of the messages they handle. This is a common source of hard-to-catch errors that can be a nightmare to debug.
However, thanks to Encore's Infrastructure SDK, you get fully type-safe infrastructure! You can now achieve end-to-end type-safety from the moment of publishing a message, right through to delivery. This not only eliminates those annoying hard-to-debug errors but also translates to major time savings for us developers.
Now let's actually implement it!
🥐 Define the topic using Encore's Pub/Sub package in a new file, monitor/alerts.go:
package monitor
import "encore.dev/pubsub"
// TransitionEvent describes a transition of a monitored site
// from up->down or from down->up.
type TransitionEvent struct {
// Site is the monitored site in question.
Site *site.Site `json:"site"`
// Up specifies whether the site is now up or down (the new value).
Up bool `json:"up"`
}
// TransitionTopic is a pubsub topic with transition events for when a monitored site
// transitions from up->down or from down->up.
var TransitionTopic = pubsub.NewTopic[*TransitionEvent]("uptime-transition", pubsub.TopicConfig{
DeliveryGuarantee: pubsub.AtLeastOnce,
})
Now let's publish a message on the TransitionTopic if a site's up/down state differs from the previous measurement.
🥐 Create a getPreviousMeasurement function in alerts.go to report the last up/down state:
import (
"context"
"errors"
"encore.dev/storage/sqldb"
)
// getPreviousMeasurement reports whether the given site was
// up or down in the previous measurement.
func getPreviousMeasurement(ctx context.Context, siteID int) (up bool, err error) {
err = sqldb.QueryRow(ctx, `
SELECT up FROM checks
WHERE site_id = $1
ORDER BY checked_at DESC
LIMIT 1
`, siteID).Scan(&up)
if errors.Is(err, sqldb.ErrNoRows) {
// There was no previous ping; treat this as if the site was up before
return true, nil
} else if err != nil {
return false, err
}
return up, nil
}
🥐 Now add a function in alerts.go to conditionally publish a message if the up/down state differs:
import "encore.app/site"
func publishOnTransition(ctx context.Context, site *site.Site, isUp bool) error {
wasUp, err := getPreviousMeasurement(ctx, site.ID)
if err != nil {
return err
}
if isUp == wasUp {
// Nothing to do
return nil
}
_, err = TransitionTopic.Publish(ctx, &TransitionEvent{
Site: site,
Up: isUp,
})
return err
}
🥐 Finally modify the check function in check.go to call the publishOnTransition function:
func check(ctx context.Context, site *site.Site) error {
result, err := Ping(ctx, site.URL)
if err != nil {
return err
}
// Publish a Pub/Sub message if the site transitions
// from up->down or from down->up.
if err := publishOnTransition(ctx, site, result.Up); err != nil {
return err
}
_, err = sqldb.Exec(ctx, `
INSERT INTO checks (site_id, up, checked_at)
VALUES ($1, $2, NOW())
`, site.ID, result.Up)
return err
}
Now the monitoring system will publish messages on the TransitionTopic whenever a monitored site transitions from up->down or from down->up.
However, it doesn't know or care who actually listens to these messages. The truth is right now nobody does. So let's fix that by adding a Pub/Sub subscriber that posts these events to Slack.
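The decision of when to publish boils down to comparing the new measurement with the previous one, where a missing previous measurement counts as "up". A minimal sketch of that rule as a pure function follows; `transition` and `boolPtr` are hypothetical names, and the nil pointer stands in for the "no rows" case handled by getPreviousMeasurement.

```go
package main

import "fmt"

// transition reports whether an event should be published, given the
// previous measurement (nil when the site has never been checked) and the
// new one. A missing previous measurement is treated as "up".
func transition(prev *bool, isUp bool) bool {
	wasUp := true
	if prev != nil {
		wasUp = *prev
	}
	return isUp != wasUp
}

// boolPtr is a small helper for building optional values in the demo.
func boolPtr(b bool) *bool { return &b }

func main() {
	fmt.Println(transition(nil, true))             // false: first check, site is up
	fmt.Println(transition(nil, false))            // true: first check, site is down
	fmt.Println(transition(boolPtr(true), false))  // true: up -> down
	fmt.Println(transition(boolPtr(false), false)) // false: still down
	fmt.Println(transition(boolPtr(false), true))  // true: down -> up
}
```

Note the ordering in check: publishOnTransition runs before the new row is inserted into checks. If the insert came first, the "previous" measurement would always equal the new one and no event would ever be published.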
Send Slack notifications when a site goes down
🥐 Start by creating a Slack service slack/slack.go containing the following:
package slack
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
)
type NotifyParams struct {
// Text is the Slack message text to send.
Text string `json:"text"`
}
// Notify sends a Slack message to a pre-configured channel using a
// Slack Incoming Webhook (see https://api.slack.com/messaging/webhooks).
//
//encore:api private
func Notify(ctx context.Context, p *NotifyParams) error {
reqBody, err := json.Marshal(p)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", secrets.SlackWebhookURL, bytes.NewReader(reqBody))
if err != nil {
return err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("notify slack: %s: %s", resp.Status, body)
}
return nil
}
var secrets struct {
// SlackWebhookURL defines the Slack webhook URL to send
// uptime notifications to.
SlackWebhookURL string
}
🥐 Now go to a Slack community of your choice (where you have permission to create a new Incoming Webhook). If you don't have any, join the Encore Slack and ask in #help and we're happy to help out.
🥐 Once you have the Webhook URL, save it as a secret using Encore's built-in secrets manager:
encore secret set --local,dev,prod SlackWebhookURL
🥐 Test the slack.Notify endpoint by calling it via cURL:
curl 'http://localhost:4000/slack.Notify' -d '{"text": "Testing Slack webhook"}'
You should see the Testing Slack webhook message appear in the Slack channel you designated for the webhook.
🥐 It's now time to add a Pub/Sub subscriber to automatically notify Slack when a monitored site goes up or down. Add the following to slack/slack.go:
import (
"encore.dev/pubsub"
"encore.app/monitor"
)
var _ = pubsub.NewSubscription(monitor.TransitionTopic, "slack-notification", pubsub.SubscriptionConfig[*monitor.TransitionEvent]{
Handler: func(ctx context.Context, event *monitor.TransitionEvent) error {
// Compose our message.
msg := fmt.Sprintf("*%s is down!*", event.Site.URL)
if event.Up {
msg = fmt.Sprintf("*%s is back up.*", event.Site.URL)
}
// Send the Slack notification.
return Notify(ctx, &NotifyParams{Text: msg})
},
})
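To see what the subscriber ends up sending, the sketch below reproduces the message composition from the handler and the JSON body that Notify would POST to the webhook, without making any network call. `notifyParams`, `transitionMessage` and `webhookBody` are hypothetical names that mirror the tutorial code for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notifyParams mirrors NotifyParams from slack/slack.go.
type notifyParams struct {
	Text string `json:"text"`
}

// transitionMessage builds the same text as the subscription handler.
func transitionMessage(url string, up bool) string {
	if up {
		return fmt.Sprintf("*%s is back up.*", url)
	}
	return fmt.Sprintf("*%s is down!*", url)
}

// webhookBody returns the JSON document that would be sent to the webhook.
func webhookBody(url string, up bool) (string, error) {
	b, err := json.Marshal(notifyParams{Text: transitionMessage(url, up)})
	return string(b), err
}

func main() {
	body, err := webhookBody("https://encore.dev", false)
	if err != nil {
		panic(err)
	}
	fmt.Println(body) // {"text":"*https://encore.dev is down!*"}
}
```

The asterisks are Slack's markup for bold text, so the alert stands out in the channel.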
Deploy your finished Uptime Monitor
You're now ready to deploy your finished Uptime Monitor, complete with a Slack integration!
🥐 As before, deploying your app to the cloud is as simple as running:
git add -A .
git commit -m 'Add slack integration'
git push encore
You now have a fully featured, production-ready Uptime Monitoring system running in the cloud. Well done! ✨
🤯 Wrapping up: All of this came in at just over 300 lines of code
You've now built a fully functioning uptime monitoring system, accomplishing a remarkable amount with very little code:
- You've built three different services (site, monitor, and slack)
- You've added two databases (to the site and monitor services) for tracking monitored sites and the monitoring results
- You've added a cron job for automatically checking the sites every 5 minutes
- You've set up a fully type-safe Pub/Sub implementation to decouple the monitoring system from the Slack notifications
- You've added a Slack integration, using secrets to securely store the webhook URL, listening to a Pub/Sub subscription for up/down transition events
All of this in just a bit over 300 lines of code! 🤯
Great job - you're done!
Keep building with these Open Source App Templates.
If you have questions or want to share your work, join the developers hangout in Encore's community Slack.
Top comments (2)
Love the code and learned a lot with it. I have sew some showcases using encore. But not sure to develop something and getting your code blocked to only use with Encore. Is a big risk. Go is made to be clean, basic and with no dependencies.
If I have an Encore project and later need to go with another structure, how difficult is to migrate to another cloud than encore cloud?
Hey, that's a very reasonable perspective.
We've designed it to be easy to stop using Encore.
Basically 99% of the code is normal Go, and the rest is all Open Source as part of Encore's Backend SDK.
If you are using Encore's platform to deploy your app, it's still using your own cloud in AWS/GCP for production environments, so all the infra is always in your control.
If you want to stop relying on Encore entirely, you can eject your app using encore eject docker to get a standalone Docker image for your own deployment process. You can see more details here: encore.dev/docs/how-to/migrate-away