Java Message Brokers: 5 Essential Technologies for Enterprise Integration

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Java message brokers have transformed enterprise integration by enabling reliable communication pathways between distributed systems. I've worked extensively with these technologies and found them essential for building resilient architectures that can scale effectively.

Message Brokers in Java Enterprise Integration

Message brokers serve as intermediaries that handle message validation, routing, and delivery between application components. They form the foundation of asynchronous communication, allowing systems to interact without direct dependencies.

The core benefits include decoupling of services, improved fault tolerance, and better load handling during traffic spikes. In my experience, this architectural approach has consistently improved system reliability.

Apache ActiveMQ

ActiveMQ stands as one of Java's most established message brokers, offering comprehensive JMS support alongside modern protocols. Its maturity brings stability to enterprise environments.

I've implemented ActiveMQ in several production systems, and its straightforward configuration makes it accessible for teams new to message-oriented middleware.

// Producer example with ActiveMQ
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("order.processing");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

TextMessage message = session.createTextMessage("New order: #1001");
message.setStringProperty("OrderType", "Standard");
producer.send(message);

// Clean up resources
producer.close();
session.close();
connection.close();

For consumers, implementing message reception requires similar connection setup:

// Consumer example with ActiveMQ
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("order.processing");
MessageConsumer consumer = session.createConsumer(destination);

consumer.setMessageListener(message -> {
    if (message instanceof TextMessage) {
        try {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Received: " + textMessage.getText());
            System.out.println("Order type: " + textMessage.getStringProperty("OrderType"));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});

// Keep the consumer running (in a real application, you'd have a cleaner approach)
// When ready to shut down:
// consumer.close();
// session.close();
// connection.close();

ActiveMQ also supports clustering for high availability, which I've configured in production to ensure message delivery even during broker failures.
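On the client side, the failover transport complements broker clustering by reconnecting automatically to a surviving broker. Here is a minimal sketch, with placeholder broker hostnames:

// Failover transport: the client retries across the listed brokers (hostnames are placeholders)
ConnectionFactory factory = new ActiveMQConnectionFactory(
        "failover:(tcp://broker1:61616,tcp://broker2:61616)?randomize=false&initialReconnectDelay=100");
Connection connection = factory.createConnection();
connection.start();
// Sessions, producers, and consumers are then created exactly as in the examples above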

RabbitMQ

RabbitMQ excels in scenarios requiring complex routing patterns. Its implementation of the Advanced Message Queuing Protocol (AMQP) provides sophisticated message routing through exchanges and queues.

I've deployed RabbitMQ in microservices architectures where its routing capabilities proved invaluable for directing messages to appropriate services.

// RabbitMQ producer with exchange-based routing
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

    // Declare exchange and queue
    String exchangeName = "orders.exchange";
    String queueName = "orders.processing";
    String routingKey = "order.created";

    channel.exchangeDeclare(exchangeName, "topic", true);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);

    // Create and send message
    String message = "{\"orderId\":\"12345\",\"customer\":\"John Doe\",\"amount\":99.95}";
    channel.basicPublish(exchangeName, routingKey,
                         new AMQP.BasicProperties.Builder()
                             .contentType("application/json")
                             .deliveryMode(2) // persistent
                             .build(),
                         message.getBytes(StandardCharsets.UTF_8));
}

RabbitMQ consumers can implement acknowledgment patterns to ensure reliable processing:

// RabbitMQ consumer with manual acknowledgment
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queueName = "orders.processing";
channel.queueDeclare(queueName, true, false, false, null);
// Limit to processing one message at a time
channel.basicQos(1);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    try {
        System.out.println("Processing message: " + message);
        // Process the message...

        // Acknowledge successful processing
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // Nack and requeue on failure
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
};

channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});

RabbitMQ's reliability extends to its management interface, which provides valuable insights into queue performance and system health.

Apache Kafka

Kafka represents a paradigm shift in messaging systems, focusing on high-throughput, durable, distributed event streaming. It's particularly suited for log aggregation, event sourcing, and real-time analytics.

I've implemented Kafka in systems processing millions of events daily, where its linear scalability proved essential.

// Kafka producer example
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");

Producer<String, String> producer = new KafkaProducer<>(props);

// Send messages with keys for consistent partitioning
for (int i = 1; i <= 5; i++) {
    String key = "user-" + i;
    String value = "{\"userId\":\"" + key + "\",\"action\":\"login\",\"timestamp\":" + System.currentTimeMillis() + "}";
    ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, value);

    producer.send(record, (metadata, exception) -> {
        if (exception == null) {
            System.out.printf("Message sent to partition %d with offset %d%n", 
                              metadata.partition(), metadata.offset());
        } else {
            System.err.println("Failed to send message: " + exception.getMessage());
        }
    });
}

producer.flush();
producer.close();

Kafka's consumer model uses consumer groups for parallel processing:

// Kafka consumer group example
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "user-analytics-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user-events"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n",
                             record.key(), record.value(), record.partition(), record.offset());

            // Process the message...

            // Manually commit offsets
            Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
            offsetMap.put(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
            );
            consumer.commitSync(offsetMap);
        }
    }
} finally {
    consumer.close();
}

Kafka's strength lies in its retention capabilities and stream processing integration through Kafka Streams and KSQL.
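To illustrate that integration, a small Kafka Streams topology can consume the same user-events topic and maintain per-user event counts. This is a minimal sketch; the output topic name user-event-counts is an assumption:

// Kafka Streams sketch: count events per user key (output topic name is assumed)
Properties streamProps = new Properties();
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-counter");
streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
builder.stream("user-events", Consumed.with(Serdes.String(), Serdes.String()))
       .groupByKey()
       .count()
       .toStream()
       .to("user-event-counts", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), streamProps);
streams.start();
// Call streams.close() on shutdown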

Apache ActiveMQ Artemis

Artemis represents the next generation of ActiveMQ, built on a high-performance core with improved throughput and clustering capabilities.

My teams have migrated legacy ActiveMQ deployments to Artemis and experienced significant performance improvements while maintaining protocol compatibility.

// Artemis producer using JMS 2.0 API
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
     JMSContext context = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {

    Queue queue = context.createQueue("orders.priority");
    JMSProducer producer = context.createProducer();

    // Set message properties
    producer.setProperty("priority", "high")
           .setProperty("region", "EMEA")
           .setPriority(8);

    // Send the message
    producer.send(queue, "Urgent order requiring immediate processing");
}

Artemis supports the JMS 2.0 simplified consumer API:

// Artemis consumer using JMS 2.0 API with message selector
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
     JMSContext context = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {

    Queue queue = context.createQueue("orders.priority");
    String selector = "priority = 'high' AND region = 'EMEA'";

    // Create consumer with selector to filter messages
    JMSConsumer consumer = context.createConsumer(queue, selector);

    // Synchronous receiving with timeout
    String message = consumer.receiveBody(String.class, 5000);
    if (message != null) {
        System.out.println("Received priority message: " + message);
        // Process message...
    }

    // Or asynchronous with lambda (uncomment to use)
    /*
    consumer.setMessageListener(message -> {
        try {
            String body = message.getBody(String.class);
            System.out.println("Received priority message: " + body);
            // Process message...
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
    */
}

Artemis provides significant improvements in clustering and high availability, which I've leveraged to build resilient messaging systems.
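On the client side, much of that resilience comes down to the connection URL. A minimal sketch, assuming a live/backup broker pair is already configured on the server side (hostnames are placeholders):

// HA-aware Artemis connection: the client fails over between the listed brokers
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
         "(tcp://primary:61616,tcp://backup:61616)?ha=true&reconnectAttempts=-1");
     JMSContext context = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE)) {

    Queue queue = context.createQueue("orders.priority");
    context.createProducer().send(queue, "Order that survives broker failover");
}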

Redis Pub/Sub

Redis Pub/Sub offers a lightweight messaging solution that excels in scenarios where ultra-low latency is required and message persistence is optional.

I've used Redis Pub/Sub for real-time notifications and system dashboards where its simplicity and speed were perfect fits.

// Redis publisher using Jedis
Jedis jedis = new Jedis("localhost");
String channel = "user-notifications";
String message = "{\"userId\":\"user123\",\"message\":\"New friend request\",\"timestamp\":1623412800}";

// Publish message to channel
jedis.publish(channel, message);
jedis.close();

Subscribing to Redis channels requires a dedicated connection:

// Redis subscriber using Jedis
Jedis jedisSubscriber = new Jedis("localhost");
JedisPubSub pubSub = new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("Channel: " + channel + " Message: " + message);
        // Process the message...

        // To stop listening (from another thread):
        // this.unsubscribe();
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("Subscribed to: " + channel);
    }
};

// Subscribe to channel(s) - this is a blocking call
jedisSubscriber.subscribe(pubSub, "user-notifications");

While Redis Pub/Sub doesn't provide message persistence, it can be combined with Redis Streams for a more durable approach:

// Redis Streams example (for persistence)
Jedis jedis = new Jedis("localhost");
String streamKey = "user-activity";
Map<String, String> fields = new HashMap<>();
fields.put("userId", "user123");
fields.put("action", "login");
fields.put("timestamp", String.valueOf(System.currentTimeMillis()));

// Add entry to stream
StreamEntryID entryId = jedis.xadd(streamKey, StreamEntryID.NEW_ENTRY, fields);
System.out.println("Added entry with ID: " + entryId);

// Read from stream (xread returns entries grouped by stream key)
List<Map.Entry<String, List<StreamEntry>>> streams = jedis.xread(
    XReadParams.xReadParams().count(10).block(2000),
    Map.of(streamKey, new StreamEntryID("0-0"))
);

if (streams != null) {
    for (Map.Entry<String, List<StreamEntry>> stream : streams) {
        for (StreamEntry entry : stream.getValue()) {
            System.out.println("Entry ID: " + entry.getID());
            for (Map.Entry<String, String> field : entry.getFields().entrySet()) {
                System.out.println(field.getKey() + ": " + field.getValue());
            }
        }
    }
}

jedis.close();

Redis' simplicity makes it an excellent choice for lightweight messaging needs or as a complementary system alongside more robust message brokers.

Integration Patterns and Best Practices

Across all these message brokers, I've found certain integration patterns consistently valuable:

Message idempotency ensures safe message reprocessing, which I implement by adding unique message IDs and tracking processed messages:

// Simplified idempotent consumer example
public class IdempotentMessageProcessor {
    private final Set<String> processedMessageIds = ConcurrentHashMap.newKeySet();

    public void processMessage(String messageId, String content) {
        if (processedMessageIds.contains(messageId)) {
            System.out.println("Message " + messageId + " already processed, skipping");
            return;
        }

        try {
            // Process the message
            System.out.println("Processing message: " + content);

            // Mark as processed
            processedMessageIds.add(messageId);
        } catch (Exception e) {
            System.err.println("Failed to process message: " + e.getMessage());
            // Don't mark as processed to allow retry
        }
    }
}

Dead letter queues (DLQs) handle message processing failures gracefully:

// Example of redelivery and dead letter handling in ActiveMQ.
// Messages that exhaust their redeliveries are moved by the broker to the DLQ
// (ActiveMQ.DLQ by default; per-destination DLQs are configured via broker destination policies).
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setBackOffMultiplier(2.0);

Connection connection = connectionFactory.createConnection();
connection.start();

// Use a transacted session so failed messages are redelivered and eventually dead-lettered
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("orders.processing");
MessageConsumer consumer = session.createConsumer(queue);

Message message = consumer.receive(5000);
try {
    // Process the message...
    session.commit();
} catch (Exception e) {
    // Roll back: the broker redelivers, and once the retries are exhausted
    // the message is routed to the dead letter queue
    session.rollback();
}

Circuit breakers prevent system overload when downstream services fail:

// Simple circuit breaker pattern for messaging
public class MessagingCircuitBreaker {
    private enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private int failureCount = 0;
    private final int failureThreshold;
    private long openTimestamp;
    private final long resetTimeoutMs;

    public MessagingCircuitBreaker(int failureThreshold, long resetTimeoutMs) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
    }

    public synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        } else if (state == State.OPEN) {
            // Check if timeout has elapsed to transition to half-open
            if (System.currentTimeMillis() - openTimestamp > resetTimeoutMs) {
                state = State.HALF_OPEN;
                return true;
            }
            return false;
        } else { // HALF_OPEN
            return true;
        }
    }

    public synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            state = State.CLOSED;
            failureCount = 0;
        }
    }

    public synchronized void recordFailure() {
        failureCount++;
        if (state == State.CLOSED && failureCount >= failureThreshold) {
            state = State.OPEN;
            openTimestamp = System.currentTimeMillis();
        } else if (state == State.HALF_OPEN) {
            state = State.OPEN;
            openTimestamp = System.currentTimeMillis();
        }
    }
}
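A minimal sketch of how the breaker might wrap message handling inside a consumer; the downstream call (sendToDownstream) is a hypothetical placeholder:

// Hypothetical usage: guard a downstream call made while processing a message
MessagingCircuitBreaker breaker = new MessagingCircuitBreaker(5, 30_000);

void handleMessage(String payload) {
    if (!breaker.allowRequest()) {
        // Downstream appears unhealthy; requeue the message or route it to a DLQ instead
        return;
    }
    try {
        sendToDownstream(payload); // hypothetical downstream service call
        breaker.recordSuccess();
    } catch (RuntimeException e) {
        breaker.recordFailure();
        throw e;
    }
}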

Conclusion

Selecting the right message broker depends on specific requirements:

  • ActiveMQ offers stability and a rich feature set suitable for traditional enterprise integration.
  • RabbitMQ excels at complex message routing patterns needed in microservices architectures.
  • Kafka provides high-throughput stream processing capabilities for data-intensive applications.
  • Artemis delivers next-generation performance for JMS applications.
  • Redis Pub/Sub offers lightweight messaging for real-time updates.

From my experience implementing these systems across various industries, the most successful integrations combine appropriate technology choices with solid architectural patterns. Message brokers not only connect systems but also enable resilient, scalable architectures that can evolve over time.

The adoption of message brokers has consistently improved the reliability and maintainability of the distributed systems I've worked on, allowing for more granular scaling and better fault isolation than traditional monolithic approaches.


101 Books

101 Books is an AI-driven publishing company co-founded by author Aarav Joshi. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as $4—making quality knowledge accessible to everyone.

Check out our book Golang Clean Code available on Amazon.

Stay tuned for updates and exciting news. When shopping for books, search for Aarav Joshi to find more of our titles. Use the provided link to enjoy special discounts!

Our Creations

Be sure to check out our creations:

Investor Central | Investor Central Spanish | Investor Central German | Smart Living | Epochs & Echoes | Puzzling Mysteries | Hindutva | Elite Dev | JS Schools


We are on Medium

Tech Koala Insights | Epochs & Echoes World | Investor Central Medium | Puzzling Mysteries Medium | Science & Epochs Medium | Modern Hindutva
