loading...

Streaming messages from RabbitMQ into Kafka with Kafka Connect

rmoff profile image Robin Moffatt Originally published at rmoff.net on ・7 min read

This was prompted by a question on StackOverflow to which I thought the answer would be straightforward but turned out not to be so. And then I got a bit carried away and ended up with a nice example of how you can handle schema-less data coming from a system such as RabbitMQ and apply a schema to it.

What? Why?

aeveltstra had this very good question. I mean, RabbitMQ and Kafka, why would you want to integrate them? Seems weird, right? Here's my quick take on it:

What? Why?

Often RabbitMQ is in use already and it's easier to stream the messages from it into Kafka (e.g. to use with ksqlDB, drive other Kafka apps, persist for analysis elsewhere, etc) than it is to re-plumb the existing application(s) that are using RabbitMQ.

Why would you choose to implement Kafka next to an existing RabbitMQ? Don't they serve the same purpose?

There is overlap for sure. Kafka tends to come into its own for things like scale, persistence, stream processing, integration with other technologies, and so on. In a large organisation (or even not-so-large) you'll often find different teams adopting different technologies, and so whilst it may seem odd to have both, you'll quite often see these similar (in part) technologies side-by-side.

NOTE: If you want to learn more about this, check out Comparing Confluent Platform with Traditional Messaging Middleware

Setup (optional)

If you want to follow along with this example, you can use Docker Compose to create the environment.

  1. Clone the repo
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/rabbitmq-into-kafka
  1. Bring up the test environment
docker-compose up -d

Create queue and test message on RabbitMQ

This uses the Management API which has been enabled on the Docker container automagically.

  1. Create the queue
curl --user guest:guest \
      -X PUT -H 'content-type: application/json' \
      --data-binary '{"vhost":"/","name":"test-queue-01","durable":"true","auto_delete":"false","arguments":{"x-queue-type":"classic"}}' \
      'http://localhost:15672/api/queues/%2F/test-queue-01'
  1. Confirm that the queue has been created
curl -s --user guest:guest \
        -X GET -H 'content-type: application/json' \
        'http://localhost:15672/api/queues/%2F/' | jq '.[].name'

"test-queue-01"
  1. Send a test message
echo '{"vhost":"/","name":"amq.default","properties":{"delivery_mode":1,"headers":{}},"routing_key":"test-queue-01","delivery_mode":"1","payload":"{\"transaction\": \"PAYMENT\", \"amount\": \"$125.0\", \"timestamp\": \"'$(date)'\" }","headers":{},"props":{},"payload_encoding":"string"}' |
curl --user guest:guest \
      -X POST -H 'content-type: application/json' \
      --data-binary @- \
      'http://localhost:15672/api/exchanges/%2F/amq.default/publish'
  1. Test consuming messages from the queue
curl --silent --user guest:guest \
        -X POST -H 'content-type: application/json' \
        --data-binary '{"ackmode":"ack_requeue_true","encoding":"auto","count":"10"}' \
        'http://localhost:15672/api/queues/%2F/test-queue-01/get' | jq '.[].payload|fromjson'

{
  "transaction": "PAYMENT",
  "amount": "$125.0",
  "timestamp": "Wed 8 Jan 2020 10:41:45 GMT"
}

You can see the RabbitMQ Web UI (login guest/guest) at http://localhost:15672/#/

Create the Kafka Connect connector

This uses the RabbitMQ plugin for Kafka Connect, which has been installed in the Docker container already. You can install it yourself from Confluent Hub.

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-rabbitmq-00/config \
    -d '{
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic" : "rabbit-test-00",
        "rabbitmq.queue" : "test-queue-01",
        "rabbitmq.username": "guest",
        "rabbitmq.password": "guest",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"kafka:29092",
        "confluent.topic.replication.factor":1,
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    } '

With the connector created we check that it’s running:

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
           jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
           column -s : -t| sed 's/\"//g'| sort

