DEV Community

Ivor

How to unsubscribe from all topics in Phoenix.PubSub

In this post I want to share a quick tip that I found very useful when relying on Phoenix.PubSub to reflect server-side changes in the UI.

Phoenix.LiveView makes it wonderfully easy to build rich UIs without needing a big client-side JavaScript framework. The server tracks the variables used to render the page, computes a diff when they change, and pushes that diff to the client, where the changes are applied.

This means that it is easy to build a page where live updates can be monitored without needing to reload a page. An example could be a dashboard to monitor the state of various servers on a network. They can all update the app via an API. Each time an update comes in we can broadcast the update to a topic for that server so that all the users viewing those servers will get the update, for example:

MyApp.PubSub
|> Phoenix.PubSub.broadcast("server-#{server_id}", %{cpu: update.cpu, mem: update.mem})

If we have a LiveView that shows the state of a server along with various stats about it, we can subscribe to the topic and update the stats as they come in.

Phoenix.PubSub.subscribe(MyApp.PubSub, "server-#{server.id}")

We could do this in the mount/3 callback of the LiveView, but mount/3 is not called when you patch. Instead, we can subscribe in the handle_params/3 callback, which is called every time the browser location changes.

To make for a snappy experience, we can add a dropdown on the page to switch to a server with a different id. By using patch instead of redirect we can swap out the server we're monitoring without having to load a new page and without the need for a new LiveView process.

The problem is that if you subscribe to the new server in the handle_params/3 callback, you are now subscribed to two servers and will get updates for both of them.
It would be great if you could unsubscribe from the previous server. In this simple case you can probably do that by referring to the server that is currently in the socket assigns. But you may not always have that luxury. In some cases you might also have subscribed to a number of other related topics, some of them conditionally, depending on the attributes of the server. A database server, for example, might get general server updates as well as database-specific updates.

A function that lists all the topics the current process is subscribed to would be very handy at this point. I could not find such a function in Phoenix.PubSub, but by looking at the source for the subscribe/3 and unsubscribe/2 functions we can see how to achieve this result.

Phoenix.PubSub uses the Registry module under the hood. The name of the PubSub instance that you start in your application's start/2 callback is also the name of the registry.

  ...
  {Phoenix.PubSub, name: MyApp.PubSub}, # this is the registry
  ...

The key for the registry entry is the topic that is being subscribed to.

Phoenix.PubSub.subscribe(pubsub, topic, opts \\ [])

is (pretty much)

Registry.register(registry, key, value)

Since the key is the topic, we can find all the topics that our process is subscribed to by calling

Registry.keys(MyApp.PubSub, self())

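which returns a list of the topics, in no particular order. Because the registry allows duplicate keys, a topic that the process subscribed to more than once appears once per subscription.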
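To try this without a Phoenix app, here is a minimal sketch that uses a plain Registry as a stand-in for the one Phoenix.PubSub starts. The name DemoRegistry is made up for this example, and registering with a nil value is an assumption about what a subscribe without options boils down to.

```elixir
# A duplicate-key Registry stands in for the registry behind Phoenix.PubSub.
# DemoRegistry is a hypothetical name used only in this sketch.
{:ok, _pid} = Registry.start_link(keys: :duplicate, name: DemoRegistry)

# Roughly what a subscribe does: register the current process under the topic.
{:ok, _} = Registry.register(DemoRegistry, "server-1", nil)
{:ok, _} = Registry.register(DemoRegistry, "server-2", nil)

# Keys come back in no particular order, so sort them before inspecting.
topics = DemoRegistry |> Registry.keys(self()) |> Enum.sort()

IO.inspect(topics)
# prints ["server-1", "server-2"]
```

Running this in iex or as a script should show both topics for the current process.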

In order to start with a fresh set of subscriptions and avoid double subscribing or mistaking updates for one server for updates to a previous one, we can unsubscribe from all these topics before subscribing to the new one:

  # Unsubscribes the current process from every topic it is subscribed to.
  defp unsubscribe_all() do
    MyApp.PubSub
    |> Registry.keys(self())
    |> Enum.each(fn topic ->
      Phoenix.PubSub.unsubscribe(MyApp.PubSub, topic)
    end)
  end

  def handle_params(%{"id" => server_id}, _url, socket) do
    # Start from a clean slate before subscribing to the newly selected server.
    unsubscribe_all()
    Phoenix.PubSub.subscribe(MyApp.PubSub, "server-#{server_id}")

    socket = assign(socket, :server, fetch_server(server_id))

    {:noreply, socket}
  end
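The effect can be checked in isolation with a plain Registry standing in for MyApp.PubSub. This is only a sketch: DemoPubSub and the subscribe, broadcast and unsubscribe_all helpers are hypothetical names for this example, and the broadcast helper mimics a local broadcast with Registry.dispatch/3 rather than calling Phoenix.PubSub.

```elixir
# DemoPubSub is a hypothetical stand-in for the registry behind MyApp.PubSub.
{:ok, _pid} = Registry.start_link(keys: :duplicate, name: DemoPubSub)

# Register the current process under a topic, much like a subscribe.
subscribe = fn topic -> {:ok, _} = Registry.register(DemoPubSub, topic, nil) end

# Mimic a local broadcast: send the message to every process registered under the topic.
broadcast = fn topic, message ->
  Registry.dispatch(DemoPubSub, topic, fn entries ->
    for {pid, _value} <- entries, do: send(pid, message)
  end)
end

# Drop every registration the current process holds.
unsubscribe_all = fn ->
  DemoPubSub
  |> Registry.keys(self())
  |> Enum.uniq()
  |> Enum.each(&Registry.unregister(DemoPubSub, &1))
end

# Subscribe to the first server and a related topic, then switch to server 2.
subscribe.("server-1")
subscribe.("server-1-db")
unsubscribe_all.()
subscribe.("server-2")

broadcast.("server-1", {:stats, 1})
broadcast.("server-2", {:stats, 2})

# Only the update for server 2 should be waiting in the mailbox.
received =
  receive do
    message -> message
  after
    100 -> :nothing
  end

remaining = Registry.keys(DemoPubSub, self())

IO.inspect({received, remaining})
# prints {{:stats, 2}, ["server-2"]}
```

The update for server 1 is never delivered because the process no longer has an entry under that topic when the broadcast happens.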

Being able to list the subscriptions a process holds seems like a useful capability. I am curious to know whether this is useful to others, or whether there are problems with dropping down to the registry to retrieve this information.
