loading...
Confluent

Ingesting XML data into Kafka - Option 3: Kafka Connect FilePulse connector

rmoff profile image Robin Moffatt Originally published at rmoff.net on ・6 min read

👉 Ingesting XML data into Kafka - Introduction

We saw in the first post how to hack together an ingestion pipeline for XML into Kafka using a source such as curl piped through xq to wrangle the XML and stream it into Kafka using kafkacat, optionally using ksqlDB to apply and register a schema for it.

The second one showed the use of any Kafka Connect source connector plus the kafka-connect-transform-xml Single Message Transformation. Now we’re going to take a look at a source connector from the community that can also be used to ingest XML data into Kafka.

FilePulse is an Apache 2.0 licensed connector written by Florian Hussonnois. It supports ingestion from flat files in lots of different formats, including XML. Florian wrote a useful blog about it here.

Ingesting XML data into Kafka with Kafka Connect and the FilePulse connector

Using a simple XML source file I first tried this, copied from based on the tutorial

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "topic":"books-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'
Enter fullscreen mode Exit fullscreen mode

This failed at the point at which Kafka Connect tried to serialise the root element (x:books) to Avro

Caused by: org.apache.avro.SchemaParseException: Illegal character in: X:books
        at org.apache.avro.Schema.validateName(Schema.java:1530)
        at org.apache.avro.Schema.access$400(Schema.java:87)
        at org.apache.avro.Schema$Name.<init>(Schema.java:673)
        at org.apache.avro.Schema.createRecord(Schema.java:212)
Enter fullscreen mode Exit fullscreen mode

The XML looks like this:

<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
    <book id="bk001">
        <author>Writer</author>
        <title>The First Book</title>
Enter fullscreen mode Exit fullscreen mode

Since we don’t want that root element anyway we can use an XPath to specify which bits we do want, with the xpath.expression configuration element.

A useful way to figure out your XPath is to run xmllint --shell <your xml file> and navigate around the structure to figure it out. The great thing about old established technologies is that there’s a ton of resources on Google from people hitting the same problems in the past - this from 2010 helped me out in writing this! My XPath expression was simply /*/book:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-01/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "xpath.expression": "/*/book",
        "topic":"books-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'
Enter fullscreen mode Exit fullscreen mode

This worked, and we can confirm that using a consumer against the topic - here I’m using ksqlDB just cos it’s quicker:

ksql> PRINT 'books-01' FROM BEGINNING;
Key format: ¯\_()_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/02 11:26:45.222 Z, key: <null>, value: {"id": "bk001", "author": "Writer", "title": "The First Book", "genre": "Fiction", "price": "44.95", "pub_date": "2000-10-01", "review": "An amazing story of nothing."}
rowtime: 2020/10/02 11:26:45.226 Z, key: <null>, value: {"id": "bk002", "author": "Poet", "title": "The Poet's First Poem", "genre": "Poem", "price": "24.95", "pub_date": "2000-10-01", "review": "Least poetic poems."}
Enter fullscreen mode Exit fullscreen mode

The value has been serialised as Avro, with the schema inferred from the XML itself. We can verify it by looking it up from the Schema Registry:

docker exec --tty schema-registry \
    curl -s "http://localhost:8081/subjects/books-01-value/versions/1" | \
    jq '.schema|fromjson[1]'
Enter fullscreen mode Exit fullscreen mode
{
    "type": "record",
    "name": "ConnectDefault",
    "namespace": "io.confluent.connect.avro",
    "fields": [
    { "name": "id", "type": [ "null", "string" ], "default": null },
    { "name": "author", "type": [ "null", "string" ], "default": null },
    { "name": "title", "type": [ "null", "string" ], "default": null },
    
Enter fullscreen mode Exit fullscreen mode

Avro is set as the default converter in my Kafka Connect worker configuration; I could override it if I wanted to use Protobuf, for example, by setting the necessary value.converter configuration:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "xpath.expression": "/*/book",
        "topic":"books-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'
Enter fullscreen mode Exit fullscreen mode

This time the data’s written as Protobuf, which we can validate also from ksqlDB (it takes a best-guess at the serialisation method when it reads the messages, and automagically picks the appropriate deserialiser):

ksql> PRINT 'books-02' FROM BEGINNING;
Key format: ¯\_()_/¯ - no data processed
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2020/10/02 11:31:34.066 Z, key: <null>, value: id: "bk001" author: "Writer" title: "The First Book" genre: "Fiction" price: "44.95" pub_date: "2000-10-01" review: "An amazing story of nothing."
rowtime: 2020/10/02 11:31:34.068 Z, key: <null>, value: id: "bk002" author: "Poet" title: "The Poet\'s First Poem" genre: "Poem" price: "24.95" pub_date: "2000-10-01" review: "Least poetic poems."
Enter fullscreen mode Exit fullscreen mode

A bit of ksqlDB

With the data streaming into a Kafka topic from flat file, we can do this:

ksql> CREATE STREAM BOOKS WITH (KAFKA_TOPIC='books-02',VALUE_FORMAT='PROTOBUF');

 Message
----------------
 Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM BOOKS EMIT CHANGES LIMIT 2;
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|ID      |AUTHOR   |TITLE                  |GENRE    |PRICE   |PUB_DATE    |REVIEW                      |
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|bk001   |Writer   |The First Book         |Fiction  |44.95   |2000-10-01  |An amazing story of nothing |
|bk002   |Poet     |The Poet's First Poem  |Poem     |24.95   |2000-10-01  |Least poetic poems.         |
Limit Reached
Query terminated
Enter fullscreen mode Exit fullscreen mode

For more permutations of XML ingest with FilePulse check out this blog.

What are my other options for getting XML into Kafka?

FilePulse worked great here, and it clearly has a lot of flexibility its processing and file handling options. It’s also really handy that it can infer the schema of the payload from the XML without requiring an XSD.

But what if your data isn’t in a flat file? Unfortunately in this situation you will need to reach for another option:

👾 Try it out!

You can find the code to run this for yourself using Docker Compose on GitHub.

Discussion

pic
Editor guide