source | source-rabbitmq-00 | RUNNING | RUNNING | io.confluent.connect.rabbitmq.RabbitMQSourceConnector

And then we can check the topic that’s being written to. Here I’m using kafkacat but you can use any Kafka consumer:

docker exec kafkacat \
  kafkacat -b kafka:29092 \
           -t rabbit-test-00
           -C -u

The message we sent to RabbitMQ shows up in Kafka:

{"transaction": "PAYMENT", "amount": "$125.0", "timestamp": "Wed 8 Jan 2020 10:41:45 GMT"}

If you open another window and use the same curl statement (bottom pane) above to send more messages to RabbitMQ, you’ll see them appear in the Kafka topic (top pane) straight away:

One of the important things to note in the configuration of the connector is that we’re using the ByteArrayConverter for the value of the message, which just takes whatever bytes are on the RabbitMQ message and writes them to the Kafka message. Whilst on first look it appears that we’ve got a JSON message on RabbitMQ and so would evidently use the JsonConverter, this is not the case. If we do that, the converter will try to encode the bytes as JSON, and we’ll end up with this:

"eyJ0cmFuc2FjdGlvbiI6ICJQQVlNRU5UIiwgImFtb3VudCI6ICIkNDcuMyIsICJ0aW1lc3RhbXAiOiAiV2VkIDggSmFuIDIwMjAgMTM6MDE6MjEgR01UIiB9"

To understand more about converters and serialisation see this article: Kafka Connect Deep Dive – Converters and Serialization Explained

We can dig into the payload further with kafkacat to examine the headers etc:

docker exec kafkacat \
  kafkacat -b kafka:29092 -t rabbit-test-00 -C -u -q \
  -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

The output looks like this:

Topic rabbit-test-00 / Partition 0 / Offset: 48 / Timestamp: 1578480718010
Headers: rabbitmq.consumer.tag=amq.ctag--gWoke550mjIPbeJhquS9g,rabbitmq.content.type=NULL,rabbitmq.content.encoding=NULL,rabbitmq.delivery.mode=1,rabbitmq.priority=0,rabbitmq.correlation.id=NULL,rabbitmq.reply.to=NULL,rabbitmq.expiration=NULL,rabbitmq.message.id=NULL,rabbitmq.timestamp=NULL,rabbitmq.type=NULL,rabbitmq.user.id=NULL,rabbitmq.app.id=NULL,rabbitmq.delivery.tag=45,rabbitmq.redeliver=false,rabbitmq.exchange=,rabbitmq.routing.key=test-queue-01
Key (-1 bytes):
Payload (91 bytes): {"transaction": "PAYMENT", "amount": "$125.0", "timestamp": "Wed 8 Jan 2020 10:51:57 GMT" }

Apply a schema and manipulate the data in ksqlDB

So far we’ve got the message (which happens to be JSON) from RabbitMQ into a Kafka topic. Now let’s actually declare the schema so that we can work with the data. For that we’re going to use ksqlDB to do a little bit of stream processing.

Fire up the ksqlDB CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Inspect the raw topic contents:

