loading...

Ingesting XML data into Kafka - Option 1: The Dirty Hack

rmoff profile image Robin Moffatt Originally published at rmoff.net on ・10 min read

👉 Ingesting XML data into Kafka - Introduction

What would a blog post from me be if it didn’t include the dirty hack option? 😁

The secret to dirty hacks is that they are often rather effective and when needs must, they can suffice. If you’re prototyping and need to JFDI, a dirty hack is just fine. If you’re looking for code to run in Production, then a dirty hack probably is not fine.

Let’s assume we’ve got XML data sat somewhere that we can access with a shell tool that will output it to stdout. In this example it’s a REST endpoint somewhere that we can poll, like this (courtesy of TfL OpenData). We’re going to use the power of unix pipelines to string together some powerful tools:

  • curl to pull the data from the REST endpoint

  • xq - like the well-known jq tool, but for XML, and outputs JSON

  • kafkacat - takes input from stdin and produces it to a Kafka topic

Wrangling the XML data and streaming it into Kafka

Note

You can try out all this code by spinning up this Docker Compose

Let’s start by checking what we actually want to send to Kafka. The raw payload from the REST endpoint looks like this:

<?xml version="1.0" encoding="utf-8"?>
<stations lastUpdate="1601312340962" version="2.0">
    <station>
        <id>1</id>
        <name>River Street , Clerkenwell</name>
        <terminalName>001023</terminalName>
        <lat>51.52916347</lat></station>
    <station>
        <id>2</id>
        <name>Phillimore Gardens, Kensington</name>
        <terminalName>001018</terminalName>
        <lat>51.49960695</lat>
…
…

Using xq we use the same kind of construction as we would with jq to construct a target JSON object:

curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq '.'

This gives us a JSON structure that looks like this

