Lasse Skindstad Ebert


Subscribe to messages with pattern matching

This story was originally posted by me on Medium on 2017-04-13, but moved here since I'm closing my Medium account.

Today I needed to build a small and simple publish/subscribe hub. The feature I cared about was being able to subscribe to messages using Elixir pattern matching.

Before I dive into that, let me explain the difference from a normal publish/subscribe hub.

Often with the publish/subscribe pattern, subscribers subscribe to some topic and publishers publish a term to some topic.

Example:

# Probably started by a supervisor:
{:ok, pid} = PubSub.start_link

# In our process
PubSub.subscribe(:user_created)
receive do
  {:user_created, user} ->
    # Do something with user
    IO.inspect(user)
end

# And in some other process:
PubSub.publish(:user_created, user)

This is all fine, as long as we don’t need an unbounded number of topics.

Subscribe with pattern matching

In my application, I receive around 100 different types of messages from a lot of IoT devices. I would like different processes to be able to subscribe to incoming messages, but each process may have very different needs in terms of which messages it needs to see.

Some processes would like to see all messages originating from a specific device. Some would like all messages of a certain type. Some would like to subscribe to a message with a certain sequence number.

This is not practical with the topic approach, since every combination of origin, message type and sequence number would need its own topic. What I would really like is to use Elixir pattern matching to decide which messages to subscribe to.

Something like this:

# Subscribe to all messages from a specific origin
PubSub.subscribe(%{origin: "1234"})

# Subscribe to all messages of a specific type
PubSub.subscribe(%SomeMessageType{})

# Subscribe to all messages where the signal strength is bad
PubSub.subscribe(%{signal_strength: strength} when strength < 5)

Can this be done in Elixir? Yes, at least in my small prototype, which I will soon start using in a real application to test it out.

I started by looking at Kernel.match?/2 (docs here: https://hexdocs.pm/elixir/Kernel.html#match?/2), as it seemed to solve the same problem I was aiming at. Unfortunately, it couldn’t be used directly, because it is a macro that needs the pattern written out at compile time, but I found a lot of inspiration in the (pretty simple) source code of that macro.
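
To make the limitation concrete, here is a minimal sketch (MatchDemo and its function names are made up for illustration, they are not part of the prototype). match?/2 works when the pattern is written literally in the source, but a pattern that only exists at runtime as quoted AST is plain data, so match?/2 has nothing to expand:

```elixir
defmodule MatchDemo do
  # The pattern is written literally in the source, so match?/2 can expand it.
  def literal?(term) do
    match?(%{signal_strength: s} when s < 5, term)
  end

  # `stored` is a quoted pattern, i.e. ordinary data. Pinning it only asks
  # "is the term equal to this AST tuple?", not "does the term match the
  # pattern this AST describes?". Without the pin, `stored` would simply be
  # rebound and match anything.
  def pinned?(stored, term) do
    match?(^stored, term)
  end
end

stored = quote do: %{signal_strength: s} when s < 5

IO.inspect(MatchDemo.literal?(%{signal_strength: 4}))
# prints true
IO.inspect(MatchDemo.literal?(%{signal_strength: 7}))
# prints false
IO.inspect(MatchDemo.pinned?(stored, %{signal_strength: 4}))
# prints false, even though the term fits the stored pattern
```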

I began to play in iex with quote, Macro.escape and other functions and macros that work with the AST:

iex(41)> quote do: %{signal_strength: strength} when strength < 5
{:when, [],
 [{:%{}, [], [signal_strength: {:strength, [], Elixir}]},
  {:<, [context: Elixir, import: Kernel], [{:strength, [], Elixir}, 5]}]}
iex(42)> quote do
...(42)> case %{signal_strength: 7} do
...(42)> %{signal_strength: strength} when strength < 5 -> true
...(42)> _ -> false
...(42)> end
...(42)> end
{:case, [],
 [{:%{}, [], [signal_strength: 7]},
  [do: [{:->, [],
     [[{:when, [],
        [{:%{}, [], [signal_strength: {:strength, [], Elixir}]},
         {:<, [context: Elixir, import: Kernel],
          [{:strength, [], Elixir}, 5]}]}], true]},
    {:->, [], [[{:_, [], Elixir}], false]}]]]}

This is almost all I need.
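
The missing piece is evaluating such a quoted case expression at runtime, which Code.eval_quoted/1 can do. As a rough sketch (EvalDemo and weak_signal?/1 are hypothetical names, and the pattern is still hard-coded here), the term can be injected into the quoted expression with unquote and Macro.escape:

```elixir
defmodule EvalDemo do
  # Builds the same kind of `case` AST as in the iex session above, but with
  # the term injected at runtime, and then evaluates it.
  def weak_signal?(term) do
    ast =
      quote do
        case unquote(Macro.escape(term)) do
          %{signal_strength: strength} when strength < 5 -> true
          _ -> false
        end
      end

    # Code.eval_quoted/1 returns {result, bindings}; only the result matters here.
    {result, _bindings} = Code.eval_quoted(ast)
    result
  end
end

IO.inspect(EvalDemo.weak_signal?(%{signal_strength: 4}))
# prints true
IO.inspect(EvalDemo.weak_signal?(%{signal_strength: 7}))
# prints false
```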

The solution

I came up with this lovely function, which takes a quoted pattern and a value and returns true if the value matches the pattern:

def pattern_match?(quoted_pattern, term) do
  ast = {:case, [], [
    Macro.escape(term), [
      do: [
        {:->, [], [[quoted_pattern], true]},
        {:->, [], [[{:_, [], Elixir}], false]}
      ]
    ]
  ]}

  {result, _} = Code.eval_quoted(ast)
  result
end

And it worked! This nice little function lets me use a saved pattern stored as AST to match a value.
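
A small usage sketch, assuming the function is wrapped in a module (PatternDemo is a name picked for this example) and that the patterns are created with quote. The same stored patterns can be checked against different messages:

```elixir
defmodule PatternDemo do
  # Same approach as above: build a `case` AST around the stored pattern
  # and evaluate it against the escaped term.
  def pattern_match?(quoted_pattern, term) do
    ast =
      {:case, [],
       [
         Macro.escape(term),
         [
           do: [
             {:->, [], [[quoted_pattern], true]},
             {:->, [], [[{:_, [], Elixir}], false]}
           ]
         ]
       ]}

    {result, _bindings} = Code.eval_quoted(ast)
    result
  end
end

# Patterns are saved as plain data and can be stored or sent around.
weak = quote do: %{signal_strength: strength} when strength < 5
from_1234 = quote do: %{origin: "1234"}

good = %{origin: "1234", signal_strength: 7}
bad = %{origin: "9999", signal_strength: 4}

IO.inspect(PatternDemo.pattern_match?(weak, bad))
# prints true
IO.inspect(PatternDemo.pattern_match?(weak, good))
# prints false
IO.inspect(PatternDemo.pattern_match?(from_1234, good))
# prints true
IO.inspect(PatternDemo.pattern_match?(from_1234, bad))
# prints false
```

Note that every call goes through Code.eval_quoted/1, so each check evaluates code at runtime. That is fine for a prototype, but it is worth measuring before relying on it for high message volumes.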

I assembled a simple Hub built on GenServer:

defmodule Hub do
  @moduledoc """
  Publish/subscribe server.
  Subscription is done with a pattern.
  Example:
    {:ok, _pid} = Hub.start_link(name: :hub)
    require Hub
    Hub.subscribe(:hub, %{count: count} when count > 42)
    Hub.publish(:hub, %{count: 45, message: "You rock!"})
  """

  use GenServer

  @doc """
  Starts the Hub GenServer process
  """
  @spec start_link(GenServer.options) :: GenServer.on_start
  def start_link(options \\ []) do
    GenServer.start_link(__MODULE__, :ok, options)
  end

  @doc """
  Convenience macro for subscribing without the need to quote the pattern.
  Callers must `require Hub` before using it.
  example:
    Hub.subscribe(:hub, %{count: count} when count > 42)
  """
  defmacro subscribe(hub, pattern, pid \\ nil) do
    quote do
      pid = unquote(pid) || self()
      Hub.subscribe_quoted(unquote(hub), unquote(Macro.escape(pattern)), pid)
    end
  end

  @doc """
  Subscribes the given pid to the quoted pattern
  example:
    Hub.subscribe_quoted(:hub, quote do: %{count: count} when count > 42)
  """
  @spec subscribe_quoted(GenServer.server, Macro.t, pid) :: :ok
  def subscribe_quoted(hub, quoted_pattern, pid \\ self()) do
    GenServer.call(hub, {:subscribe, {pid, quoted_pattern}})
  end

  @doc """
  Publishes the term to all subscribers whose pattern matches it
  """
  @spec publish(GenServer.server, any) :: :ok
  def publish(hub, term) do
    GenServer.cast(hub, {:publish, term})
  end

  def init(:ok) do
    {:ok, []}
  end

  def handle_call({:subscribe, subscriber}, _from, subscribers) do
    {:reply, :ok, [subscriber | subscribers]}
  end

  def handle_cast({:publish, term}, subscribers) do
    subscribers
    |> Enum.each(fn {pid, pattern} ->
      if pattern_match?(pattern, term) do
        send(pid, term)
      end
    end)
    {:noreply, subscribers}
  end

  defp pattern_match?(pattern, term) do
    ast = {:case, [], [
      Macro.escape(term), [
        do: [
          {:->, [], [[pattern], true]},
          {:->, [], [[{:_, [], Elixir}], false]}
        ]
      ]
    ]}

    {result, _} = Code.eval_quoted(ast)
    result
  end
end

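Notice that it is possible to subscribe using either a function (subscribe_quoted/3) or a convenience macro (subscribe/3). Because subscribe/3 is a macro, the calling code has to require Hub first.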
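
The macro can accept a bare pattern because macro arguments arrive as AST, and Macro.escape turns that AST into a value at the call site. A stripped-down sketch of just that mechanism (CaptureDemo is a hypothetical helper, not part of Hub):

```elixir
defmodule CaptureDemo do
  # Receives the caller's pattern as AST and returns that AST as a runtime value,
  # which is the same trick Hub.subscribe/3 uses before calling subscribe_quoted/3.
  defmacro pattern(pattern) do
    Macro.escape(pattern)
  end
end

# Macros must be required before use.
require CaptureDemo

captured = CaptureDemo.pattern(%{count: count} when count > 42)

# `captured` is now ordinary data describing the pattern.
IO.puts(Macro.to_string(captured))
# prints %{count: count} when count > 42
```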

This lets me do this:

{:ok, _pid} = Hub.start_link(name: :hub)

# In one process (subscribe/3 is a macro, so Hub must be required first)
require Hub
Hub.subscribe(:hub, %{signal_strength: strength} when strength < 5)

# In another process
Hub.publish(:hub, %MyMessageType{signal_strength: 4})

And that’s it!

If you know of a better or more obvious way to achieve the same thing, please let me know :)