ksql> PRINT 'rabbit-test-00' FROM BEGINNING;
Format:JSON
{"ROWTIME":1578477403591,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
{"ROWTIME":1578477598555,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
{"ROWTIME":1578478171540,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0"}
{"ROWTIME":1578480036859,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0","timestamp":"Wed 8 Jan 2020 10:40:36 GMT"}
{"ROWTIME":1578480105771,"ROWKEY":"null","transaction":"PAYMENT","amount":"$125.0","timestamp":"Wed 8 Jan 2020 10:41:45 GMT"}

Declare the stream (which is just the existing Kafka topic with an explicit schema):

CREATE STREAM rabbit (transaction VARCHAR,
                      amount VARCHAR,
                      timestamp VARCHAR)
  WITH (KAFKA_TOPIC='rabbit-test-00',
        VALUE_FORMAT='JSON');

Now we can query the stream of data, starting at the beginning:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> SELECT transaction, amount, timestamp FROM rabbit EMIT CHANGES;
+------------+---------+----------------------------+
|TRANSACTION |AMOUNT   |TIMESTAMP                   |
+------------+---------+----------------------------+
|PAYMENT     |$125.0   |null                        |
|PAYMENT     |$125.0   |null                        |
|PAYMENT     |$125.0   |null                        |
|PAYMENT     |$125.0   |Wed 8 Jan 2020 10:40:36 GMT |
|PAYMENT     |$125.0   |Wed 8 Jan 2020 10:41:45 GMT |

The AMOUNT column is clearly a currency, but the source data is a character string ($125.0). Let’s write a stream processor to split these into more appropriate columns, and also drop messages with no timestamp (that we’ll class as invalid data for this example):

CREATE STREAM TRANSACTIONS WITH (VALUE_FORMAT='AVRO') AS
  SELECT TRANSACTION AS TX_TYPE,
         SUBSTRING(AMOUNT,1,1) AS CURRENCY,
         CAST(SUBSTRING(AMOUNT,2,LEN(AMOUNT)-1) AS DECIMAL(9,2)) AS TX_AMOUNT,
         TIMESTAMP AS TX_TIMESTAMP
    FROM rabbit
   WHERE TIMESTAMP IS NOT NULL
    EMIT CHANGES;

This creates a new Kafka topic, populated by the transformed data driven by the original Kafka topic populated from RabbitMQ:

ksql> SELECT TX_TYPE, CURRENCY, TX_AMOUNT, TX_TIMESTAMP FROM TRANSACTIONS EMIT CHANGES;
+--------+----------+----------+----------------------------+
|TX_TYPE |CURRENCY  |TX_AMOUNT |TX_TIMESTAMP                |
+--------+----------+----------+----------------------------+
|PAYMENT |$         |125.00    |Wed 8 Jan 2020 10:40:36 GMT |
|PAYMENT |$         |125.00    |Wed 8 Jan 2020 10:41:45 GMT |

Note that the messages without a timestamp are not present in the new stream.

Compare our source schema:

ksql> DESCRIBE rabbit;

Name : RABBIT
 Field | Type
-----------------------------------------
 ROWTIME | BIGINT (system)
 ROWKEY | VARCHAR(STRING) (system)
 TRANSACTION | VARCHAR(STRING)
 AMOUNT | VARCHAR(STRING)
 TIMESTAMP | VARCHAR(STRING)
-----------------------------------------

with the transformed schema

ksql> DESCRIBE TRANSACTIONS;

Name : TRANSACTIONS
 Field | Type
------------------------------------------
 ROWTIME | BIGINT (system)
 ROWKEY | VARCHAR(STRING) (system)
 TX_TYPE | VARCHAR(STRING)
 CURRENCY | VARCHAR(STRING)
 TX_AMOUNT | DECIMAL
 TX_TIMESTAMP | VARCHAR(STRING)
------------------------------------------

Because we’ve applied a schema to the data we can now make better sense of it, as well as do useful things like write it to a database. Since we have a proper schema for the data (stored for us in the Schema Registry because we’re using Avro) Kafka Connect can actually build the target database table that it’s going to write data to:

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url' = 'jdbc:postgresql://postgres:5432/',
    'connection.user' = 'postgres',
    'connection.password' = 'postgres',
    'topics' = 'TRANSACTIONS',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'auto.create' = 'true',
    'transforms' = 'dropSysCols',
    'transforms.dropSysCols.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
    'transforms.dropSysCols.blacklist' = 'ROWKEY,ROWTIME'
  );

Now in Postgres we have the data almost as soon as it’s written to RabbitMQ, with the light transformation applied to it:

Posted on by:

rmoff profile

Robin Moffatt

@rmoff

Robin Moffatt is a Developer Advocate at Confluent, and regular conference speaker. He also likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Discussion

markdown guide