DEV Community

Tech Community for Software AG Tech Community

Posted on • Originally published at tech.forums.softwareag.com on

Kafka JMS How-to using Confluent JMS client

Note:
If you are trying out in windows, download the Kafka version kafka_2.12-2.8.1 (For other versions there are some permission issues in windows like accessing/writing to the tmp location where the kafka logs are stored). Try removing C:\tmp\kafka-logs directory, sometimes that helps to resolve the issue.

Note:
SoftwareAG has not certified this combination officially.

Follow the steps below to setup a producer/consumer test scenario

  1. Download Apache Kafka

  2. Extract to a location (ex: C:\kafka\kafka_2.12-2.8.1)

  3. Start ZooKeeper service

C:\kafka\kafka_2.12-2.8.1\bin\windows>zookeeper-server-start.bat …/…/config/zookeeper.properties

  1. Start Kafka Broker

C:\kafka\kafka_2.12-2.8.1\bin\windows>kafka-server-start.bat …/…/config/server.properties

  1. Create Topic

C:\kafka\kafka_2.12-2.8.1\bin\windows>kafka-topics.bat --create --topic t1 --bootstrap-server localhost:9092

  1. Download confluent JMS client library with all it’s dependencies, mostly the following list
  • guava
  • connect api
  • connect runtime
  • JMS spec 1.1
  • jose4j
  • kafka clients
  • kafka jms client
  • protobuf
  • protobuf java util
  • jackson
  1. JNDI configuration

    Initial context factory: io.confluent.kafka.jms.KafkaInitialContextFactory

    Consider providing all other properties like Topic, client id etc…

  2. We could either use a jndi properties file or directly provide them into the JNDI content properties, like

    java.naming.factory.initial=io.confluent.kafka.jms.KafkaInitialContextFactory

    bootstrap.servers=localhost:9092

    topic.t1=t1

    queue.q1=q1

    confluent.topic.replication.factor=3 (change based on requirement)

    client.id=my-test-client

Sample program to test the connection

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class KafkaConnect {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "t1";
    private static final String CLIENT_ID = "testClient";

    public static void main(String[] args) {

        Connection conn = null;
        InitialContext _jndiContext = null;

        try {
            Properties props = new Properties();

            props.put("java.naming.factory.initial", "io.confluent.kafka.jms.KafkaInitialContextFactory");

            props.put("bootstrap.servers", "localhost:9092");
            props.put("topic.t1", "t1");
            props.put("queue.q1", "q1");
            props.put("confluent.topic.replication.factor", "3");
            props.put("client.id", "my-test-client");

            // Initialize the JNDI context
            Context ctx = new InitialContext(props);
            System.out.println("-----Initial Content Initialized------");

            // Lookup the connection factory to get a connection
            _jndiContext = new InitialContext(props);
            ConnectionFactory cf = (ConnectionFactory) _jndiContext.lookup("ConnectionFactory");
            System.out.println("-----Connection Factory lookup success-----");

            // Start the JMS connection from the factory
            conn = cf.createConnection();
            conn.start();
            System.out.println("-----Connection Started-----" + conn);

//This is just a sample, change the producer and consumer code according to your need
            new ProduceAndConsumerMessage(conn, (Destination) _jndiContext.lookup("q1")).sendReceive();

            conn.close();
            System.out.println("-----Connection Closed-----");
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (_jndiContext != null) {
                try {
                    _jndiContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

class ProduceAndConsumerMessage {

    Connection connection;
    Destination destination;
    javax.jms.MessageProducer producer;
    Session session;
    javax.jms.MessageConsumer consumer;

    public ProduceAndConsumerMessage(Connection connection, Destination destination) throws JMSException {

        this.connection = connection;
        this.destination = destination;     
    }

    public void sendReceive() throws JMSException {

        try {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            TextMessage message = session.createTextMessage();
            producer = session.createProducer(destination);
            producer.send(destination, message);
            System.out.println("Sent message");

            //consume
            consumer = session.createConsumer(destination);
            Message msg = consumer.receive(500);
            if (msg != null) {
                System.out.println("Received message: " + msg.getJMSMessageID());
            }
        } finally {
            if (producer != null) {
                producer.close();
            }
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }           
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Trying out with Integration Server JMS

  1. For JNDI configuration, provide all properties in “Other Properties” section

  2. While creating the JMS connection, uncheck the “Create Temporary Queue” option as Kafka does not have a concept of a temporary queue.

  3. Use pub.jms:send to send a message to a destination

Note:
Currently the Integration Server JMS consumer implementation uses the javax.jms.Session.createConsumer(Destination, String) JMS API to create a consumer with a message selector, which kafka doesn’t support. So you can’t use JMS triggers for this combination and to add this support contact the product team.

Visit the original article in the Software AG Tech Community to check out the demo.

Top comments (0)