Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 3: Flatten

The Flatten Single Message Transform (SMT) is useful when you need to collapse a nested message down to a flat structure.

To use the Single Message Transform you only need to reference it; there's no additional configuration required:

"transforms" : "flatten",
"transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value"

You can optionally override the default delimiter (.) that's used to join the nested field names:

"transforms.flatten.delimiter" : "_"
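
To make the effect of the delimiter concrete, here is a minimal Python sketch of the flattening idea. It is not the Kafka Connect implementation: it models a record as a plain dict, handles only nested dicts, and the `flatten` function and sample record are hypothetical names made up for illustration.

```python
def flatten(record, delimiter=".", prefix=""):
    """Collapse nested dicts into a single level.

    Nested field names are joined to their parent's name with `delimiter`.
    This is a simplified model, not the actual SMT code.
    """
    flat = {}
    for name, value in record.items():
        full_name = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Recurse into the nested structure, carrying the parent name along.
            flat.update(flatten(value, delimiter, full_name))
        else:
            flat[full_name] = value
    return flat


# Hypothetical record, two levels deep.
record = {"a": 1, "b": {"c": 2, "d": {"e": 3}}}

print(flatten(record))                 # default delimiter
print(flatten(record, delimiter="_"))  # overridden delimiter
```

With the default delimiter the nested names come out as b.c and b.d.e; with the override they become b_c and b_d_e. The underscore is often the safer choice when the target is a database, since a dot inside a column name usually needs quoting.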

👾 Demo code

Example - JDBC Sink connector

See also 🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)

Given a source message that looks like this:

{
  "FULL_NAME": "Opossum, american virginia",
  "ADDRESS": {
    "STREET": "20 Acker Terrace",
    "CITY": "Lynchburg",
    "COUNTY_OR_STATE": "Virginia",
    "ZIP_OR_POSTCODE": "24515"
  }
}

We can't load it directly into a relational database, because a table expects flat columns and ADDRESS is a nested structure. If we try to load the message as it is, the JDBC Sink connector fails with an error:

…(STRUCT) type doesn't have a mapping to the SQL database column type
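
As a rough way to see which fields cause this, the following Python sketch lists the top-level fields of a message whose value is itself a structure. It assumes the message is modelled as a plain dict; the connector itself works on Connect schema types, and `nested_fields` is a hypothetical helper, not part of any Kafka API.

```python
def nested_fields(record):
    """Return the names of top-level fields that hold a nested structure."""
    return [name for name, value in record.items() if isinstance(value, dict)]


# The source message from above, modelled as a Python dict.
message = {
    "FULL_NAME": "Opossum, american virginia",
    "ADDRESS": {
        "STREET": "20 Acker Terrace",
        "CITY": "Lynchburg",
        "COUNTY_OR_STATE": "Virginia",
        "ZIP_OR_POSTCODE": "24515",
    },
}

print(nested_fields(message))
```

For the sample message this reports only ADDRESS, which is the field with no direct column type to map to.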

So we use the Single Message Transform to flatten the source payload:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day3-customers",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "flatten",
          "transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter" : "_"
        }'

This will work, and you can now see the data in MySQL:

mysql> describe `day3-customers`;
+-------------------------+------+------+-----+---------+-------+
| Field                   | Type | Null | Key | Default | Extra |
+-------------------------+------+------+-----+---------+-------+
| FULL_NAME               | text | YES  |     | NULL    |       |
| ADDRESS_STREET          | text | YES  |     | NULL    |       |
| ADDRESS_CITY            | text | YES  |     | NULL    |       |
| ADDRESS_COUNTY_OR_STATE | text | YES  |     | NULL    |       |
| ADDRESS_ZIP_OR_POSTCODE | text | YES  |     | NULL    |       |
+-------------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select * from `day3-customers`;
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| FULL_NAME                  | ADDRESS_STREET              | ADDRESS_CITY | ADDRESS_COUNTY_OR_STATE | ADDRESS_ZIP_OR_POSTCODE |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
| Opossum, american virginia | 20 Acker Terrace            | Lynchburg    | Virginia                | 24515                   |
| Red deer                   | 53 Basil Terrace            | Lexington    | Kentucky                | 40515                   |
| Laughing kookaburra        | 84 Monument Alley           | San Jose     | California              | 95113                   |
| American bighorn sheep     | 326 Sauthoff Crossing       | San Antonio  | Texas                   | 78296                   |
| Skua, long-tailed          | 7 Laurel Terrace            | Manassas     | Virginia                | 22111                   |
| Fox, bat-eared             | 2946 Daystar Drive          | Jamaica      | New York                | 11431                   |
| Greater rhea               | 97 Morning Way              | Charleston   | West Virginia           | 25331                   |
| Vervet monkey              | 7615 Brown Park             | Chicago      | Illinois                | 60681                   |
| White spoonbill            | 7 Fulton Parkway            | Asheville    | North Carolina          | 28805                   |
| Sun gazer                  | 61 Lakewood Gardens Parkway | Pensacola    | Florida                 | 32590                   |
+----------------------------+-----------------------------+--------------+-------------------------+-------------------------+
10 rows in set (0.00 sec)

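Here's how to add the Kafka message key to the target table as well. Setting pk.mode to record_key takes the primary key from the message key, pk.fields names the column to store it in, and key.converter tells Kafka Connect how to deserialize the key: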

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day3-customers-02/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day3-customers2",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "flatten",
          "transforms.flatten.type" : "org.apache.kafka.connect.transforms.Flatten$Value",
          "transforms.flatten.delimiter" : "_",
          "pk.mode" : "record_key",
          "pk.fields" : "id",
          "key.converter" : "org.apache.kafka.connect.converters.LongConverter"
        }'
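
The LongConverter setting only works if the message keys really are serialized 64-bit integers. As a sketch of what that means on the wire, assuming the keys were written with Kafka's long serializer (8 bytes, big-endian), the Python below encodes and decodes such a key. The helper names and the key value 42 are hypothetical.

```python
import struct


def encode_long_key(value: int) -> bytes:
    """Serialize an integer as an 8-byte big-endian signed long."""
    return struct.pack(">q", value)


def decode_long_key(raw: bytes) -> int:
    """Deserialize an 8-byte big-endian signed long back to an integer."""
    (value,) = struct.unpack(">q", raw)
    return value


raw_key = encode_long_key(42)  # hypothetical key value
print(raw_key.hex())
print(decode_long_key(raw_key))
```

If the keys on the topic are strings, JSON, or Avro instead, the key.converter setting would need to match that format rather than LongConverter.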
