Robin Moffatt for Confluent

Originally published at rmoff.net

Ingesting XML data into Kafka - Option 2: Kafka Connect plus Single Message Transform

We previously looked at the background to getting XML into Kafka, and potentially how [not] to do it. Now let’s look at the proper way to build a streaming ingestion pipeline for XML into Kafka, using Kafka Connect.

If you’re unfamiliar with Kafka Connect, check out this quick intro to Kafka Connect. Kafka Connect’s excellent pluggable architecture means that we can pair any source connector to read XML from wherever we have it (for example, a flat file, or a message queue, or anywhere else), with a Single Message Transform to turn the XML into a payload with a schema, and finally a converter to serialise the data in a form that we would like to use, such as Avro or Protobuf.

👀 Show me 🕵️‍♂️

Note

You can try out all this code by spinning up this Docker Compose.

Here’s an example reading XML from a file using the FileStreamSourceConnector (note that this connector plugin is provided as an example only and is not recommended for production use). The output from the source connector is routed through the XML transform, which is configured with the corresponding XSD.

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-file-note-01/config \
    -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/data/note.xml",
    "topic": "note-01",
    "transforms": "xml",
    "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
    "transforms.xml.schema.path": "file:///data/note.xsd",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'
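
Once the connector is created it’s worth checking that it is actually running. The status endpoint of the Kafka Connect REST API (assuming the worker is listening on localhost:8083, as above) shows the state of the connector and its tasks:

curl -s http://localhost:8083/connectors/source-file-note-01/status | jq '.'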

The source file looks like this:

<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>
<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 03</heading> <body>Where are you?</body> </note>

With this connector running, the XML is ingested from the source file and written to the note-01 topic:

docker exec kafkacat kafkacat                      \
        -b broker:29092                            \
        -r http://schema-registry:8081             \
        -s avro                                    \
        -t note-01                                 \
        -C -o beginning -u -q -J | jq -c '.'
{"topic":"note-01","partition":0,"offset":0,"tstype":"create","ts":1601649227632,"broker":1,"key":null,"payload":{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}}
{"topic":"note-01","partition":0,"offset":1,"tstype":"create","ts":1601649227633,"broker":1,"key":null,"payload":{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}}
{"topic":"note-01","partition":0,"offset":2,"tstype":"create","ts":1601649227633,"broker":1,"key":null,"payload":{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 03","body":"Where are you?"}}}

We’re using Avro to serialise the data here (per value.converter, usually set globally in the Kafka Connect worker configuration but included here for clarity), and the point of the XML transformation is that it applies the schema declared in the XSD to the data. Taking one of the messages from the topic and pretty-printing it, it looks like this:

{
    "topic": "note-01",
    "partition": 0,
    "offset": 0,
    "tstype": "create",
    "ts": 1601649227632,
    "broker": 1,
    "key": null,
    "payload": {
    "Note": {
        "to": "Tove",
        "from": "Jani",
        "heading": "Reminder 01",
        "body": "Don't forget me this weekend!"
    }
    }
}
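
As well as looking at the messages themselves, we can inspect the schema that the Avro converter registered for the topic. Here’s a quick check against the Schema Registry REST API, assuming its port is exposed to the host on localhost:8081 and the default subject naming of <topic>-value:

curl -s http://localhost:8081/subjects/note-01-value/versions/latest | jq '.schema | fromjson'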

Without this transformation, if we’d just ingested the raw XML into the Kafka topic, a message would look like this:

{
    "topic": "note-03",
    "partition": 0,
    "offset": 0,
    "tstype": "create",
    "ts": 1601649524495,
    "broker": 1,
    "key": null,
    "payload": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
}

The difference is between a payload with a schema, ready to be processed by another application, Kafka Connect, or ksqlDB, and a Kafka message that is still just a lump of raw XML.

Notes on kafka-connect-transform-xml

I’ve had a few challenges getting the SMT to work, in particular with certain schemas. Some things to watch out for:

  • You need to have a clear understanding of two things about how Kafka Connect works:
    1. The source connector will pass each message that it’s read to the transformation. At this point, so far as the XML SMT is concerned, it needs to be a complete XML payload. Therefore, for example, if you are using the FileStreamSourceConnector you’ll need to ensure that the full XML document is on a single line, since the source connector treats line breaks as message separators and the SMT would otherwise receive only a fragment of XML (see the sketch after this list).
    2. The SMT will process the whole of the value part of the message. If the XML is a field within it (for example, when reading from a database using the JDBC Source connector and one column in the table is XML) you’ll need to apply the ExtractField transformation first, before the XML transform.
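
If your XML arrives pretty-printed across multiple lines, one crude way to satisfy the single-line requirement is to collapse the newlines before the connector reads the file. This is a minimal sketch, assuming a single XML document per file (the filenames here are hypothetical):

# Replace newlines with spaces, then append a trailing newline so that
# the FileStreamSourceConnector sees one complete line
(tr '\n' ' ' < note-pretty.xml; echo) > note.xml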

To troubleshoot the connector, check the Kafka Connect worker log, in which you’ll usually see an error explaining the problem. Sometimes you might need to dig deeper, and for that there are a couple of useful loggers within the worker that you can bump up to see more of what’s going on:

curl -s -X PUT http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain -H "Content-Type:application/json" -d '{"level": "TRACE"}' | jq '.'
curl -s -X PUT http://localhost:8083/admin/loggers/com.github.jcustenborder.kafka.connect -H "Content-Type:application/json" -d '{"level": "TRACE"}' | jq '.'
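
You can also list the configured loggers and their current levels with a GET against the same endpoint:

curl -s http://localhost:8083/admin/loggers | jq '.'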

With these set, you’ll capture the details of the payload that Kafka Connect passes to the transformation, which is very useful for sense-checking whether the SMT is receiving the data you expect:

TRACE [source-file-note-02|task-0] Applying transformation
com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value to
SourceRecord{sourcePartition={filename=/data/note.xml}, sourceOffset={position=443}}
ConnectRecord{topic='note-02', kafkaPartition=null, key=null, keySchema=null, value=<note>
<to>Jani</to> <from>Tove</from> <heading>Reminder 04</heading> <body>I forgot 🤷‍♂️
</body> </note>, valueSchema=Schema{STRING}, timestamp=1601649677317,
headers=ConnectHeaders(headers=)}
(org.apache.kafka.connect.runtime.TransformationChain:47)

You also get to see how the SMT itself is handling the data:

TRACE [source-file-note-02|task-0] process() - input.value() has as schema. schema = Schema{STRING} (com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation:140)
TRACE [source-file-note-02|task-0] toString() - field = 'to' value = 'Jani' (com.github.jcustenborder.kafka.connect.xml.ConnectableHelper:87)
TRACE [source-file-note-02|task-0] toString() - field = 'from' value = 'Tove' (com.github.jcustenborder.kafka.connect.xml.ConnectableHelper:87)
TRACE [source-file-note-02|task-0] toString() - field = 'heading' value = 'Reminder 04' (com.github.jcustenborder.kafka.connect.xml.ConnectableHelper:87)
TRACE [source-file-note-02|task-0] toString() - field = 'body' value = 'I forgot 🤷‍♂️' (com.github.jcustenborder.kafka.connect.xml.ConnectableHelper:87)
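
Once you’ve finished troubleshooting, it’s worth setting the loggers back to their default level, since TRACE output is very verbose:

curl -s -X PUT http://localhost:8083/admin/loggers/org.apache.kafka.connect.runtime.TransformationChain -H "Content-Type:application/json" -d '{"level": "INFO"}' | jq '.'
curl -s -X PUT http://localhost:8083/admin/loggers/com.github.jcustenborder.kafka.connect -H "Content-Type:application/json" -d '{"level": "INFO"}' | jq '.'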

Building something useful: Streaming XML messages from IBM MQ into Kafka into MongoDB

Let’s imagine we have XML data on a queue in IBM MQ, and we want to ingest it into Kafka to then use downstream, perhaps in an application or maybe to stream to a NoSQL store like MongoDB.

(Diagram: Streaming XML messages from IBM MQ through Apache Kafka into MongoDB)

The configuration to ingest from IBM MQ into Kafka using the IbmMQSourceConnector and XML Transformation looks like this (note the use of the ExtractField transformation as discussed above):

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-ibmmq-note-01/config \
    -d '{
    "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
    "kafka.topic":"ibmmq-note-01",
    "mq.hostname":"ibmmq",
    "mq.port":"1414",
    "mq.queue.manager":"QM1",
    "mq.transport.type":"client",
    "mq.channel":"DEV.APP.SVRCONN",
    "mq.username":"app",
    "mq.password":"password123",
    "jms.destination.name":"DEV.QUEUE.1",
    "jms.destination.type":"queue",
    "confluent.license":"",
    "confluent.topic.bootstrap.servers":"broker:29092",
    "confluent.topic.replication.factor":"1",
    "transforms": "extractPayload,xml",
    "transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractPayload.field": "text",
    "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
    "transforms.xml.schema.path": "file:///data/note.xsd",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'
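
To complete the pipeline into MongoDB, pair the source with a sink connector reading from the resulting topic. Here’s a minimal sketch using the MongoDB connector for Apache Kafka; the connection URI, database, and collection names are assumptions, and the connector plugin needs to be installed on the Kafka Connect worker:

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/sink-mongodb-note-01/config \
    -d '{
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "ibmmq-note-01",
    "connection.uri": "mongodb://mongodb:27017",
    "database": "demo",
    "collection": "notes",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
    }'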

For more details on this, see this article.

Is this my best option for getting data into Kafka?

I reckon it is. The plug ’n’ play nature of the Kafka Connect components means that you can happily pair up your connector for the source (be it IBM MQ, JMS, Oracle, or anywhere else you have your data) with the XML transformation, and then serialise the resulting data however you choose (Avro, Protobuf, or JSON Schema recommended) using the appropriate converter.

Other than a few glitches, the only downside to the XML transform is that it requires an XSD, rather than being able to infer the schema and work with XPath in the way that the Kafka Connect FilePulse connector does.

The other two options are either a bit of a hack, or the Kafka Connect FilePulse connector, which is good but constrained to flat-file input only.

👾 Try it out!

You can find the code to run this for yourself using Docker Compose on GitHub.
