DEV Community

Robin Moffatt

Originally published at rmoff.net

Streaming XML messages from IBM MQ into Kafka into MongoDB

Let’s imagine we have XML data on a queue in IBM MQ, and we want to ingest it into Kafka to then use downstream, perhaps in an application or maybe to stream to a NoSQL store like MongoDB.

Note

This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.

[Diagram: Streaming XML messages from IBM MQ through Apache Kafka into MongoDB]

I’ve got a Docker Compose stack running that includes:

  • IBM MQ

  • Apache Kafka (deployed as Confluent Platform to include the all-important Schema Registry)

  • MongoDB

Loading some test data onto IBM MQ

Let’s load some messages onto the queue from an XML file:

docker exec --interactive ibmmq \
  /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
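amqsput sends each line it reads from standard input as a separate message, so a file like data/note.xml needs each <note> document on a single line. If you want to generate such a file yourself, a minimal Python sketch could look like this. The notes (taken from records that appear later in this post) and the helper names are illustrative assumptions, not the actual data/note.xml from the demo.

```python
from xml.sax.saxutils import escape

# Illustrative notes, based on records that appear later in this post
# (not the real contents of data/note.xml).
NOTES = [
    ("Tove", "Jani", "Reminder 01", "Don't forget me this weekend!"),
    ("Jani", "Tove", "Reminder 02", "Of course I won't!"),
    ("Tove", "Jani", "Reminder 03", "Where are you?"),
]


def note_to_line(to, sender, heading, body):
    """Render one <note> document on a single line, escaping &, < and >."""
    return (
        f"<note> <to>{escape(to)}</to> <from>{escape(sender)}</from> "
        f"<heading>{escape(heading)}</heading> <body>{escape(body)}</body> </note>"
    )


def write_notes(path, notes):
    """Write one document per line, since amqsput puts one message per line."""
    with open(path, "w", encoding="utf-8") as f:
        for note in notes:
            f.write(note_to_line(*note) + "\n")
```

Calling write_notes("data/note.xml", NOTES) produces a file that can be piped into amqsput as shown above.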

Streaming from IBM MQ to Kafka and translating the XML messages

Now we can ingest this into Kafka using Kafka Connect with the IbmMQSourceConnector plugin and the XML transformation:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-ibmmq-note-01/config \
    -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-01",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "transforms": "extractPayload,xml",
    "transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractPayload.field": "text",
    "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
    "transforms.xml.schema.path": "file:///data/note.xsd",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'

Note

ExtractField is needed, otherwise the XML transform fails with java.lang.UnsupportedOperationException: STRUCT is not a supported type. This is because it would be trying to operate on the entire payload from IBM MQ, which is a struct that includes fields other than the XML we're interested in.
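To see why ExtractField has to run first, here's a rough Python analogue of the two-step transform chain. It is not how the transforms are implemented (the real FromXml transform uses the XSD to build a typed Connect struct, whereas this sketch simply reads the child elements into a dict), and the record value is a cut-down copy of the IBM MQ payload shown later in this post.

```python
import xml.etree.ElementTree as ET

# Cut-down copy of the IBM MQ record value (most fields omitted).
mq_value = {
    "messageID": "ID:414d5120514d3120202020202020202060e67a5f06352924",
    "messageType": "text",
    "text": "<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> "
    "<body>Of course I won't!</body> </note>",
}


def extract_field(value, field):
    """Analogue of ExtractField$Value: replace the value with one of its fields."""
    return value[field]


def from_xml(value):
    """Rough analogue of FromXml$Value: it can only parse a string or bytes."""
    if not isinstance(value, (str, bytes)):
        # Mirrors the "STRUCT is not a supported type" failure
        raise TypeError(f"{type(value).__name__} is not a supported type")
    root = ET.fromstring(value)
    return {child.tag: (child.text or "").strip() for child in root}


# ExtractField first, then FromXml: works
print(from_xml(extract_field(mq_value, "text")))

# FromXml on the whole struct: fails, as the real transform does
try:
    from_xml(mq_value)
except TypeError as err:
    print("error:", err)
```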

The resulting Kafka topic holds the value of the text field from each message, parsed from XML into a structured record and serialised in Avro:

docker exec kafkacat \
    kafkacat                            \
      -b broker:29092                   \
      -r http://schema-registry:8081    \
      -s key=s -s value=avro            \
      -t ibmmq-note-01                  \
      -C -o beginning -u -q -J | \
    jq -c '.payload'
{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}


To understand more about the concepts around getting XML data into Kafka see here, and I’ve written about the specifics of Kafka Connect and the XML transformation here.

Streaming the data from Kafka to MongoDB

We can then add another Kafka Connect connector to the pipeline, using the official plugin for Kafka Connect from MongoDB, which will stream data straight from a Kafka topic into MongoDB:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-mongodb-note-01/config \
    -d '{
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics":"ibmmq-note-01",
    "connection.uri":"mongodb://mongodb:27017",
    "database":"rmoff",
    "collection":"notes",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'

Check out the data in MongoDB:

docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
bye

Let’s check that this is actually streaming by sending another message to the queue:

echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1

Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end

And, behold, the new record in MongoDB:

docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye

What if my data’s not in XML? What if we want other fields from the payload?

In the example above we’re taking data from the source system (IBM MQ) and Kafka Connect is applying a schema to the field called text within it (the XML transformation does this, based on the supplied XSD). When it’s written to Kafka it’s serialised using the selected converter; since that’s Avro, the converter stores the schema in the Schema Registry. This is a Good Way of doing things, since we retain the schema for use by any consumer. We could use Protobuf or JSON Schema here too if we wanted. If this doesn’t all make sense to you then check out Schemas, Schmeeeemas / Why not just JSON?
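Consumers can retrieve that schema because the Avro converter doesn't embed the full schema in each message. Instead it writes a small header ahead of the Avro-encoded data: a magic byte of 0 followed by the 4-byte big-endian ID of the schema in the Schema Registry. Here is a minimal Python sketch of splitting that header off. Decoding the remaining bytes would additionally need the schema fetched from the registry and an Avro library, which this deliberately leaves out; the function name is illustrative.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format version; currently always 0


def split_confluent_header(message_value: bytes) -> tuple[int, bytes]:
    """Return (schema_id, avro_bytes) for a Schema Registry-framed message value."""
    if len(message_value) < 5:
        raise ValueError("value too short for the Confluent wire format")
    # ">BI": big-endian unsigned byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">BI", message_value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte: {magic}")
    return schema_id, message_value[5:]


# Header for schema ID 42, followed by placeholder bytes (not real Avro data)
schema_id, body = split_confluent_header(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
print(schema_id, body)
```

A consumer would look up the schema by that ID (and cache it), which is how every message can be decoded without carrying its own schema.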

But the full payload that comes through from IBM MQ looks like this:

messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR   }, 
            JMS_IBM_PutDate=Struct{propertyType=string,string=20201005}, 
            JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1}, 
            JMSXDeliveryCount=Struct{propertyType=integer,integer=1}, 
            JMS_IBM_MsgType=Struct{propertyType=integer,integer=8}, 
            JMSXUserID=Struct{propertyType=string,string=mqm         }, 
            JMS_IBM_Encoding=Struct{propertyType=integer,integer=546}, 
            JMS_IBM_PutTime=Struct{propertyType=string,string=10190243}, 
            JMSXAppID=Struct{propertyType=string,string=amqsput                     }, 
            JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>

If we want to retain some or all of these fields, we’re going to have to approach things differently. As things stand, there is no Single Message Transform that I’m aware of that can take both the non-XML fields and the XML field and wrangle them into a single structured schema (which would be the ideal outcome; an alternative would be to put the non-XML fields into the Kafka message headers). By default the IBM MQ Source Connector writes the full payload with a schema. This means that you can still use a schema-supporting serialisation format, but the text field holding the XML remains an unparsed string.

Here’s an example:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-ibmmq-note-03/config \
    -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-03",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'

Now the full IBM MQ message is written to a Kafka topic, serialised with a schema. We can deserialise it with something like kafkacat:

docker exec kafkacat \
    kafkacat                            \
      -b broker:29092                   \
      -r http://schema-registry:8081    \
      -s key=s -s value=avro            \
      -t ibmmq-note-03                  \
      -C -c1 -o beginning -u -q -J | \
    jq  '.'
{
  "topic": "ibmmq-note-03",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1601894073400,
  "broker": 1,
  "key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
  "payload": {
    "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
    "messageType": "text",
    "timestamp": 1601894073400,
    "deliveryMode": 1,
    "properties": {
      "JMS_IBM_Format": {
        "propertyType": "string",
        "boolean": null,
        "byte": null,
        "short": null,
        "integer": null,
        "long": null,
        "float": null,
        "double": null,
        "string": {
          "string": "MQSTR   "
        }
      },
      ...
    },
    "map": null,
    "text": {
      "string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
    }
  }
}

Observe that the text field is just a string, holding [what happens to be] XML.
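Anything downstream that needs the XML content has to parse that string itself. As a rough sketch of the single-structured-record idea, done in a consumer rather than in a Single Message Transform, here is some Python (standard library only) that works on the JSON printed by kafkacat -J. It relies on Avro's JSON encoding, in which a non-null value of a nullable field is wrapped in an object named after its type, hence {"string": ...}; the {"Note": ...} wrapper in the earlier kafkacat output is most likely the same thing. The helper names, the choice of fields, and the flat output shape are illustrative assumptions, not part of the connector.

```python
import xml.etree.ElementTree as ET

# Cut-down copy of the "payload" printed by kafkacat -J (most fields omitted).
payload = {
    "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
    "messageType": "text",
    "timestamp": 1601894073400,
    "properties": {
        "JMS_IBM_Format": {
            "propertyType": "string",
            "integer": None,
            "string": {"string": "MQSTR   "},
        },
    },
    "text": {
        "string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> "
        "<body>Don't forget me this weekend!</body> </note>"
    },
}


def unwrap_union(value):
    """Avro's JSON encoding wraps a non-null union value as {"<branch type>": value}.

    Heuristic: any single-key dict is treated as such a wrapper. That holds for
    this payload but not in general; a real consumer should go by the schema.
    """
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value


def property_value(prop):
    """Each JMS property struct names its populated field in propertyType."""
    return unwrap_union(prop[prop["propertyType"]])


def to_record(payload):
    """Hypothetical downstream step: parsed XML fields plus selected MQ metadata."""
    note = ET.fromstring(unwrap_union(payload["text"]))
    record = {child.tag: (child.text or "").strip() for child in note}
    record["messageID"] = payload["messageID"]
    record["timestamp"] = payload["timestamp"]
    for name, prop in payload["properties"].items():
        record[name] = property_value(prop)
    return record


print(to_record(payload))
```

The resulting flat record could then be written on to another topic or a store like MongoDB; decoding the messages with an Avro library and the registered schema, rather than relying on the single-key heuristic, would be more robust.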

You can use ksqlDB to work with the data to an extent, although there’s currently no support for handling the XML:

SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
        "PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
        "PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
        "PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
        text
  FROM IBMMQ_SOURCE
  EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID  |JMS_IBM_PUTTIME  |JMSXDELIVERYCOUNT  |JMSXUSERID  |TEXT                                |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput    |10302905         |1                  |mqm         |<note> <to>Jani</to> <from>Tove</fro|
|           |                 |                   |            |m> <heading>Reminder 02</heading> <b|
|           |                 |                   |            |ody>Of course I won't!</body> </note|
|           |                 |                   |            |>                                   |
|amqsput    |10302905         |1                  |mqm         |<note> <to>Tove</to> <from>Jani</fro|
|           |                 |                   |            |m> <heading>Reminder 03</heading> <b|
|           |                 |                   |            |ody>Where are you?</body> </note>   |

👾 Try it out!

You can find the code to run this for yourself using Docker Compose on GitHub.
