The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numeric types, string, and boolean.
Cast takes one argument, spec, which lists the field(s) that you would like to transform along with the target data type of each, in the form field:type. Multiple fields can be specified by separating each specification with a comma.
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
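To make the shape of the spec concrete, here is a small Python sketch that parses it as described above: comma-separated field:type pairs, or a single bare type that applies to the whole value. The function name parse_cast_spec and the set of accepted types are assumptions for illustration (I've listed the primitive types Cast documents; check the docs for your Kafka version). This is not Kafka's Java implementation.

```python
# Hypothetical helper: mirrors the shape of the Cast SMT "spec" property,
# not Kafka's actual implementation. VALID_TYPES is an assumed list.
VALID_TYPES = {"int8", "int16", "int32", "int64",
               "float32", "float64", "boolean", "string"}

def parse_cast_spec(spec: str) -> dict:
    """Parse 'field:type,field:type' into {field: type}.

    A spec without any colon is treated as one type for the whole value
    and returned under the key None.
    """
    spec = spec.strip()
    if ":" not in spec:
        if spec not in VALID_TYPES:
            raise ValueError(f"unsupported type: {spec}")
        return {None: spec}
    result = {}
    for entry in spec.split(","):
        field, _, target = entry.strip().partition(":")
        field, target = field.strip(), target.strip()
        if not field or target not in VALID_TYPES:
            raise ValueError(f"bad spec entry: {entry!r}")
        result[field] = target
    return result

print(parse_cast_spec("cost:float32,units:int16"))
# {'cost': 'float32', 'units': 'int16'}
```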
Changing field data types as they pass through Kafka Connect
Because we're developers who appreciate the finer things in life, including the importance of schemas, we're using a suitable serialisation method for our data: Avro, Protobuf, or JSON Schema. The beauty of this is that the full schema is persisted at ingest and available to any consumer. It also means that we want to make sure that the schema is correct for the data.
Let's use the REST API of the Schema Registry to take a look at the schema of the data that our source connector is streaming into Kafka:
$ curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'
"fields": [
{
"name": "cost",
"type": ["null", "string"],
},
{
"name": "customer_remarks",
"type": ["null", "string"],
},
{
"name": "units",
"type": ["null", "string"],
},
{
"name": "card_type",
"type": ["null", "string"],
},
{
"name": "txn_date",
"type": ["null", "string"],
},
{
"name": "item",
"type": ["null", "string"],
}
]
(I've cut bits of the schema shown above for the sake of space.)
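The jq filter above pulls the schema string out of the registry response and parses it. As a rough Python equivalent, this sketch does the same parse and lists the fields whose only non-null type is string, which are the candidates for casting. The helper name string_fields and the trimmed-down example response (including its flag field) are hypothetical.

```python
import json

def string_fields(schema_str: str) -> list:
    """Names of record fields whose only non-null Avro type is "string"."""
    schema = json.loads(schema_str)  # the equivalent of jq's fromjson
    names = []
    for field in schema["fields"]:
        types = field["type"] if isinstance(field["type"], list) else [field["type"]]
        if [t for t in types if t != "null"] == ["string"]:
            names.append(field["name"])
    return names

# Hypothetical, trimmed-down registry response: "schema" is itself a JSON string
response = {
    "schema": json.dumps({
        "type": "record",
        "name": "example",
        "fields": [
            {"name": "cost", "type": ["null", "string"], "default": None},
            {"name": "units", "type": ["null", "string"], "default": None},
            {"name": "flag", "type": ["null", "boolean"], "default": None},
        ],
    })
}
print(string_fields(response["schema"]))  # ['cost', 'units']
```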
If this data is streamed as-is to a target system, it will remain in its string/text types (unless the target system has something like Elasticsearch's dynamic field mapping):
mysql> describe `day9-transactions`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost | text | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | text | YES | | NULL | |
| item | text | YES | | NULL | |
| units | text | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
6 rows in set (0.01 sec)
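Why does this matter? A quick Python illustration of what happens when numbers are held as text: they sort and compare lexicographically rather than numerically. Three of the values are taken from the query output further down; 9.50 is made up to show the effect.

```python
# cost values as they would sit in a TEXT column, and the same values as numbers.
cost_as_text = ["17.49", "92.88", "9.50", "60.53"]
cost_as_float = [float(c) for c in cost_as_text]

print(sorted(cost_as_text))   # ['17.49', '60.53', '9.50', '92.88']  (character order)
print(sorted(cost_as_float))  # [9.5, 17.49, 60.53, 92.88]           (numeric order)
```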
We saw previously how we handled txn_date being a string when it should be a timestamp. Now we will use the Cast Single Message Transform to cast the two fields currently held as strings (cost and units) to their correct numeric types.
We can do this at ingest or egress. If the conversion is to rectify an incorrect type, then logically it should be done at ingest, in the source connector. If it's to meet a specific requirement of a technology being written to by a sink connector, then it should be done there instead.
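Because Cast is purely connector configuration, the same properties can be attached to either a source or a sink connector. This hypothetical Python helper (with_cast is my own name, not part of any Kafka tooling) appends a Cast$Value transform to an existing connector config dict, which keeps the choice between ingest and egress a small change. Transforms are applied in the order they are listed, so appending puts the cast last in the chain.

```python
import json

def with_cast(config: dict, spec: str, name: str = "castTypes") -> dict:
    """Return a copy of a connector config with a Cast$Value transform appended."""
    updated = dict(config)  # leave the caller's dict untouched
    chain = [t.strip() for t in updated.get("transforms", "").split(",") if t.strip()]
    updated["transforms"] = ",".join(chain + [name])
    updated[f"transforms.{name}.type"] = "org.apache.kafka.connect.transforms.Cast$Value"
    updated[f"transforms.{name}.spec"] = spec
    return updated

# Hypothetical sink config reading the original, all-string topic
sink = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "day9-transactions",
}
print(json.dumps(with_cast(sink, "cost:float32,units:int16"), indent=2))
```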
Here we'll apply the fix to the source connector. Note that we're using a new target topic (day9-01-transactions), because otherwise you'll quite rightly get an error: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409
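The 409 comes from the Schema Registry's compatibility check. Under its default BACKWARD compatibility level, a consumer using the new schema must be able to read data written with the previous one, and Avro's schema resolution rules don't allow a string to be read as a float. The sketch below is a deliberately simplified model of that rule, covering only primitive types inside a union; the real check is done by the registry, and the function names are hypothetical.

```python
# Simplified model of Avro primitive type promotion (writer type -> reader types),
# following the schema resolution rules in the Avro specification.
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}

def can_read(writer: str, reader: str) -> bool:
    """True if data written as `writer` can be read with type `reader`."""
    return writer == reader or reader in PROMOTIONS.get(writer, set())

def union_backward_compatible(old_union: list, new_union: list) -> bool:
    """BACKWARD: readers on new_union must handle data written with old_union,
    so every old (writer) branch must resolve to some new (reader) branch."""
    return all(any(can_read(w, r) for r in new_union) for w in old_union)

print(union_backward_compatible(["null", "string"], ["null", "float"]))  # False
print(union_backward_compatible(["null", "int"], ["null", "long"]))      # True
```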
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
-d '{
"connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
"genkp.day9-01-transactions.with" : "#{Internet.uuid}",
"genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
"genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
"genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
"genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
"genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
"genv.day9-01-transactions.item.with" : "#{Beer.name}",
"topic.day9-01-transactions.throttle.ms" : 1000,
"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
}'
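For intuition about what the transform does to each record, here is a Python sketch that applies the same spec to a single value: units is parsed into a 16-bit integer and cost into a 32-bit float. The helper names, the range check, and the use of struct to emulate float32 precision are my own choices for this sketch, and it only handles the two target types used here. One thing it does show: float32 can't represent 17.49 exactly, so what is stored is the nearest float32 value rather than 17.49 itself.

```python
import struct

INT16_MIN, INT16_MAX = -(2 ** 15), 2 ** 15 - 1

def to_int16(value: str) -> int:
    """Parse a string as a signed 16-bit integer, rejecting out-of-range values."""
    n = int(value)
    if not INT16_MIN <= n <= INT16_MAX:
        raise ValueError(f"{value!r} does not fit in int16")
    return n

def to_float32(value: str) -> float:
    """Parse a string, then round-trip it through IEEE 754 single precision."""
    return struct.unpack("<f", struct.pack("<f", float(value)))[0]

CASTERS = {"int16": to_int16, "float32": to_float32}

def cast_record(record: dict, spec: dict) -> dict:
    """Apply {field: type} casts to a flat record; other fields and nulls pass through."""
    return {
        name: CASTERS[spec[name]](value) if name in spec and value is not None else value
        for name, value in record.items()
    }

record = {"item": "Alpha King Pale Ale", "units": "29", "cost": "17.49"}
print(cast_record(record, {"cost": "float32", "units": "int16"}))
```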
Now the schema of the data in Kafka is correct for the data:
"fields": [
{
"name": "txn_date",
"type": ["null", "string"],
},
{
"name": "units",
"type": [
"null", { "type": "int", "connect.type": "int16" } ],
},
{
"name": "customer_remarks",
"type": ["null", "string"],
},
{
"name": "cost",
"type": ["null", "float"],
},
{
"name": "item",
"type": ["null", "string"],
},
{
"name": "card_type",
"type": ["null", "string"],
}
]
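Avro has no 16-bit integer type, which is why units appears as an Avro int carrying a connect.type of int16: the hint lets the converter restore the original Connect type when the data is read back. This Python sketch writes that mapping out for the primitive types. The int16 and float32 entries match the schema above; the others reflect my understanding of how Confluent's Avro converter behaves and should be treated as assumptions.

```python
# Assumed Connect -> Avro type mapping. int16 and float32 match the schema above;
# the remaining entries are assumptions. 8- and 16-bit integers become Avro "int"
# plus a "connect.type" hint so the original width can be restored on read.
CONNECT_TO_AVRO = {
    "int8": {"type": "int", "connect.type": "int8"},
    "int16": {"type": "int", "connect.type": "int16"},
    "int32": "int",
    "int64": "long",
    "float32": "float",
    "float64": "double",
    "boolean": "boolean",
    "string": "string",
}

def optional_avro_type(connect_type: str) -> list:
    """Avro union used for an optional Connect field, as in the schema above."""
    return ["null", CONNECT_TO_AVRO[connect_type]]

print(optional_avro_type("int16"))    # ['null', {'type': 'int', 'connect.type': 'int16'}]
print(optional_avro_type("float32"))  # ['null', 'float']
```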
And when it's used in a sink connector, the data is correctly stored in the target system:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day9-01-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"}'
mysql> describe `day9-01-transactions`;
+------------------+----------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+----------+------+-----+---------+-------+
| txn_date | text | YES | | NULL | |
| units | smallint | YES | | NULL | |
| customer_remarks | text | YES | | NULL | |
| cost | float | YES | | NULL | |
| item | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
+------------------+----------+------+-----+---------+-------+
6 rows in set (0.00 sec)
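With auto.create enabled, the JDBC sink derives the table's column types from the Connect schema. This Python sketch builds roughly equivalent DDL for columns like those shown above. Only the three mappings it contains (int16 to SMALLINT, float32 to FLOAT, string to TEXT) are confirmed by the DESCRIBE output; the function name is hypothetical, and the exact statement the connector issues may well differ.

```python
# Hypothetical sketch: only the mappings confirmed by the DESCRIBE output above.
# Any other Connect type raises KeyError rather than guessing a MySQL type.
MYSQL_TYPES = {"int16": "SMALLINT", "float32": "FLOAT", "string": "TEXT"}

def create_table_sql(table: str, fields: list) -> str:
    """fields is a list of (name, connect_type) pairs; every column is nullable."""
    cols = ",\n".join(f"  `{name}` {MYSQL_TYPES[ctype]} NULL" for name, ctype in fields)
    return f"CREATE TABLE `{table}` (\n{cols}\n)"

print(create_table_sql("day9-01-transactions",
                       [("units", "int16"), ("cost", "float32"), ("item", "string")]))
```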
mysql> select item, units, cost from `day9-01-transactions` LIMIT 5;
+-----------------------+-------+-------+
| item | units | cost |
+-----------------------+-------+-------+
| Alpha King Pale Ale | 29 | 17.49 |
| Brooklyn Black | 36 | 92.88 |
| St. Bernardus Abt 12 | 17 | 94.04 |
| Celebrator Doppelbock | 63 | 58.64 |
| Ten FIDY | 85 | 60.53 |
+-----------------------+-------+-------+
5 rows in set (0.00 sec)
Try it out!
You can find the full code for trying this out, including a Docker Compose so you can spin it up on your local machine, here.