DEV Community

Robin Moffatt
Robin Moffatt

Posted on • Originally published at rmoff.net on

πŸŽ„ Twelve Days of SMT πŸŽ„ - Day 4: RegExRouter

If you want to change the topic name to which a source connector writes, or object name that’s created on a target by a sink connector, the RegExRouter is exactly what you need.

To use the Single Message Transform you specify the pattern in the topic name to match, and its replacement. To drop a prefix of test- from a topic you would use:

"transforms" : "dropTopicPrefix",
"transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex" : "test-(.*)",
"transforms.dropTopicPrefix.replacement" : "$1"
Enter fullscreen mode Exit fullscreen mode

πŸ‘Ύ Demo code

Changing the topic name to which a source connector writes

XKCD

Source connectors will stream data to a Kafka topic based on properties define in the particular connector. For example, the JDBC source connector uses the table name and prefixes it with the mandatory value configured in topic.prefix. Other connectors will use the name of the source message queue being read from, the source file, etc etc.

Often, you’ll want to route data to a topic name that matches conventions that you have in your organisation for topic naming. Here’s an example of a JDBC source connector, and we want to drop the prefix that it uses:

{
  "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url" : "jdbc:mysql://mysql:3306/demo",
  "connection.user" : "mysqluser",
  "connection.password" : "mysqlpw",
  "topic.prefix" : "mysql-02-",
  "poll.interval.ms" : 1000,
  "tasks.max" : 1,
  "table.whitelist" : "customers",
  "mode" : "incrementing",
  "incrementing.column.name" : "id",
  "validate.non.null" : false,
  "transforms" : "dropTopicPrefix",
  "transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropTopicPrefix.regex" : "mysql-02-(.*)",
  "transforms.dropTopicPrefix.replacement": "$1"
}
Enter fullscreen mode Exit fullscreen mode

This is using RegEx to match the prefix mysql-02- and to store everything else .* in a capture group ( ), which is then referenced in the replacement $1.

To learn more about RegEx, and experiment with patterns, check out the excellent RegExr.com

Changing the object name to which a source connector writes

Many sink connectors will use the topic name as the basis for the naming of the target object that it populates. The JDBC Sink connector creates a table named after the topic. The Elasticsearch sink connector creates an index named after the topic. And so on.

You can use the RegExRouter to customise the name of the object that sink connectors that follow this pattern will write to.

Here’s an example of streaming data to MySQL, using the JDBC sink connector. (See also πŸŽ₯ Kafka Connect in Action : JDBC Sink (πŸ‘Ύ demo code) and πŸŽ₯ ksqlDB & Kafka Connect JDBC Sink in action (πŸ‘Ύ demo code)

We’re going to read data from a topic called day4-transactions:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day4-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true"
        }'
Enter fullscreen mode Exit fullscreen mode

This works; you get a table created in MySQL:

mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
+-------------------+
1 row in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

What data’s in the table?

mysql> select * from day4-transactions;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '-transactions' at line 1
Enter fullscreen mode Exit fullscreen mode

Turns out a hyphen in the table name does not make your life easy in MySQL. You can quote is with a backtick, but it is not ideal

mysql> select * from `day4-transactions` LIMIT 1;
+-----------+-------+---------------------------+
| card_type | cost  | item                      |
+-----------+-------+---------------------------+
| switch    | 98.77 | Westmalle Trappist Tripel |
+-----------+-------+---------------------------+
1 row in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

By default the JDBC Sink connector takes the topic name as the name of the table to create. Let’s modify the above connector to route data to a table called transactions instead, and drop the day4- prefix.

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day4-transactions-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day4-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "dropTopicPrefix",
          "transforms.dropTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.dropTopicPrefix.regex" : "day4-(.*)",
          "transforms.dropTopicPrefix.replacement" : "$1"
        }'
Enter fullscreen mode Exit fullscreen mode

Since we’ve PUT the above configuration it updates the existing connector, and now we have a table in MySQL without the day4- prefix that’s much easier to work with:

mysql> show tables;
+-------------------+
| Tables_in_demo    |
+-------------------+
| day4-transactions |
| transactions      |
+-------------------+
2 rows in set (0.00 sec)

mysql> select * from transactions limit 1;
+-----------+-------+-----------------+
| card_type | cost  | item            |
+-----------+-------+-----------------+
| dankort   | 27.12 | Sapporo Premium |
+-----------+-------+-----------------+
1 row in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)