What is it?
The LocalMessageQueue is a framework for building asynchronous and observable pipelines locally within an Elixir application. The queues in this pipeline use the Pub-Sub pattern to subscribe to incoming data and to publish the results of processing that data to listeners. Messages can be cached to avoid redundant work, and queues can be easily observed to determine current workloads. Queues can be composed by subscribing to each other's topics, and other processes can listen in or dispatch messages too.
Motivation
The main motivation for building this framework was a small feature of a side-project. This feature required crawling web pages where each "crawl" required its own unique set of processing steps. The first iteration of this feature was synchronous, which meant each level of the "crawl" was performed in its entirety before the next level could begin. The main drawback of this approach was that it was slow to complete.
Fortunately, levels were only loosely coupled with each other. As soon as one level began to process and retrieve results, there was no reason that the next level couldn't immediately start, which would trigger the level after it, and so on. Ultimately, the final results could be made available as they came in instead of only when they were all ready. This prompted a search for an asynchronous solution. Elixir, with its actor model and concurrent message passing capabilities (and which was also the language that the app was written in), made the solution easy to achieve. Each level would get its own process and trigger work in subsequent levels via message passing, which meant work could be done concurrently.
Ultimately though, I wanted to decouple the processes from each other as much as possible. For one, I wanted to make sure that all kinds of processes could interact with this pipeline, not just processes it knew about, and that it would stay responsive to their requests: if a process was busy with I/O (e.g., crawling a web page), would another process be delayed in checking the length of that process's queue? Another goal of mine was to publish results (as well as status updates) through Phoenix's Channels without my business logic layer needing to know anything about my web layer. This meant it would be great if a channel could somehow listen to events from a process without that process knowing anything about its listeners. Other goals included being able to cache results in certain stages and forward them immediately without needing to wait for their turn in the queue.
PubSub felt like a natural solution to this problem and is easy to implement thanks to Elixir's built-in Registry module. The Registry is probably best known for enabling simple location transparency for processes (i.e., giving processes names, in this case safe and dynamic names that are not atoms). But it was also released with dispatching and pub-sub functionality, and ever since then it has been a goal of mine to find a legitimate use case for that part of the module.
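For readers who have not used that part of the module, here is a minimal sketch of Registry-based pub-sub. The registry name `Demo.PubSub`, the topic string, and the `{:published, topic, payload}` message shape are made up for illustration and are not taken from the framework itself.

```elixir
# Hypothetical registry name. Duplicate keys let many processes share one topic.
{:ok, _} = Registry.start_link(keys: :duplicate, name: Demo.PubSub)

# Subscribe the current process to a topic (the topic name is an assumption).
{:ok, _} = Registry.register(Demo.PubSub, "pages:crawled", [])

# Publish: dispatch/3 passes the callback every {pid, value} entry under the key.
Registry.dispatch(Demo.PubSub, "pages:crawled", fn entries ->
  for {pid, _value} <- entries do
    send(pid, {:published, "pages:crawled", %{url: "https://example.com"}})
  end
end)

# The subscriber picks the message up from its own mailbox.
received =
  receive do
    {:published, topic, payload} -> {topic, payload}
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

The publisher only needs the registry name and the topic; it never holds a reference to any particular subscriber.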
Architecture
LocalMessageQueues can be thought of from the outside as a single unit, but underneath, each unit is a supervised set of processes with a narrow set of responsibilities. These processes are: the Preloader, the Cache, the Queue and the Consumer.
The Preloader uses the Registry to subscribe to a particular topic, and when this process receives messages, it first checks those messages against the Cache to see if it can immediately publish a subset of the results. Messages not found in the Cache are then added to the Queue. Upon receiving new messages, the Queue dispatches events to interested listeners.
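The Preloader's cache check can be sketched as a pure function. This is only an illustration under assumptions: the module name `PreloaderSketch`, the use of a plain map as the cache, and Erlang's `:queue` as the queue are stand-ins, since in the framework the Cache and the Queue are separate processes.

```elixir
defmodule PreloaderSketch do
  # Splits incoming messages into results that are already cached
  # and messages that still need to be processed.
  def partition(messages, cache) when is_map(cache) do
    {hits, misses} = Enum.split_with(messages, &Map.has_key?(cache, &1))
    {Map.take(cache, hits), misses}
  end
end

# Hypothetical cache contents and incoming messages.
cache = %{"https://a.example" => :cached_result}
incoming = ["https://a.example", "https://b.example"]

{ready, to_queue} = PreloaderSketch.partition(incoming, cache)

# `ready` could be published right away; the misses go onto the queue.
queue = Enum.reduce(to_queue, :queue.new(), &:queue.in/2)

IO.inspect({ready, :queue.to_list(queue)})
```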
The primary listener is the Consumer, which will process the event, but other processes can listen in as well if they want to observe the status of the Queue. When the Consumer gets an event that its Queue has items to process, it will begin to pull from the Queue until it is out of messages. This allows the Consumer to remain dormant until there is work to be done. Finally, when the Consumer processes a message, it adds those results to the Cache and then dispatches them through the Registry so that listeners can react.
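The Consumer's "pull until empty" behaviour might look roughly like the loop below. It is a single-process sketch under assumptions: `ConsumerSketch`, the map cache, and the `:queue` value are hypothetical stand-ins for the framework's separate Consumer, Cache, and Queue processes, and the dispatch step is only marked with a comment.

```elixir
defmodule ConsumerSketch do
  # Pulls messages until the queue is empty, caching each result as it goes.
  def drain(queue, cache, process_fun) do
    case :queue.out(queue) do
      {{:value, message}, rest} ->
        result = process_fun.(message)
        # A real consumer would also dispatch `result` through the Registry here.
        drain(rest, Map.put(cache, message, result), process_fun)

      {:empty, _queue} ->
        # Nothing left to do; the consumer can go dormant until the next event.
        cache
    end
  end
end

queue = :queue.from_list(["a", "b"])
final_cache = ConsumerSketch.drain(queue, %{}, &String.upcase/1)

IO.inspect(final_cache)
```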
In Practice
Now, thanks to this framework, all levels of the crawl are performed asynchronously and set up through simple configuration (e.g., what function to use for processing, should there be a cache, should processing be throttled, should some messages be filtered, etc.). As an added bonus, Phoenix Channels, being processes themselves, can easily subscribe to these LocalMessageQueues through the Registry and receive messages through their handle_info/2 callback. As a result, the business logic layer and the web layer remain untangled.
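To keep the example runnable without Phoenix, the sketch below uses a plain GenServer as a stand-in for a Channel; both receive Registry messages through handle_info/2. The module name `ListenerSketch`, the registry name `Sketch.Registry`, the topic, and the `{:result, payload}` message shape are all assumptions for illustration, not the framework's actual API.

```elixir
defmodule ListenerSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    registry = Keyword.fetch!(opts, :registry)
    topic = Keyword.fetch!(opts, :topic)
    # The listener registers itself, so the publisher never learns who is listening.
    {:ok, _} = Registry.register(registry, topic, [])
    {:ok, %{notify: Keyword.fetch!(opts, :notify)}}
  end

  @impl true
  def handle_info({:result, payload}, state) do
    # A Phoenix Channel would push the payload to its socket here instead.
    send(state.notify, {:forwarded, payload})
    {:noreply, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, _} = Registry.start_link(keys: :duplicate, name: Sketch.Registry)

{:ok, _listener} =
  ListenerSketch.start_link(registry: Sketch.Registry, topic: "crawl:results", notify: self())

# The business logic side only knows the registry and the topic.
Registry.dispatch(Sketch.Registry, "crawl:results", fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:result, %{status: :done}})
end)

forwarded =
  receive do
    {:forwarded, payload} -> payload
  after
    1_000 -> :timeout
  end

IO.inspect(forwarded)
```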