{
    "stations": {
    "@lastUpdate": "1601462461108",
    "@version": "2.0",
    "station": [
        {
        "id": "1",
        "name": "River Street , Clerkenwell",
        "terminalName": "001023",
        "lat": "51.52916347",
        
        },
        {
        "id": "2",
        "name": "Phillimore Gardens, Kensington",
        "terminalName": "001018",
        "lat": "51.49960695",
        
        },

We need to decide how to carve up the data, since we’ve effectively got a batch of data here and Kafka works on the concept of messages/records. Therefore we’re going to do this:

  • Take each station element as its own message

  • Add in the lastUpdate value from the stations element into each station message (i.e. denormalise the payload somewhat)

We can use some xq magic to do this, extracting each element from the station array into its own root-level object (.stations.station[]) and adding in the lastUpdate field (+ {lastUpdate: .stations."@lastUpdate"}). If you want to learn more about the power of jq (on which xq is modelled) you can try out this code here.

So with the source REST API data piped through xq we get this:

curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}'
{
    "id": "1",
    "name": "River Street , Clerkenwell",
    "terminalName": "001023",
    "lat": "51.52916347",

    "lastUpdate": "1601462700830"
}
{
    "id": "2",
    "name": "Phillimore Gardens, Kensington",
    "terminalName": "001018",
    "lat": "51.49960695",

    "lastUpdate": "1601462700830"
}

If we send the data to Kafka in this form using kafkacat we’ll end up with garbled data because each line will be taken as its own message (the line break would act as the default message delineator). To fix this we’ll use the -c flag with xq:

curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq -c '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}'

Here are our nicely wrangled and presented messages from the source XML, one message per line.

{"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",,"lastUpdate":"1601462880994"}
{"id":"2","name":"Phillimore Gardens, Kensington","terminalName":"001018","lat":"51.49960695",,"lastUpdate":"1601462880994"}

We’re now in a position to stream this into a Kafka topic, by adding kafkacat to the pipeline:

curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq -c '.stations.station[] + {lastUpdate: .stations."@lastUpdate"}' | \
    kafkacat -b localhost:9092 -t livecyclehireupdates_01 -P

We can use kafkacat as a consumer too (-C), here specifying -c1 to consume just one message so that we can smoke-test the pipeline:

kafkacat -b localhost:9092 -t livecyclehireupdates_01 -C -c1
{"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",,"lastUpdate":"1601464200733"}

👍 looks good.

What about keys?

Kafka messages are key/value, and we’ve specified a value but no key. This is where the hack gets just that little bit more hacky. We’re going to use xq to write the id field from the XML payload as a prefix to each message, with a separator so that kafkacat can identify where the key ends and the value stops.

I wrote a separate blog about how this technique works, check it out if you want to know more about it.

Our xq invocation now looks like this:

xq -rc --arg sep $'\x1c' '.stations.station[] + { lastUpdate: .stations."@lastUpdate"} |  [ .id + $sep, tostring] |  join("")'

Which combined with kafkacat looks like this:

curl --show-error --silent https://tfl.gov.uk/tfl/syndication/feeds/cycle-hire/livecyclehireupdates.xml | \
    xq -rc --arg sep $'\x1c' '.stations.station[] + { lastUpdate: .stations."@lastUpdate"} |  [ .id + $sep, tostring] |  join("")' | \
    kafkacat -b localhost:9092 -t livecyclehireupdates_02 -P -K$'\x1c'

Checking the data in the topic with kafkacat we can see that we’ve now set the key as we wanted, taking the value of the id field:

kafkacat -b localhost:9092 \
            -t livecyclehireupdates_02 \
            -C -c2 \
            -f 'Key: %k, Payload: %s\n'

    Key: 1, payload: {"id":"1","name":"River Street , Clerkenwell","terminalName":"001023","lat":"51.52916347",…"lastUpdate":"1601485080861"}
    Key: 2, payload: {"id":"2","name":"Phillimore Gardens, Kensington","terminalName":"001018","lat":"51.49960695",…"lastUpdate":"1601485080861"}

We’ve got data, but no schema

So we now have a Kafka topic with the XML-sourced data in it, but held in plain JSON. For it to be really useful, we want it in a form that is usable by consumers with little-or-no input from the producer of the data, and for that we want to declare and store the schema. I’m going to use ksqlDB for this - you can use other stream processing options such as Kafka Streams if you’d rather.

To start with I’ll declare the schema itself, on top of the topic.

Note

You hopefully see straightaway why serialisation methods that include a schema declaration (Avro/Protobuf/JSON Schema) are easier for the consumer, if only because they don’t have to type the schema in!

CREATE STREAM CYCLE_HIRE_SRC (
        id           VARCHAR KEY
    ,name         VARCHAR
    ,terminalName VARCHAR
    ,lat          DOUBLE
    ,long         DOUBLE
    ,installed    VARCHAR
    ,locked       VARCHAR
    ,installDate  BIGINT
    ,removalDate  BIGINT
    ,temporary    VARCHAR
    ,nbBikes      INT
    ,nbEmptyDocks INT
    ,nbDocks      INT
    ,lastUpdate   BIGINT
) WITH (KAFKA_TOPIC='livecyclehireupdates_02',
        VALUE_FORMAT='JSON',
        TIMESTAMP='lastUpdate');

Now we can project certain fields from the topic to see the schema in action:

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS LASTUPDATE,
        ID,
        NAME,
        NBBIKES,
        NBEMPTYDOCKS,
        NBDOCKS
    FROM CYCLE_HIRE_SRC
    EMIT CHANGES LIMIT 5;
+---------------------+-------+-------------------------------------+----------+--------------+--------+
|LASTUPDATE           |ID     |NAME                                 |NBBIKES   |NBEMPTYDOCKS  |NBDOCKS |
+---------------------+-------+-------------------------------------+----------+--------------+--------+
|2020-10-01 14:45:00  |1      |River Street , Clerkenwell           |2         |16            |19      |
|2020-10-01 14:45:00  |2      |Phillimore Gardens, Kensington       |13        |24            |37      |
|2020-10-01 14:45:00  |3      |Christopher Street, Liverpool Street |6         |26            |32      |
|2020-10-01 14:45:00  |4      |St. Chad's Street, King's Cross      |14        |7             |23      |
|2020-10-01 14:45:00  |5      |Sedding Street, Sloane Square        |26        |0             |27      |
Limit Reached
Query terminated

A few things to note:

  1. The ID field is taken from the Kafka message key - in theory we could have omitted it from the payload of the message

  2. We’re telling ksqlDB to use the lastUpdate field as the timestamp field for the messages. By default it will simply take the timestamp of the Kafka message itself (by default, when it hit the broker), so this is a useful thing to do, particularly if we do things like time-based windowing or joins. In the query above we’ve validated that it’s worked by showing the ROWTIME field in the selection.

  3. Whilst fields like installed and locked are boolean, they are seen as a string in the JSON model and so need declaring as such. We can fix this in subsequent processing.

At this stage we could just build a stream processing application to continually serialise the data to a new topic with something like Protobuf:

-- Make sure we process all records in the topic
SET 'auto.offset.reset' = 'earliest';

-- Populate a new stream (and thus Kafka topic) with everything from
-- the source stream, serialised to Protobuf
CREATE STREAM CYCLE_HIRE_PROTOBUF_01
    WITH (KAFKA_TOPIC='livecyclehireupdates_protobuf_01',
            VALUE_FORMAT='PROTOBUF') AS
    SELECT * FROM CYCLE_HIRE_SRC;

Looking at the topics on the broker now we can see that there is a new topic livecyclehireupdates_protobuf_01. If we try to read the data as normal it won’t work, and we can see it looks weird, because it’s binary data being read by something that expects just normal strings:

kafkacat -b localhost:9092 \
            -t livecyclehireupdates_protobuf_01 \
            -C -c1

River Street , Clerkenwell001023I@!'H'*true2false8˹%JfalsePX`h.

The correct thing to do is use a Protobuf consumer against it to validate that the data is there and correct:

kafka-protobuf-console-consumer --bootstrap-server localhost:9092 \
                                --from-beginning \
                                --topic livecyclehireupdates_protobuf_01 \
                                --max-messages 1
{"NAME":"River Street , Clerkenwell","TERMINALNAME":"001023","LAT":51.52916347,"LONG":-0.109970527,"INSTALLED":"true","LOCKED":"false","INSTALLDATE":"1278947280000","REMOVALDATE":"0","TEMPORARY":"false","NBBIKES":2,"NBEMPTYDOCKS":16,"NBDOCKS":19,"LASTUPDATE":"1601559900874"}

Wrangling the data

Above I’ve shown you how to simply apply a schema to a Kafka topic that’s in JSON format (it’d work with delimited data too) and serialise it to a new topic in a format that will store the schema in the Schema Registry for use by any consumer.

There are a few things in the data though that would probably benefit from a bit of wrangling, such as:

  • Casting the boolean fields ingested as VARCHAR to BOOLEAN

  • Nesting the lat/long fields into a single location field

You can do that with ksqlDB to do that here too - meaning that anyone wanting to use the data downstream can do so on a cleansed datastream instead of the raw one.

CREATE STREAM CYCLE_HIRE_PROTOBUF_02
    WITH (KAFKA_TOPIC='livecyclehireupdates_protobuf_02',
            VALUE_FORMAT='PROTOBUF') AS
    SELECT ID,
            NAME,
            TERMINALNAME,
            LASTUPDATE,
            STRUCT(LATITUDE := LAT, LONGITUDE:= LONG) AS LOCATION,
            CAST(CASE
                WHEN LCASE(INSTALLED)='false' THEN FALSE
                WHEN LCASE(INSTALLED)='true' THEN TRUE
            END AS BOOLEAN) AS INSTALLED,
            CAST(CASE
                WHEN LCASE(LOCKED)='false' THEN FALSE
                WHEN LCASE(LOCKED)='true' THEN TRUE
            END AS BOOLEAN) AS LOCKED,
            INSTALLDATE,
            REMOVALDATE,
            CAST(CASE
                WHEN LCASE(TEMPORARY)='false' THEN FALSE
                WHEN LCASE(TEMPORARY)='true' THEN TRUE
            END AS BOOLEAN) AS TEMPORARY,
            NBBIKES,
            NBEMPTYDOCKS,
            NBDOCKS
        FROM CYCLE_HIRE_SRC
        EMIT CHANGES;

From this we now have a nice Kafka topic (livecyclehireupdates_protobuf_02) that any consumer can use with full access to a schema to use however they want. The topic is driven by any changes to the source topic - call it streaming ETL, if you like.

ksql> SHOW TOPICS;

 Kafka Topic                      | Partitions | Partition Replicas
--------------------------------------------------------------------
 livecyclehireupdates_02          | 1          | 1
 livecyclehireupdates_protobuf_02 | 1          | 1
--------------------------------------------------------------------
ksql> SELECT NAME, LOCATION, INSTALLED FROM CYCLE_HIRE_PROTOBUF_02 EMIT CHANGES LIMIT 5;
+-------------------------------------+-----------------------------------------------+----------+
|NAME                                 |LOCATION                                       |INSTALLED |
+-------------------------------------+-----------------------------------------------+----------+
|River Street , Clerkenwell           |{LATITUDE=51.52916347, LONGITUDE=-0.109970527} |true      |
|Phillimore Gardens, Kensington       |{LATITUDE=51.49960695, LONGITUDE=-0.197574246} |true      |
|Christopher Street, Liverpool Street |{LATITUDE=51.52128377, LONGITUDE=-0.084605692} |true      |
|St. Chad's Street, King's Cross      |{LATITUDE=51.53005939, LONGITUDE=-0.120973687} |true      |
|Sedding Street, Sloane Square        |{LATITUDE=51.49313, LONGITUDE=-0.156876}       |true      |
Limit Reached
Query terminated

Obligatory ksqlDB materialised view demo

I can’t open up ksqlDB to show streaming ETL like the above without also showing materialised views. These are so cool because they let you take a stream of data in a Kafka topic, and build it into state that you can query, and is kept up to date automagically as any new messages arrive on the underlying topic.

CREATE TABLE CYCLE_HIRE AS
    SELECT ID,
            LATEST_BY_OFFSET(NAME) AS NAME,
            LATEST_BY_OFFSET(NBBIKES) AS NBBIKES,
            LATEST_BY_OFFSET(NBEMPTYDOCKS) AS NBEMPTYDOCKS,
            LATEST_BY_OFFSET(NBDOCKS) AS NBDOCKS,
            LATEST_BY_OFFSET(LASTUPDATE) AS LAST_UPDATE_TS
        FROM CYCLE_HIRE_PROTOBUF_02
        GROUP BY ID;
ksql> SELECT TIMESTAMPTOSTRING(LAST_UPDATE_TS,'yyyy-MM-dd HH:mm:ss','Europe/London') AS TS,
                NAME,
                NBBIKES,
                NBEMPTYDOCKS
        FROM CYCLE_HIRE
        WHERE ID='42';
+------------------------+---------+-------------+
|NAME                    |NBBIKES  |NBEMPTYDOCKS |
+------------------------+---------+-------------+
|Wenlock Road , Hoxton   |2        |26           |
ksql>

This is called a pull query and you can run it from any client application using the REST API. As new messages arrive, the materialised view updates automagically and is reflected whenever its subsequently queried. What about if you want to know as soon as they’re updated? For that you can use a push query, in which you effectively subscribe to any changes, denoted by the EMIT CHANGES clause. As with the pull query you can run this over the REST API too.

In the above image the pull query is in the top half - note how the query exits once complete, and can be re-run to query the current state. The push query is in the lower half, and once run will emit any changes as soon as they are received.

🤔 🧐 This sounds like a bit of a hack - what are my other options for getting XML into Kafka?

This was option 1, or perhaps I should have called it option zero. It’s okay, it does a job, but you’d not bet your call-out rota on it, right? Shell scripts and bits of string have a habit of working great right up until the moment they don’t usually at 0400 in the morning on Christmas Day, or just before a high-profile business event like Black Friday…

So, what are the other options to ingest XML into Kafka, and to do it properly?

👾 Try it out!

You can find the code to run this for yourself using Docker Compose on GitHub.

Discussion

pic
Editor guide