DEV Community

Lasse Skindstad Ebert
Lasse Skindstad Ebert

Posted on

Elixir PubSub Hub With Pattern Matching

This story was originally posted by me on Medium on 2017-04-15, but moved here since I'm closing my Medium account.

In a previous post, I wrote about some ideas to utilize Elixir pattern matching when subscribing to messages in a pub-sub hub.

The idea evolved to production code, and has now been extracted as an open source Elixir library called Hub.

This post is about the making of the library.

If you just want to see some code and example usage, look at the github page:

My use case: IoT messages

As many ideas, this idea started when a need arose. I’m building an application that has persistent TCP connections to a lot of small linux boxes, which in turn each has a wireless connections to a lot of battery powered IoT devices.

To get the terms right, the small linux boxes are called gateways and the IoT devices are referred to as just devices.

I have a two way communication with each device via the gateways. Most messages are measurements send on a regular basis from the devices, but the messages can also be administrative messages just to keep the network running or to configure devices.


Some messages come in flows. That is, the messages form a conversation and only make sense if the previous messages in the flow are known. As an example for this blog post, I’ll look at how I ask a gateway which authorized devices it knows about:

I’ll send a get entry request message to the gateway, asking for all known authorized devices starting from index zero.

The gateway responds with a get entry response, giving me all serial numbers of the devices that can fit in one message with some maximim message size. Let’s say that 10 device serials can fit in one message.

I’ll re-query with another get entry request, this time asking for devices starting at index 10.

This continues until a get an empty get entry response, which means that there are no more authorized devices. I can then perform some logic with the known authorized devices in my end of the application.

This flow is easily recognized as a client-server pattern with a request and a response. However, the raw TCP connection I have does not know anything about server and client. In fact, the roles of the client and server might switch in the next flow.

Until now, I have solved this in a not-so-nice way by almost ignoring that flows exists, and only care about single messages. In the above case, if I need all authorized devices, I’ll send a get entry request. If I then later receive a get entry response, I assume that it was a response to a previous request I send and I’ll “continue” the flow and send another get entry request.

There are multuple problems with this approach:

I cannot register that a timeout happened. If I send a request and do not receive a response within some time frame, I can not register this as being an error. I just fire-and-forget the request.

I cannot handle complex flows where similar messages might mean something different depending on where we currently are in the flow or which flow it is received in.

The network sends a lot of messages by itself, and I can not easily differentiate which messages was a response to something I send, and which messages I can ignore, because the network itself handles them.

The solution

A TCP connection to a gateway is handled by a GenServer. Whenever a messages is received, it is until now handled within that same process.

Now, I have changed this to use a pub-sub hub. Instead of handling the message in the same process, it publishes the message to a pub-sub hub. If no other processes subscribed to this exact message, it is processed by a default message handler, which mostly just logs the message.

Whenever I want to start a flow of messages, I start a process that handles just that flow. The process is started as a Task and is monitored by a TaskSupervisor.

The new process subscribes to a specific kind of message from a specific gateway and has a timeout. The process only exists in the lifespan of the flow.

Hub, first version

To make this possible, I needed to be able to subscribe to a particular message, perhaps with some particular value inside the message. The many different message types and combinations of values didn’t make this possible unless I want to publish each message to each process, which I think could quickly become a performance concern.

Instead I would like to subscribe with an Elixir pattern, just like one can use them in guard clauses and function definitions.

In my previous post, I described how I can save a pattern and use it later. I used this to build a first version if Hub, which was powered by a GenServer with the internal state being all subscriptions.

In my flows, I can often limit a subscription to messages received via a specific gateway, so I use the gateway id as channel and can then subscribe with a pattern inside that channel.

This worked but had some missing parts. I especially missed monitoring of processes, so that dead processes was auto-unsubscribed.

Hub, with Phoenix Presence

Monitoring of processes is what Phoenix Presence is good at. I could easily change my GenServer to a simple module that used Phoenix Presence as the backend.
My application was already using Phoenix and Presence, so I actually had one less running process by this change.

Hub, the standalone version

I wanted to extract Hub as a standalone hex package, but I did not want it to only being able to run inside a Phoenix application.

I started to look at how Phoenix Presence is implemented and found out it is powered by Phoenix PubSub, which is a standalone package. Yay.

I extracted Hub to its own repository and changed it to use Phoenix PubSub instead of Phoenix Presence. This required me to run two processes. A Tracker and a PubSub process. The documentation in Phoenix PubSub is good, so I was up and running in no time.

Hub can now be used by anyone and has a simple interface.

See more on the github page:

Top comments (0)