RabbitMQ is one of the most widely deployed open source message brokers, with tens of thousands of users. From T-Mobile to Runtastic, it is used worldwide at small startups and large enterprises.
With RabbitMQ as the message broker, two Java applications can communicate with each other in much the same way people do by text message.
Install using Docker
docker-compose.yaml
version: "3.5"
# Docker services
services:
  rabbitmq:
    image: rabbitmq:3.8.9-management
    container_name: rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
    restart: always
    tty: true
    stdin_open: true
    ports:
      # HTTP DASHBOARD FOR MANAGEMENT
      - "15672:15672"
      # AMQP PROTOCOL
      - "5672:5672"
      # MQTT PROTOCOL
      # - "1883:1883"
      # MQTT OVER WEBSOCKETS
      # - "15675:15675"
docker-compose up -d
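Open the management dashboard at http://localhost:15672 and log in with username and password admin, as set in docker-compose.yaml. (guest/guest applies only when RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS are not set.)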
Dashboard RabbitMQ
Create a new Exchange
Create a new Queue
Binding Exchange with Queue
Publish message
Get message from Queue
Messaging
Messaging is a form of communication, whether from person to person or from application to application.
It keeps the two sides loosely coupled: the sender does not need to know who receives the message or when it is processed.
Message: people exchange messages when they talk to each other through WhatsApp, Teams, Slack, etc. Software components likewise need to exchange messages to get actual work done.
STOMP: Simple Text Oriented Messaging Protocol, designed for exchanging simple text-based messages.
MQTT: Message Queuing Telemetry Transport, a lightweight protocol designed for machine-to-machine and Internet of Things connectivity.
AMQP: Advanced Message Queuing Protocol, a reliable and interoperable messaging protocol and the one RabbitMQ speaks by default.
Four actors of messaging with RabbitMQ
Producer -sends-> Exchange -routes-> Queue -delivers-> Consumer
Exchanges: the elements that messages are sent to first. An exchange takes a message and routes it to one or more queues. Bindings link exchanges to queues so that messages can be delivered.
Four exchange types:
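- Direct exchange: routes a message to the queues whose binding key exactly matches the message's routing key. Pre-declared as the default exchange (empty string name) and amq.direct.
- Fanout exchange: routes a copy of every message to all bound queues and ignores the routing key. Pre-declared as amq.fanout.
- Topic exchange: routes by matching the routing key against binding patterns. Pre-declared as amq.topic.
- Headers exchange: routes on message header values instead of the routing key. Pre-declared as amq.match (and amq.headers in RabbitMQ).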
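To make the topic exchange's matching rule more concrete, here is a minimal pure-Java sketch of how a binding pattern could be compared with a routing key. It assumes the AMQP 0-9-1 convention that words are separated by dots, that `*` stands for exactly one word, and that `#` stands for zero or more words. The class `TopicPatternSketch` and its `matches` method are hypothetical names used only for illustration; this is not how the broker itself is implemented.

```java
class TopicPatternSketch {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    // Compares pattern words from index i with routing-key words from index j.
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible length.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' or a literal match consumes exactly one word.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("order.*", "order.created"));    // true
        System.out.println(matches("order.*", "order.created.eu")); // false
        System.out.println(matches("order.#", "order.created.eu")); // true
        System.out.println(matches("order.#", "order"));            // true
    }
}
```

A queue bound with `order.*` would therefore receive `order.created` but not `order.created.eu`, while a queue bound with `order.#` would receive both.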
Queues: a core element of any message queuing protocol, and of RabbitMQ in particular. Exchanges route messages to queues, and queues are the final destination inside RabbitMQ before messages are received by subscribers.
Properties of a Queue:
- Name: The name of the queue
- Durable: whether the queue is persisted to disk so that it survives a broker restart
- Exclusive: the queue can be used by only one connection and is deleted when that connection closes
- Auto-Delete: the queue is deleted when its last consumer unsubscribes
Topics: the subject part of a message, carried as the routing_key and used to group messages. A topic is optional; messages can be sent and received without any topic information.
Bindings: a binding is the link between an exchange and a queue. It tells the exchange which queue to route messages to.
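The interplay of producer, exchange, binding, queue, and consumer can be sketched with a small in-memory model. This is only a toy, assuming two behaviours: exact key matching (as in a direct exchange) and broadcast (as in a fanout exchange). The names `RoutingSketch` and `ToyExchange` are hypothetical, and nothing here talks to a real broker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

class RoutingSketch {

    /** Toy exchange: fanout ignores the routing key, otherwise keys must match exactly. */
    static class ToyExchange {
        private final boolean fanout;
        // Each binding is a {queueName, bindingKey} pair.
        private final List<String[]> bindings = new ArrayList<>();
        private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

        ToyExchange(boolean fanout) {
            this.fanout = fanout;
        }

        /** Creates the queue if needed and binds it to this exchange. */
        void bind(String queueName, String bindingKey) {
            queues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
            bindings.add(new String[] {queueName, bindingKey});
        }

        /** Producer side: routes the message and returns how many queues received it. */
        int publish(String routingKey, String body) {
            // A set ensures a queue gets one copy even if several bindings match.
            Set<String> targets = new LinkedHashSet<>();
            for (String[] binding : bindings) {
                if (fanout || binding[1].equals(routingKey)) {
                    targets.add(binding[0]);
                }
            }
            for (String queueName : targets) {
                queues.get(queueName).add(body);
            }
            return targets.size();
        }

        /** Consumer side: takes the next message from a queue, or null if none is waiting. */
        String consume(String queueName) {
            Queue<String> queue = queues.get(queueName);
            return queue == null ? null : queue.poll();
        }
    }

    public static void main(String[] args) {
        ToyExchange direct = new ToyExchange(false);
        direct.bind("orders", "order.created");
        direct.bind("audit", "order.deleted");
        System.out.println(direct.publish("order.created", "hello")); // 1
        System.out.println(direct.consume("orders"));                 // hello
        System.out.println(direct.consume("audit"));                  // null
    }
}
```

The point of the sketch is that the producer only names an exchange and a routing key; which queues end up with the message is decided entirely by the bindings.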
Init Spring Boot application
Spring Initializr: https://start.spring.io/
dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.port=5672
Start application
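import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    // Auto-configured by spring-boot-starter-amqp from application.properties
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Arguments: exchange name, routing key, message payload
        rabbitTemplate.convertAndSend("TestExchange", "testRouting", "Hello from code!!!");
    }
}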
Exchange
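import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;

// Place these @Bean methods in a @Configuration class.

// Topic exchange created directly through its constructor
@Bean
Exchange exampleExchange() {
    return new TopicExchange("ExampleExchange");
}

// Direct exchange created through the builder
@Bean
Exchange example2ndExchange() {
    return ExchangeBuilder.directExchange("Example2ndExchange")
            .autoDelete()   // deleted once nothing is bound to it anymore
            .internal()     // clients cannot publish to it directly
            .build();
}

@Bean
Exchange newExchange() {
    return ExchangeBuilder.topicExchange("TopicTestExchange")
            .autoDelete()
            .durable(true)  // survives a broker restart
            .internal()
            .build();
}

@Bean
Exchange fanoutExchange() {
    return ExchangeBuilder.fanoutExchange("FanoutTestExchange")
            .autoDelete()
            .durable(false) // lost on broker restart
            .internal()
            .build();
}

@Bean
Exchange headersExchange() {
    return ExchangeBuilder.headersExchange("HeadersTestExchange")
            .internal()
            .durable(true)
            .ignoreDeclarationExceptions() // do not fail startup if the declaration is rejected
            .build();
}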
Message
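import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class RabbitMQMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // Decode the raw body as UTF-8 instead of relying on the platform default charset
        System.out.println("Message = [" + new String(message.getBody(), StandardCharsets.UTF_8) + "]");
    }
}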
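A listener class like RabbitMQMessageListener only receives messages once it is attached to a listener container. One possible way to wire it up is sketched below using Spring AMQP's SimpleMessageListenerContainer. The class name `ListenerConfigSketch` and the queue name "TestQueue" are assumptions for illustration; the queue is expected to exist already on the broker, and the `ConnectionFactory` is the one Spring Boot auto-configures from application.properties.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class ListenerConfigSketch {

    // Hypothetical queue name; replace with a queue that exists on your broker.
    private static final String QUEUE_NAME = "TestQueue";

    @Bean
    SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        // Connection settings come from spring.rabbitmq.* in application.properties
        container.setConnectionFactory(connectionFactory);
        // Consume from this queue...
        container.setQueueNames(QUEUE_NAME);
        // ...and hand every delivered message to the listener's onMessage method
        container.setMessageListener(new RabbitMQMessageListener());
        return container;
    }
}
```

Under these assumptions, every message that arrives in the queue would be printed by RabbitMQMessageListener.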
Queue
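import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;

// NAME_QUEUE and NAME_2ND_QUEUE are String constants holding the queue names.
@Bean
Queue exampleQueue() {
    // Second argument is the durable flag: false means the queue is lost on broker restart
    return new Queue(NAME_QUEUE, false);
}

@Bean
Queue example2ndQueue() {
    return QueueBuilder.durable(NAME_2ND_QUEUE)
            .autoDelete()   // deleted once its last consumer unsubscribes
            .exclusive()    // usable only by the declaring connection
            .build();
}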
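Declaring exchanges and queues is not enough for messages to flow; a binding between them is also needed. The following is a minimal sketch of one way to declare a binding with Spring AMQP's BindingBuilder. The class name `BindingConfigSketch`, the names "SketchQueue" and "SketchExchange", and the pattern "sketch.*" are all hypothetical and chosen so they do not clash with the beans shown earlier.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class BindingConfigSketch {

    @Bean
    Queue sketchQueue() {
        // Hypothetical queue name; false = non-durable
        return new Queue("SketchQueue", false);
    }

    @Bean
    TopicExchange sketchExchange() {
        // Hypothetical exchange name
        return new TopicExchange("SketchExchange");
    }

    @Bean
    Binding sketchBinding() {
        // Route messages whose routing key matches "sketch.*" from the exchange to the queue
        return BindingBuilder.bind(sketchQueue())
                .to(sketchExchange())
                .with("sketch.*");
    }
}
```

With such a binding in place, a call like rabbitTemplate.convertAndSend("SketchExchange", "sketch.hello", "Hello") should end up in SketchQueue, since "sketch.hello" matches the pattern "sketch.*".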