When sinking unix timestamps from Apache Kafka to OpenSearch/ElasticSearch using the dedicated connector, they are not recognized by default as timestamp in the target tech.
Using the TimestampConverter SMT you can change the format into one that is recognized natively by OpenSearch/ElasticSearch.
Use the TimestampConverter SMT to translate unix timestamps into strings
To have OpenSearch/ElasticSearch automatically recognizing a timestamp we need to push it in the ISO 8601 format yyyy-MM-ddTHH:mm:ss.SSSZ where:
-
yyyy-MM-ddare the year, month, day -
Tis the stringT -
HH:mm:ss.SSSare the hour, minutes, seconds, milliseconds -
Zis the stringZ
You can achieve this by adding a TimestampConverter SMT to the Kafka connect sink definition with:
"transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value"
"transforms.transform-name.field": "ts_ms",
"transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.transform-name.target.type": "string"
Where:
-
transform-nameis the transformation name, the same name must be referenced in thetransformsparameter -
"transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value"will extract the timestamp from part of the message value -
"transforms.transform-name.field": "ts_ms"identifies the fieldts_msas the one to be converted from unix timestamp -
"transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"defines the target ISO 8601 format -
"transforms.transform-name.target.type": "string"defines the target field as string (OS/ES will automatically recognize it as timestamp)
Top comments (0)