DEV Community

Robin Moffatt
Robin Moffatt

Posted on • Originally published at rmoff.net on

๐ŸŽ„ Twelve Days of SMT ๐ŸŽ„ - Day 7: TimestampRouter

Just like the RegExRouter, the TimeStampRouter can be used to modify the topic name of messages as they pass through Kafka Connect. Since the topic name is usually the basis for the naming of the object to which messages are written in a sink connector, this is a great way to achieve time-based partitioning of those objects if required. For example, instead of streaming messages from Kafka to an Elasticsearch index called cars, they can be routed to monthly indices e.g. cars_2020-10, cars_2020-11, cars_2020-12, etc.

The TimeStampRouter takes two arguments; the format of the final topic name to generate, and the format of the timestamp to put in the topic name (based on SimpleDateFormat).

"transforms" : "addTimestampToTopic",
"transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
"transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
Enter fullscreen mode Exit fullscreen mode

Note that the TimeStampRouter uses the timestamp of the Kafka message itself. The message timestamp can be set by the producer API explicitly, or allowed to default to the setting on the broker (log.message.timestamp.type) or topic (message.timestamp.type) which by default is the time on the broker at which the message is created (CreateTime). Message timestamps were added in Apache Kafka 0.10 in KIP-32.

๐Ÿ‘พ Demo code

Route data to target objects named based on the timestamp of the source data

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day7-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "addTimestampToTopic",
          "transforms.addTimestampToTopic.type" : "org.apache.kafka.connect.transforms.TimestampRouter",
          "transforms.addTimestampToTopic.topic.format" : "${topic}_${timestamp}",
          "transforms.addTimestampToTopic.timestamp.format": "YYYY-MM-dd"
        }'
Enter fullscreen mode Exit fullscreen mode

The data is read from the source topic (day7-transactions) and passes through the transformation which takes the message timestamp and appends it to the topic, resulting in a table in the target database:

mysql> show tables;
+-------------------------------+
| Tables_in_demo                |
+-------------------------------+
| day7-transactions_2020-12-16  |
+-------------------------------+
Enter fullscreen mode Exit fullscreen mode

Reference: JDBC sink connector, see also ๐ŸŽฅ Kafka Connect in Action : JDBC Sink (๐Ÿ‘พ demo code) and ๐ŸŽฅ ksqlDB & Kafka Connect JDBC Sink in action (๐Ÿ‘พ demo code)

What about using a timestamp value from the message itself?

In the payload of the data itself you may well have a date field that denotes the business value of the date by which you want to partition the data. In the data that we streamed to the database above, for example, you can see txn_date:

mysql> SELECT txn_date, item, cost FROM `day7-transactions_2020-12-11` LIMIT 5;
+------------------------------+------------------------------+-------+
| txn_date                     | item                         | cost  |
+------------------------------+------------------------------+-------+
| Thu Dec 03 02:37:03 GMT 2020 | Hop Rod Rye                  | 34.52 |
| Sat Dec 05 18:53:49 GMT 2020 | Duvel                        | 97.81 |
| Fri Dec 04 22:45:54 GMT 2020 | Trois Pistoles               | 41.45 |
| Fri Dec 04 16:37:25 GMT 2020 | Ten FIDY                     | 63.56 |
| Fri Dec 04 17:16:59 GMT 2020 | Stone Imperial Russian Stout | 94.66 |
+------------------------------+------------------------------+-------+
5 rows in set (0.00 sec)
Enter fullscreen mode Exit fullscreen mode

This is the difference between 'system date' (when the data was loaded into Kafka), and 'business date' (when the actual event took place).

There is a Single Message Transform called MessageTimestampRouter which is part of Confluent Platform and can be used to route data based on a time field in the message value itself.

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day7-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day7-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "addTimestampToTopicFromField",
          "transforms.addTimestampToTopicFromField.type" : "io.confluent.connect.transforms.MessageTimestampRouter",
          "transforms.addTimestampToTopicFromField.message.timestamp.keys" : "txn_date",
          "transforms.addTimestampToTopicFromField.message.timestamp.format": "EEE MMM dd HH:mm:ss zzz YYYY",
          "transforms.addTimestampToTopicFromField.topic.format" : "${topic}_${timestamp}",
          "transforms.addTimestampToTopicFromField.topic.timestamp.format" : "YYYY-MM-dd"
        }'
Enter fullscreen mode Exit fullscreen mode

Currently failsโ€ฆ

org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [appending message's timestamp field to topic], found: org.apache.kafka.connect.data.Struct
        at io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)
        at io.confluent.connect.transforms.MessageTimestampRouter.apply(MessageTimestampRouter.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 14 more

TRACE [sink-jdbc-mysql-day7-00|task-2] Applying transformation io.confluent.connect.transforms.MessageTimestampRouter to SinkRecord{kafkaOffset=2300, timestampType=CreateTime} ConnectRecord{topic='day7-transactions', kafkaPartition=0, key=013e350e-ac03-44cd-bc2b-7b348ec4df6b, keySchema=Schema{STRING}, value=Struct{txn_date=Thu Dec 03 02:25:24 GMT 2020,cost=73.58,item=Delirium Noctorum,card_type=mastercard,customer_remarks=He laid out Biff in one punch. I didn't know he had it in him. He's never stood up to Biff in his life!}, valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1607681956641, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain:47)
Enter fullscreen mode Exit fullscreen mode

The cause of this is that the Single Message Transform currently expects to handle raw JSON formatted records - not Avro/Protobuf/JSON Schema.

Top comments (0)