Event sourcing is a powerful way to structure an application, and I've discovered that it pairs beautifully with Phoenix LiveView.
The context: I'm building a cryptocurrency exchange application. I don't have the business chops to run an actual exchange, so this is just for fun. The application is built in Elixir, using the Commanded framework for CQRS/ES goodness, and Phoenix LiveView because it's the hot new thing that I wanted to learn.
My goal is to use LiveView to update a price chart as trades are executed by the system. A LiveView process is a lot like a GenServer: each client gets its own process, which executes handle_* functions as the client does things. The first step to real-time chart updates is to trigger one of these handler functions in my LiveView module when a trade is executed. I'm using Commanded's own EventStore library to dispatch and store my events, so its documentation is the place to start.
In the EventStore documentation, I found that subscribing to the event stream is really simple: it's a single function call. Here's the example from EventStore's documentation on transient subscriptions that we care about:
alias EventStore.RecordedEvent
alias MyApp.EventStore

EventStore.subscribe(stream_uuid, selector: fn
  %RecordedEvent{data: data} -> data != nil
end)

# receive first batch of mapped event data
receive do
  {:events, %RecordedEvent{} = event_data} ->
    IO.puts("Received non nil event data: " <> inspect(event_data))
end
All we need to do is call EventStore.subscribe/2, optionally with a selector function, and then the current process will start receiving events. We're going to call this in our LiveView's mount/3 callback. I'm also going to load the initial set of data here.
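defmodule ExchangeWeb.DashboardLive do
  use ExchangeWeb, :live_view

  alias EventStore.RecordedEvent
  alias Exchange.EventStore

  # The trading pair this dashboard charts.
  @symbol "BTCUSDT"

  def mount(_params, _session, socket) do
    # Subscribe only once the socket is connected: mount/3 also runs during
    # the initial HTTP render, where a subscription would be wasted.
    if connected?(socket) do
      :ok =
        EventStore.subscribe("$all",
          selector: fn %RecordedEvent{event_type: type, data: data} ->
            type == "Elixir.Exchange.Orderbook.TradeExecuted" and
              data.symbol == @symbol
          end,
          mapper: fn %RecordedEvent{data: data} -> data end
        )
    end

    trades = Exchange.Orderbooks.trades(@symbol)
    {:ok, assign(socket, trades: trades)}
  end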

  def handle_info({:events, events}, socket) do
    # Keep only the fields the chart needs; the client hook reads executed_at.
    trades = Enum.map(events, &Map.take(&1, [:executed_at, :price]))

    {:noreply, push_event(socket, "trades", %{trades: trades})}
  end
end
Now, if you were using a server-side charting library like ContEx, then you would just append the new events to what you've already got assigned to the socket, and your normal rendering function would rebuild the chart. You're done! But I wanted to make it more complicated.
I'm using Chart.js, a popular JavaScript charting library. It lives entirely on the client side, which doesn't sit well with Phoenix LiveView's server-side focus. Fortunately, LiveView lets you set up JavaScript hooks and push events to them. We can make the client event-sourced, too! That's why I'm using push_event/3 instead of assign/3 in the example above: the hook responds to events that I push from the LiveView process. Read more about LiveView JavaScript interoperability; it's really interesting.
A LiveView client hook is an object containing a few lifecycle callback functions. We're going to define a mounted() callback that initializes the chart with the data we already have and then sets up an event handler. Inside mounted(), this refers to the hook object, which provides a few utilities; handleEvent is the most important to us. We call this.handleEvent to register the function that will handle the event we pushed from our LiveView module's handle_info callback.
import Chart from 'chart.js/auto';
import {Socket} from "phoenix";
import {LiveSocket} from "phoenix_live_view";

let Hooks = {};

Hooks.TradesChart = {
  mounted() {
    let chart = new Chart(this.el, {
      // Configuration for your chart, nothing interesting here
    });

    this.handleEvent("trades", (event) => {
      event.trades.forEach((trade) => {
        // Parse the timestamp string once per trade, not once per dataset
        trade.executed_at = new Date(trade.executed_at);
        chart.data.datasets.forEach((dataset) => {
          dataset.data.push(trade);
        });
      });
      chart.update();
    });
  },
};

let liveSocket = new LiveSocket("/live", Socket, {hooks: Hooks});
liveSocket.connect();
This will push new data into the chart without asking it to completely re-render.
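If you want to exercise that append logic without a browser, one option is to pull it out of the hook into a plain function. Here's a minimal sketch of that idea. The names appendTrades and makeFakeChart are hypothetical (they aren't in my project), the fake chart only mimics the data.datasets and update() parts of a Chart.js instance, and the sample trades are made-up values.

```javascript
// Hypothetical helper, not part of the original hook: appends a batch of
// pushed trades to every dataset of a Chart.js-like chart object.
function appendTrades(chart, trades) {
  // Parse each timestamp string once, without mutating the pushed payload.
  const points = trades.map((trade) => ({
    ...trade,
    executed_at: new Date(trade.executed_at),
  }));
  chart.data.datasets.forEach((dataset) => {
    dataset.data.push(...points);
  });
  // One update per batch, so the chart redraws once however many trades arrive.
  chart.update();
  return points.length;
}

// Stand-in for a real Chart instance so the helper can run under Node.
// It only counts how often update() is called.
function makeFakeChart() {
  return {
    data: { datasets: [{ data: [] }] },
    updates: 0,
    update() {
      this.updates += 1;
    },
  };
}

// Made-up sample batch, shaped like the payload pushed under the "trades" event.
const fakeChart = makeFakeChart();
appendTrades(fakeChart, [
  { executed_at: "2021-03-01T12:00:00Z", price: "48000.50" },
  { executed_at: "2021-03-01T12:00:05Z", price: "48010.00" },
]);
console.log(fakeChart.data.datasets[0].data.length); // 2
console.log(fakeChart.updates); // 1
```

Inside the hook, the handleEvent callback would then shrink to a single call such as appendTrades(chart, event.trades).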
Lastly, we need to attach the hook to the element containing the chart. That's accomplished by adding a phx-hook attribute to your markup element:
<canvas id="trades-chart" phx-hook="TradesChart" width="400" height="200"></canvas>
By adding that attribute, you've told Phoenix LiveView to call the hook's mounted() callback when the <canvas> element is mounted, which in turn subscribes the update function to the "trades" events sent by the backend.
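To make that registration step concrete, here's a rough sketch of a hook being mounted and receiving events, runnable under Node. Everything in it is an assumption for illustration: mountForTest is a hypothetical stand-in that only imitates how a hook gets el and handleEvent (it is not LiveView's implementation), and RecordingHook records trades instead of drawing them. It also shows why the event name has to match the one given to push_event exactly.

```javascript
// Hypothetical hook used only for this sketch: it records trades instead of
// drawing them, but registers its handler the same way the chart hook does.
const RecordingHook = {
  mounted() {
    this.received = [];
    this.handleEvent("trades", (event) => {
      this.received.push(...event.trades);
    });
  },
};

// Minimal stand-in for the mounting step: give the hook `el` and
// `handleEvent`, call mounted(), then route pushed events by name.
// This is NOT how LiveView is implemented, only enough to run the hook.
function mountForTest(hook, el) {
  const handlers = new Map();
  const instance = Object.create(hook);
  instance.el = el;
  instance.handleEvent = (name, callback) => {
    handlers.set(name, callback);
  };
  instance.mounted();
  // Returns true when a handler was registered for the event name.
  const push = (name, payload) => {
    const handler = handlers.get(name);
    if (handler) handler(payload);
    return Boolean(handler);
  };
  return { instance, push };
}

const { instance, push } = mountForTest(RecordingHook, { id: "trades-chart" });
console.log(push("trades", { trades: [{ price: "48000.50" }] })); // true
console.log(push("trade", { trades: [{ price: "1" }] })); // false, no handler under that name
console.log(instance.received.length); // 1
```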
All together, EventStore pushes events to our LiveView process, which pushes an event to our client hook, which puts the new data in the chart. Event sourcing is so cool!
See the whole project here:
Top comments (1)
Ah, interesting. I've been thinking about combining Commanded and LiveView lately, but instead of subscribing to EventStore directly in LiveViews, I was thinking more about having some kind of dispatching GenServer in the middle: one "LiveView subscriber" on EventStore would call this dispatcher, and it would dispatch the message to the interested LiveViews subscribed to it. Thanks to your post, I see that I would probably be introducing an unnecessary level of abstraction and complexity 👍