DEV Community

Eric Goldman for Sequin

Originally published at blog.sequin.io

Handling Twilio Events Part 1: PG Notify

Sequin gives developers a secondary source of truth for their data. Instead of making GET requests to fetch data or keep your local copy of data in sync, you can rely on Sequin to handle that for you in real-time.

We recently added support for Twilio. With our Twilio sync, it takes just a couple clicks to have all your messages, phone calls, and call recordings synced to a twilio schema in your database as they are sent and received.

While this second source of truth is powerful, Twilio is the most event-oriented platform we've integrated with. Often, developers want to handle inbound text messages or phone calls right away, like by having their app respond to a text. With Sequin, new text messages will be synced straight to your database. This gives you a reliable source to query from. But in contrast to a webhook, by default your own code will not be invoked when you receive a new text.

We're exploring different ways to attach events to our sync process. The simplest way is using PG Notify. In this post, we'll explore how this might work.

To do so, we'll build an SMS service where a group of people can manage a list of tasks over text message:

  1. Texting in a string of text adds it to the global list.
  2. Texting the number 1 or ls will send back the current list. Each entry is numbered, e.g.:

    1. clean rackets
    2. order more pickle balls
    3. make court reservations for saturday
    
  3. Texting del [num] will delete the entry listed at [num]

For simplicity, we're going to have just one global list in this app. But a real-world implementation would allow users to invite certain members (phone numbers) to participate in their list.

This post showcases both the Postgres code for triggering notifications as well as Elixir code to handle those notifications. If you don't know Elixir, context is given around code blocks to help you get the gist.

We won't cover every line of code or command needed to get a functioning version of this app. However, if you're following along and get stuck, feel free to reach out!

Set up the sync

We're using Twilio, which offers a free trial, to send and receive text messages. After signing up, spin up a sync to your database with Sequin by supplying your API key:

setting up Twilio on Sequin

Sequin's Twilio sync polls Twilio for changes every couple of seconds, so your database stays within a few seconds of what Twilio has recorded.

Instead of using Twilio's webhooks to subscribe to new texts, we'll use PG Notify. However, Twilio needs to call some endpoint when new texts come in, otherwise it won't handle them. It's a bit of a hack, but on the page where you configure the webhooks for your Twilio Phone Number, you can just have Twilio make a GET request to our homepage:

GET https://sequin.io

set Twilio to make a GET request to sequin.io when receiving a text message

PG Notify

The code in this section is largely adapted from the wonderful article How to use LISTEN and NOTIFY PostgreSQL commands in Elixir by Kamil Lelonek.

Sequin will sync changes in Twilio's API right to your database. In our case, text messages will be synced to the message table inside the schema twilio.

But our requirements for this app are event-oriented. We want to respond to text messages right away, as they come in.

We could have a worker that polls the message table for changes. Perhaps every tick, it polls the message table with an incrementing inserted_at cursor to see if there's anything new to handle. But for fun, and for illustrative purposes, we'll use PG Notify in this project.
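For comparison, here is a rough sketch of what that polling worker might look like. None of this is part of the app we build below: the module name SmsBot.Poller, the :fetch_fun and :handle_fun options, and the one-second default interval are all assumptions made for illustration. In a real app, fetch_fun would run a query against twilio.message filtered by inserted_at.

```elixir
defmodule SmsBot.Poller do
  # Hypothetical polling worker, shown only to contrast with PG Notify.
  use GenServer

  @default_interval 1_000

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      # fetch_fun.(cursor) must return rows newer than the cursor, oldest first.
      # A nil cursor means "nothing processed yet".
      fetch_fun: Keyword.fetch!(opts, :fetch_fun),
      # handle_fun.(row) is called once per new row
      handle_fun: Keyword.fetch!(opts, :handle_fun),
      interval: Keyword.get(opts, :interval, @default_interval),
      cursor: Keyword.get(opts, :cursor)
    }

    # Run the first poll right away
    schedule_tick(0)
    {:ok, state}
  end

  @impl true
  def handle_info(:tick, state) do
    rows = state.fetch_fun.(state.cursor)
    Enum.each(rows, state.handle_fun)

    schedule_tick(state.interval)
    {:noreply, %{state | cursor: advance(state.cursor, rows)}}
  end

  # The cursor only moves forward when new rows were seen
  defp advance(cursor, []), do: cursor
  defp advance(_cursor, rows), do: List.last(rows).inserted_at

  defp schedule_tick(delay), do: Process.send_after(self(), :tick, delay)
end
```

The trade-off is latency versus simplicity: a poller reacts at most one interval late, but it never misses a row while the app is down.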

Each Postgres notification is sent on a named channel and carries a text payload. We'll use the channel name as the event name. You can send a notification using the pg_notify() function like this:

select pg_notify('new_message', '{"body": "clean rackets", "from": "+12135550000"}');

For our application, we want to publish new notifications based on inserts into the twilio.message table. When Sequin sees a new message at Twilio's /Messages.json endpoint, it will insert that message into our database. We can use a Postgres trigger that executes a function that creates a notification whenever this happens.

We'll ignore updates to messages for now. In Twilio, the only time we'll see updates for messages is when the delivery status has changed (e.g., from sent to delivered). (In a more robust production application, we might want to monitor these events for delivery failures.)

Notify function

Here's what the function that creates the notification will look like:

create or replace function notify_new_message()
  returns trigger
  as $$
begin
  perform
    pg_notify('new_message', json_build_object('operation', tg_op, 'message', row_to_json(new))::text);
  return new;
end;
$$
language plpgsql;

Everything around the BEGIN/END block is standard syntax to declare a Postgres function. Our function is called notify_new_message(). The name of the notification is new_message. In json_build_object(), we define a JSON object with two properties: operation and message. operation is set to the special variable TG_OP, which in our case will always be 'INSERT'. Then message will contain the full message row that was just inserted (the special variable NEW inside a trigger function body).

Notifications are cast to text. But here's what they will look like as parsed JSON:

{
  "operation": "INSERT",
  "message": {
    "account_id": "ACc96f9",
    "api_version": "2010-04-01",
    "body": "pickup jersey from cleaners ",
    "date_created": "2021-09-27T15:29:34",
    "date_sent": "2021-09-27T15:29:35",
    "date_updated": "2021-09-27T15:29:35",
    "deleted": false,
    "direction": "inbound",
    "error_code": null,
    "error_message": null,
    "from": "+12135551111",
    "id": "SMbdac6",
    "messaging_service_id": null,
    "num_media": 0,
    "num_segments": 1,
    "price": null,
    "price_unit": "USD",
    "status": "received",
    "to": "+12135550000",
    "uri": "/2010-04-01/Accounts/ACc96f9d47739a/Messages/SMbdac623.json"
  }
}

The after insert trigger

Now, we just need to invoke our notify function after messages are created in the database:

create trigger new_message
after insert
on twilio.message
for each row
execute procedure notify_new_message();

Our notify function is written and it's connected to an after insert trigger on message. Now it's time to subscribe to this notification from Elixir.

The Elixir subscription

If you want the complete starting point for a brand new app that uses PG Notify, I recommend you check out this article.

Touching on the highlights: First, you need to ensure that your app subscribes to the new_message event on boot. One way to do this is to create a new GenServer that invokes Postgrex.Notifications.listen/2 inside its init.

Below is an Elixir GenServer declaration. A GenServer in Elixir is a process that can send and receive messages. In our case, our GenServer will subscribe to notifications from Postgres. We'll first see how to subscribe to these notifications, then we'll see how to handle them.

If you're not familiar with Elixir, gloss over the functions child_spec/1 and start_link/1 – these are standard for a GenServer:

defmodule SmsBot.Listener do
  use GenServer

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  def start_link(opts \\ []),
    do: GenServer.start_link(__MODULE__, opts)

  def init(opts) do
    with {:ok, _pid, _ref} <- setup_listener("new_message") do
      {:ok, opts}
    else
      error -> {:stop, error}
    end
  end

  defp setup_listener(event_name) do
    with {:ok, pid} <- Postgrex.Notifications.start_link(SmsBot.Repo.config()),
         {:ok, ref} <- Postgrex.Notifications.listen(pid, event_name) do
      {:ok, pid, ref}
    end
  end

  # Notification handling (handle_info/2) is added in the next step
end

The important part is inside init/1, which is invoked when this process is booted. It calls setup_listener/1, which tells Postgres (via the Postgrex library) that our process would like to receive notifications for all "new_message" events.

To get the connection details for Postgrex.Notifications.start_link/1, we're using the config/0 function on an Ecto repo (common in Phoenix apps).

Now, just add this new GenServer to the list of children that your Application supervises:

defp children do
  [
    SmsBot.Listener
  ]
end

If we were to boot the app, we'd now have a GenServer listening for notifications. But we still need to handle those notifications.

Postgrex.Notifications will send notifications to our GenServer as regular process messages. That means we can handle them with handle_info/2:

defmodule SmsBot.Listener do
  # Logger's macros must be required before use
  require Logger

  # ...

  @impl true
  def handle_info({:notification, _pid, _ref, "new_message", payload}, state) do
    with {:ok, data} <- Jason.decode(payload, keys: :atoms) do
      data
      |> inspect()
      |> Logger.info()

      # Return the existing state unchanged
      {:noreply, state}
    else
      error -> {:stop, error, state}
    end
  end
end

The event name is "new_message" and the payload is the stringified JSON object from our notify_new_message() function.

If we booted our server and sent an SMS in, we'd see an insert notification like the one above printed to the console.

With notifications flowing from inserts through Elixir, we're almost ready to write our app's handler code. We just need to add one more construct to our data model.

Creating list_item

The message table will contain an append-only list of all messages flowing through Twilio. For managing lists in our app, we'll want to group messages in a separate table. In the short run, this will allow users to remove messages from the list. In the long run, this is the construct we'd use to isolate lists between groups of phone numbers. We'd add a list table then group list_items under that table.

Here's the declaration for the list_item table:

create table list_item (
  id serial primary key,
  message_id text references twilio.message (id)
);

Handling inbound SMS messages

We'll use this handler to route the incoming SMS notification to one of three functions, depending on the command. We'll rewrite the body of handle_info/2 and extract the logic out to a new function handle_message/1:

defmodule SmsBot.Listener do

  # ...

  @impl true
  def handle_info({:notification, _pid, _ref, "new_message", payload}, state) do
    with {:ok, data} <- Jason.decode(payload, keys: :atoms) do
      handle_message(data[:message])

      {:noreply, state}
    else
      error -> {:stop, error, state}
    end
  end

  # Only inbound messages are commands. Bind the whole map so we can reply to the sender.
  defp handle_message(%{direction: "inbound"} = message) do
    # Normalize by trimming leading and trailing whitespace
    message_body = String.trim(message.body)

    case message_body do
      body when body in ["1", "ls"] ->
        handle_list(message)

      "del " <> num ->
        # Note: String.to_integer/1 raises if num is not a valid integer
        handle_delete(message, String.to_integer(num))

      _body ->
        handle_add(message)
    end
  end

  defp handle_message(_outbound_message) do
    :ok
  end
end

Inside handle_message/1, we first normalize the inbound message by trimming trailing and leading whitespace. Then we use a case statement to direct the message to the appropriate sub-handler:

  • If the message was a 1 or ls, then we know the user wants to list all list items so far.
  • If the message matches del [num], then they want to delete a list item.
  • Otherwise, we'll create a new list item for the message.

Note: A production app would want to do way more inbound text normalization.

Note that we only want to handle messages that have direction="inbound". Twilio will also record all of our outbound messages (replies to the users), which will in turn trigger PG Notifications. We just ignore those.
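As one example of the extra normalization a production app might want, the sketch below parses the message body into a tagged command before any handler runs. The module name SmsBot.Command and the return shapes (:list, {:delete, n}, {:add, text}, :empty) are assumptions for illustration, not part of the app above. It uses Integer.parse/1, which returns :error rather than raising on input like "del abc".

```elixir
defmodule SmsBot.Command do
  # Hypothetical helper: turns a raw SMS body into a tagged command.

  def parse(body) when is_binary(body) do
    trimmed = String.trim(body)

    # Match commands case-insensitively, but keep the original text for new items
    case String.split(String.downcase(trimmed), ~r/\s+/, parts: 2) do
      [""] ->
        :empty

      [cmd] when cmd in ["1", "ls"] ->
        :list

      ["del", rest] ->
        parse_delete(rest, trimmed)

      _other ->
        {:add, trimmed}
    end
  end

  # Integer.parse/1 returns {int, remainder} or :error instead of raising.
  # Anything that is not a positive whole number is treated as a new list item.
  defp parse_delete(rest, original) do
    case Integer.parse(rest) do
      {num, ""} when num > 0 -> {:delete, num}
      _invalid -> {:add, original}
    end
  end
end
```

Treating an invalid del as a new item is a design choice made here so that an entry like "del monte peaches" still lands on the list; replying with an error message would be an equally reasonable alternative.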

Here's what each of those handler functions might look like. Note that this assumes we've set up an Ecto schema for list items, called ListItem.

First, we define a helper function that lists all list items in the database, with a corresponding index. These indexes will be used when displaying the list, to both number the list and to allow the user to delete items:

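defp list_messages_with_indexes do
  ListItem
  |> SmsBot.Repo.all()
  |> SmsBot.Repo.preload(:message)
  |> Enum.map(fn item -> item.message.body end)
  # Enum.with_index/2 yields {element, index}; swap so the index comes first
  |> Enum.with_index(1)
  |> Enum.map(fn {body, idx} -> {idx, body} end)
end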

The result of calling this function will be a list that looks like this:

[
  {1, "clean rackets"},
  {2, "order more pickle balls"},
  {3, "make court reservations for saturday"}
]

We'll use this helper function in a couple places in our handler functions:

defmodule SmsBot.Listener do

  # ...

  defp handle_list(message) do
    messages =
      list_messages_with_indexes()
      |> Enum.map(fn {idx, msg} -> "#{idx}. #{msg}" end)
      |> Enum.join("\n")

    outgoing_body = "Here's the list:\n#{messages}"

    send_sms(message.from, outgoing_body)
  end

  defp handle_delete(message, del_idx) do
    to_delete =
      ListItem
      |> SmsBot.Repo.all()
      # Enum.with_index/2 yields {item, index} tuples
      |> Enum.with_index(1)
      |> Enum.find(fn {_item, idx} -> idx == del_idx end)

    case to_delete do
      {item, _idx} ->
        with {:ok, _li} <- SmsBot.Repo.delete(item) do
          send_sms(message.from, "Item deleted")
        end

      nil ->
        send_sms(message.from, "Error: No entry for #{del_idx}")
    end
  end

  defp handle_add(message) do
    # cast/3 needs the list of permitted fields
    changeset = Ecto.Changeset.cast(%ListItem{}, %{message_id: message.id}, [:message_id])

    with {:ok, _li} <- SmsBot.Repo.insert(changeset) do
      send_sms(message.from, "Item added")
      handle_list(message)
    else
      err ->
        # Logger expects a string, so inspect the error term first
        Logger.error(inspect(err))
        send_sms(message.from, "Something went wrong.")
    end
  end
end

Digging into how each of these functions works is beyond the scope of this tutorial. But touching on the highlights:

  • We assume the implementation of send_sms/2, which makes an API call to send a message via Twilio. This expects a phone number as the first argument and a message body as the second. Note that all messages are sent to message.from, or the sender of the message we are handling.
  • Our ordering/indexing of list items is simple. We just generate the indexes on the fly, relying on insertion order to keep the list stable.
  • The message table is a read-only table that reflects our Twilio data, kept in sync by Sequin. By creating the list_item table, we are maintaining our domain-specific data (list items) elsewhere. That gives us the freedom to, e.g., delete list items while preserving the full record of messages sent and received by Twilio.
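To make the on-the-fly indexing concrete, here is a small standalone sketch. The module SmsBot.Numbering and its functions are hypothetical and work on plain lists rather than database rows. It shows that indexes are derived from the current order each time, so deleting an item renumbers everything after it.

```elixir
defmodule SmsBot.Numbering do
  # Hypothetical helper: numbers items from 1 in their current order
  def number(items) do
    items
    |> Enum.with_index(1)
    |> Enum.map(fn {item, idx} -> {idx, item} end)
  end

  # Returns {:ok, item} for a valid 1-based index, or :error when out of range
  def fetch(items, idx) when is_integer(idx) and idx >= 1, do: Enum.fetch(items, idx - 1)
  def fetch(_items, _idx), do: :error
end
```

Because numbers are positional, a user who deletes entry 1 and then texts del 2 removes what was originally entry 3. That is acceptable for a small shared list, but it is one reason a larger app might show stable identifiers instead.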

Wrapping up

PG Notify is a great way to trigger events around your sync process. It's well-supported among programming languages, quick to set up, and doesn't require any tools outside of Postgres (such as Kafka).

Using Sequin as the backbone of our Twilio integration offers us a foundation that is both easy to build on and scalable. Our app – like many apps built with Twilio – relies on a historical record of all text messages received. Retrieving that text message history via Twilio every time we needed it would be inefficient. Having the data readily available in a database alleviates the need for polling or handling webhooks.

However, using PG Notify here has some big limitations:

  • If your app is down (no listeners), events are not buffered anywhere. After broadcast, events disappear immediately. This means even if you're down for just a few seconds, you could miss text messages that you need to respond to.
  • There are no retries. If your notification handler has a bug, or hits a temporary error (e.g., Twilio is unable to process your outgoing text), the event will be lost.

All this means that PG Notify is best used in production in collaboration with some type of database polling process. For example, in this application, we could store a cursor somewhere, last_message_processed. As we process messages, we increment the cursor. If our app restarts, we use the cursor to "catch up" on any messages we may have missed before subscribing to the notify stream for new ones.
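A minimal sketch of that cursor logic follows, under a few assumptions: the module name SmsBot.Cursor is hypothetical, the cursor is a {date_created, id} pair (the id breaks ties between messages created in the same second), and date_created is an ISO 8601 string like the one in the payload above, so plain term comparison sorts it chronologically. A real app would persist the cursor in the database and load the candidate messages with a query.

```elixir
defmodule SmsBot.Cursor do
  # Hypothetical sketch of the last_message_processed cursor.

  # The cursor value for a message: creation time first, id as tie-breaker
  def key(message), do: {message.date_created, message.id}

  # Handles a message only if it is newer than the cursor.
  # Returns the (possibly advanced) cursor.
  def process(message, cursor, handle_fun) do
    if cursor == nil or key(message) > cursor do
      handle_fun.(message)
      key(message)
    else
      cursor
    end
  end

  # Replays a batch of messages in order, skipping anything already processed
  def catch_up(messages, cursor, handle_fun) do
    messages
    |> Enum.sort_by(&key/1)
    |> Enum.reduce(cursor, &process(&1, &2, handle_fun))
  end
end
```

On boot, the app would call catch_up/3 with the rows newer than the stored cursor, then route each live notification through process/3 so that a message seen by both paths is only handled once.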

At Sequin, we're brainstorming other ways to trigger events around your sync. If there's anything you'd love to see here, let us know! Otherwise, stay tuned.
