DEV Community

Masatoshi Nishiguchi
Masatoshi Nishiguchi

Posted on • Edited on

6 3

Real-time page refresh using Phoenix LiveView and PubSub

I enjoy IoT development using Elixir programming language, Nerves IoT platform and Phoenix web framework. It is so much fun. I was able to build a real-time temperature and humidity monitoring system for my living room. My API server accepts a sensor measurement then broadcasts the update to all the users of my real-time dashboard.

Today I am going to write about real-time page refresh using Phoenix LiveView and Phoenix PubSub so that I can quickly implement the same thing in the future.

4/3(土) 00:00〜 4/5(月) 23:59開催のautoracex #21での成果です。

日本語版

hello-nerves-2

Implement PubSub utilities in a context module

First of all, I prepare utility functions for subscribing and broadcasting messages in a given context module. Here I add subscribe/0 and broadcast/2 to the Example.Environment context module. I use inspect(__MODULE__) as a topic so that I can ensure that the topic name is unique as well as saving time for decision making on the topic name and discovery of the topic name.

 defmodule Example.Environment do

   ...
+
+  @topic inspect(__MODULE__)
+
+  @doc """
+  Subscribe to this context module's messages.
+  """
+  def subscribe do
+    Phoenix.PubSub.subscribe(Example.PubSub, @topic)
+  end
+
+  @doc """
+  Broadcast a message to the subscribers when something happens.
+  """
+  def broadcast({:ok, record}, event) do
+    Phoenix.PubSub.broadcast(Example.PubSub, @topic, {event, record})
+    {:ok, record}
+  end
+
+  def broadcast({:error, _} = error, _event), do: error
+
Enter fullscreen mode Exit fullscreen mode

Broadcast a message as needed

Now that I have PubSub utility functions, I can broadcast a message when something happens. In the following example, I notify all the subscribers of a newly-inserted measurement record.

 defmodule Example.Environment do

   ...

   @doc """
   Creates a measurement.

   ## Examples

       iex> create_measurement(%{field: value})
       {:ok, %Measurement{}}

       iex> create_measurement(%{field: bad_value})
       {:error, %Ecto.Changeset{}}

   """
   def create_measurement(attrs \\ %{}) do
     %Measurement{}
     |> Measurement.changeset(attrs)
     |> Repo.insert()
+    |> broadcast(:measurement_inserted)
   end
Enter fullscreen mode Exit fullscreen mode

Subscribe to PubSub topic when LiveView is connected

In a LiveView, I subscribe to a PubSub topic using the PubSub utilities prepared above. It is important that the subscription has to be done after the LiveView connecton has been established.

 defmodule ExampleWeb.EnvironmentLive do
   use ExampleWeb, :live_view

   ...

   @impl true
   def mount(_params, _session, socket) do
+    if connected?(socket) do
+      Environment.subscribe()
+    end

     ...

     {:ok, socket, temporary_assigns: [measurements: []]}
   end
Enter fullscreen mode Exit fullscreen mode

Handle PubSub message as needed

Once a LiveView has subscribed to a PubSub topic, it will receive a message whenever a mesage is broadcast on the subscribed topic. I can handle the event, pattern-matching the event in handle_info/2. In this example, an event is a tuple like {event_name, new_record} because that is the format that my broadcast function uses.

 defmodule ExampleWeb.EnvironmentLive do
   use ExampleWeb, :live_view

   ...

+  def handle_info({:measurement_inserted, new_measurement}, socket) do
+    # TODO: do something
+  end
Enter fullscreen mode Exit fullscreen mode

Throttle incoming PubSub messages

I want to refresh the real-time dashboard based on incoming PubSub messages, but at the same time I want to control how often I refresh the dashboard no matter how fast PubSub messages are coming in. In other words, I do not want to overwhelm my LiveView rendering in case messages are coming in ridiculously fast.

So I calculate the time when I want to do the refresh next based on refresh_interval value, ignoring any messages until will_refresh_at has elapsed. I use Timex library so the time-related calculation is human-readable.

   def handle_info({:measurement_inserted, new_measurement}, socket) do
-    # TODO: do something
+    if refresh_interval_elapsed?(socket) do
+      {:noreply, assign(socket, last_measurement: new_measurement)}
+    else
+      {:noreply, socket}
+    end
   end
+
+  # Check if the refresh interval has elapsed. (next_refresh >= now)
+  defp refresh_interval_elapsed?(socket) do
+    next_refresh = DateTime.add(socket.assigns.last_measurement.measured_at, socket.assigns.refresh_interval)
+
+    case DateTime.compare(DateTime.utc_now(), next_refresh) do
+      :gt -> true
+      :eq -> true
+      _ -> false
+    end
+  end
Enter fullscreen mode Exit fullscreen mode

That's it!

Top comments (0)

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more