Prerequisites
For this tutorial you need some background of Java, maven, and have docker installed on your machine
Queue
In computer science there’s the concept of queues. Queue is the set of messages that are meant to be delivered from one sender to one or more receivers. The messages can be delivered in order or out of order by design. The computer program that handles these transactions is called message broker. RabbitMQ is one the most popular message brokers that runs on top of Advanced Message Queuing Protocol (AMQP). There are four main components forming AMQP protocol: Publisher, Exchange, Queue, Consumer.
Publisher
Messages are Published to an exchange by a publisher, Publisher also is responsible for setting the attributes of the message which we will cover later.
Exchanges
Exchanges are responsible for routing the messages to one or more Queues, we will cover Queues Later. There are 4 different types of exchanges in rabbitmq.
1.Direct
2.Fanout
3.Topic
4.Header
For this tutorial we are going to cover only two: Direct, I’m gonna do another tutorial on the Fanout exchange later.
Direct exchanges are responsible for routing messages to a queue based on the Routing key. When you declare a queue you can “Bind” it to an exchange using a Routing key, we will cover this topic later. Direct queues are suitable for distributing tasks among Workers.
A Fanout exchange sends a message to all of the queues that are bound to the exchange by a routing key. When a message comes in, the exchange will send a copy of that message to all Queues. Fanout exchanges are useful to broadcast a message to multiple nodes in a distributed system.
Queues
Queues are responsible for storing the messages and delivering them to consumers. Queues need to get declared before you can start using them. A Queue needs to bind to an exchange so it can start receiving messages. Binding is a set of rules that exchanges use to route messages to one or more queues.
Consumers
Consumers are the last piece of the puzzle, they need to Subscribe to a queue so they can start receiving messages, when a consumer receives and processes a message, it needs to “Acknowledge” the message in order to get another one.
Installing rabbitMQ
We will use docker to install rabbitmq and it’s management UI.
docker run --rm -it --hostname my-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
The above command will install rabbitmq and bind two ports to your local port: 5672 and 15672.
You can use 15672 to get into rabbitMQ management portal: http://localhost:15672 the default username password is guest/guest.
The above command will install rabbitmq and bind two ports to your local port: 5672 and 15672.
You can use 15672 to get into rabbitMQ management portal: http://localhost:15672 the default username password is guest/guest.
We need to use a client library on port 5672 to communicate with the rabbitMQ server.
Now let's create a direct exchange and a queue. But first we have to download the client library. I use maven, paste this dependency in your pom.xml then do a maven install to download the client.
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
Now we have the library downloaded, let’s create a connection factory. I create a package called com.queue
package com.queue;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Queue {
private static final String HOST = "localhost";
private Channel channel;
public Queue() {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
try {
Connection connection = cf.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
System.err.println(e);
}
}
public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
try {
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType);
channel.queueBind(queueName, exchangeName, key);
} catch (Exception e) {
System.err.println(e);
}
}
}
There are a lot happening, let’s break it down
I first created my public constructor called Queue. Then within that constructor opened the connection to rabbitMQ and created a channel and assigned the channel object to a variable called channel. After that, I created a public void method called createExchangeQueue to create a queue and an exchange and bind those two together over a key.
Ok now we have our Queue class. Let's use it. Just for fun I’m gonna write a program to calculate the square of a number.
package com.rabbit;
import com.queue.Queue;
public final class App {
private static String QUEUE_NAME = "square";
private static String EXCHANGE_NAME = "myExchange";
private static String KEY_NAME = "key";
public static void main(String[] args) {
Queue queue = new Queue();
queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
}
}
After you run the above code you can navigate to the management interface and click on queues tab and you will see the square under the list of queues. Now let’s modify our Queue class to send messages to our square queue and consume messages from the queue.
package com.queue;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;
import com.rabbitmq.client.DeliverCallback;
public class Queue {
private static final String HOST = "localhost";
private Channel channel;
public Queue() {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
try {
Connection connection = cf.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
System.err.println(e);
}
}
public void sendMessage(String exchange, String key, String message){
try {
channel.basicPublish(exchange, key, null, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
System.err.println(e);
}
}
public void listenToQueue(String queueName, DeliverCallback dlr) {
try {
channel.basicConsume(queueName, true, dlr, consumerTag -> { });
} catch (Exception e) {
System.err.println(e);
}
}
public void createExchangeQueue(String queueName, String exchangeName, String exchangeType, String key) {
try {
channel.queueDeclare(queueName, false, false, false, null);
channel.exchangeDeclare(exchangeName, exchangeType);
channel.queueBind(queueName, exchangeName, key);
} catch (Exception e) {
System.err.println(e);
}
}
}
I created two new methods. The first one is a public void method called sendMessage, what it does is it accepts exchange, key and message then it uses the basicPublish method of our channel object to deliver the message to our square queue. The second method is another public void one which accepts the queue name and a callbackFunction. The basicConsume method invokes our callbackFunction every time there’s a new message inside our square queue. Now we have our queue class fully written, let's start working on our square logic which is our consumer.
package com.square;
import com.rabbitmq.client.DeliverCallback;
import com.queue.Queue;
public class Square{
private static String QUEUE_NAME = "square";
private static String EXCHANGE_NAME = "myExchange";
private static String KEY_NAME = "key";
public void listenToMessage(){
Queue queue = new Queue();
queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
queue.listenToQueue(QUEUE_NAME, findSquare);
}
DeliverCallback findSquare = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
int number = Integer.parseInt(message);
int squareNumber = number * number;
System.out.println("Square of " + message + " is: " + squareNumber );
};
}
This is our Consumer, it has one public method, the first one opens a new connection, creates a new queue(the square queue already exists, so it’s just making sure it’s there before it starts listening to it) and then starts listening to the queue. Then we have a callback function called findSquare, it gets the message from the queue, parses the message (The message comes as a string), finds the square of the number and prints it into the screen.
And here’s our main Class
package com.rabbit;
import com.queue.Queue;
import com.square.Square;
public final class App {
private static String QUEUE_NAME = "square";
private static String EXCHANGE_NAME = "myExchange";
private static String KEY_NAME = "key";
public static void main(String[] args) {
Queue queue = new Queue();
queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
queue.sendMessage(EXCHANGE_NAME, KEY_NAME, "5");
Square sq = new Square();
sq.listenToMessage();
}
}
Now after you run the above code, you can navigate to your queue from the management ui and you’ll see that you have a message inside square queue
Let's consume that message and find the square
package com.rabbit;
import java.util.ArrayList;
import java.util.List;
import com.queue.Queue;
import com.square.Square;
public final class App {
private static String QUEUE_NAME = "square";
private static String EXCHANGE_NAME = "myExchange";
private static String KEY_NAME = "key";
public static void main(String[] args) {
Queue queue = new Queue();
queue.createExchangeQueue(QUEUE_NAME, EXCHANGE_NAME, "direct", KEY_NAME);
List<String> numbers = new ArrayList<String>();
numbers.add("1");
numbers.add("2");
numbers.add("3");
numbers.add("4");
numbers.add("5");
numbers.forEach((n)-> queue.sendMessage(EXCHANGE_NAME, KEY_NAME, n));
Square sq = new Square();
sq.listenToMessage();
}
}
Now we have a list of numbers instead of one, and we are using the forEach method to iterate through the list and send the numbers to the queue one by one. Now run your consumer and you will see the consumer automatically consumes all the messages and shows the square of all the numbers.
Conclusion
RabbitMQ is a popular message broker that runs on top of AMPQ protocol. AMPQ protocol is consist of 4 components: 1-Publisher, 2-Exchange, 3-Queue, 4-Consumer.
Top comments (2)
Awesome writeup, Farzad.
Thanks for including the set up code, very useful. One couple things I'd like to add is that RabbitMQ with the MQTT protocol has priority queueing capability and can also handle federated data and shoveled data. I think they're pretty cool, just in case someone reading this doesn't know.
Thanks for your comment, I will read about MQTT, sounds interesting!