Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 9: Cast

The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numeric, string, and boolean target types.

Cast takes a single argument, spec, listing the field(s) that you would like to transform and the target data type for each. Multiple fields can be specified by separating each field:type specification with a comma.

"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
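The supported target types are int8, int16, int32, int64, float32, float64, boolean, and string. The example above operates on the message value (Cast$Value); to cast fields in the message key there's a Cast$Key variant. Here's a minimal sketch of that (the customer_id field is illustrative, not part of this demo's data):

"transforms" : "castKeyTypes",
"transforms.castKeyTypes.type" : "org.apache.kafka.connect.transforms.Cast$Key",
"transforms.castKeyTypes.spec" : "customer_id:int64"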

👾 Demo code

Changing field data types as they pass through Kafka Connect

Because we're developers who appreciate the finer things in life, including the importance of schemas, we're using a suitable serialisation method for our data - Avro, Protobuf, or JSON Schema. The beauty of this is that the full schema is persisted at ingest and available for any consumer. It also means that we want to make sure that the schema is correct for the data.

Let's use the REST API of the Schema Registry to take a look at the schema of the data that our source connector is streaming into Kafka:

$ curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'

"fields": [
  {
    "name": "cost",
    "type": ["null", "string"],
  },
  {
    "name": "customer_remarks",
    "type": ["null", "string"],
  },
  {
    "name": "units",
    "type": ["null", "string"],
  },
  {
    "name": "card_type",
    "type": ["null", "string"],
  },
  {
    "name": "txn_date",
    "type": ["null", "string"],
  },
  {
    "name": "item",
    "type": ["null", "string"],
  }
]

(I've cut bits of the schema shown above for the sake of space)

This data streamed as-is to a target system will, unless that target has something like Elasticsearch's dynamic field mapping, remain in its string/text types:

mysql> describe `day9-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
6 rows in set (0.01 sec)

We saw previously how we handled txn_date being a string when it should be a timestamp. Now we will use the Cast Single Message Transform to cast the two numeric fields currently held as strings, cost and units, to their correct types.

We can do this at ingest or egress; if the conversion is to rectify an incorrect type, then logically it should be done at ingest in the source connector. If it's to meet a specific requirement of the technology being written to by a sink connector, then it should be done there instead.
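Wherever it's applied, the SMT configuration itself is identical; as a sketch, the same transform dropped into a sink connector's config (other settings elided) would look like this:

"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
...
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"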

Here we'll apply the fix to the source connector. Note that we're using a new target topic (day9-01) because otherwise you'll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409).
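If you did want to re-register an incompatible schema on the same subject, an alternative to switching topics is to relax that subject's compatibility setting through the Schema Registry REST API. A sketch, with the caveat that NONE disables compatibility checking for the subject entirely:

$ curl -s -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config/day9-transactions-value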

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
    -d '{
        "connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day9-01-transactions.with" : "#{Internet.uuid}",
        "genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
        "genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
        "genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day9-01-transactions.item.with" : "#{Beer.name}",
        "topic.day9-01-transactions.throttle.ms" : 1000,
        "transforms" : "castTypes",
        "transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castTypes.spec" : "cost:float32,units:int16"
        }'
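Before checking the data it's worth confirming that the connector and its task are actually running; the Kafka Connect REST API reports the state of both:

$ curl -s http://localhost:8083/connectors/source-voluble-datagen-day9-01/status | \
    jq '.connector.state, .tasks[].state'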

Now the schema of the data in Kafka is correct for the data.

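We can check it with the same Schema Registry call as before, this time against the new subject (the name assumes the default topic-name-based subject naming):

$ curl -s "http://localhost:8081/subjects/day9-01-transactions-value/versions/latest" | jq '.schema|fromjson[]'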
"fields": [
  {
    "name": "txn_date",
    "type": ["null", "string"],
  },
  {
    "name": "units",
    "type": [
      "null", { "type": "int", "connect.type": "int16" } ],
  },
  {
    "name": "customer_remarks",
    "type": ["null", "string"],
  },
  {
    "name": "cost",
    "type": ["null", "float"],
  },
  {
    "name": "item",
    "type": [ "null", "string"
    ],
  },
  {
    "name": "card_type",
    "type": ["null", "string"],
  }
]
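To eyeball the actual values, and not just the schema, a kafkacat one-liner against the new topic does the job. This assumes a kafkacat build with Avro support, and the broker and Schema Registry on their default local ports:

$ kafkacat -b localhost:9092 -t day9-01-transactions \
    -s value=avro -r http://localhost:8081 -C -c5 -q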

and when it's used in a sink connector the data is correctly stored in the target system:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password": "mysqlpw",
          "topics" : "day9-01-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true"}'
mysql> describe `day9-01-transactions`;
+------------------+----------+------+-----+---------+-------+
| Field            | Type     | Null | Key | Default | Extra |
+------------------+----------+------+-----+---------+-------+
| txn_date         | text     | YES  |     | NULL    |       |
| units            | smallint | YES  |     | NULL    |       |
| customer_remarks | text     | YES  |     | NULL    |       |
| cost             | float    | YES  |     | NULL    |       |
| item             | text     | YES  |     | NULL    |       |
| card_type        | text     | YES  |     | NULL    |       |
+------------------+----------+------+-----+---------+-------+
6 rows in set (0.00 sec)
mysql> select item, units, cost from `day9-01-transactions` LIMIT 5;
+-----------------------+-------+-------+
| item                  | units | cost  |
+-----------------------+-------+-------+
| Alpha King Pale Ale   |    29 | 17.49 |
| Brooklyn Black        |    36 | 92.88 |
| St. Bernardus Abt 12  |    17 | 94.04 |
| Celebrator Doppelbock |    63 | 58.64 |
| Ten FIDY              |    85 | 60.53 |
+-----------------------+-------+-------+
5 rows in set (0.00 sec)

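One caveat to be aware of: Cast parses string values, so a value that can't be parsed into the target type (say, "N/A" arriving in the cost field) will cause the transform to throw an error and, by default, fail the task. Kafka Connect's error-handling settings can soften this; as a sketch for a sink connector, you could tolerate bad records and route them to a dead letter queue (the topic name here is illustrative):

"errors.tolerance" : "all",
"errors.deadletterqueue.topic.name" : "dlq-day9-transactions",
"errors.deadletterqueue.context.headers.enable": "true"

Note that the dead letter queue settings apply to sink connectors only.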

Try it out!

You can find the full code for trying this out, including a Docker Compose so you can spin it up on your local machine, 👾 here
