When you sink Unix timestamps from Apache Kafka to OpenSearch/Elasticsearch using the dedicated connector, the target system does not recognize them as timestamps by default.
With the TimestampConverter SMT (Single Message Transform), you can change the format into one that OpenSearch/Elasticsearch recognizes natively.
Use the TimestampConverter SMT to translate Unix timestamps into strings
For OpenSearch/Elasticsearch to recognize a timestamp automatically, we need to push it in the ISO 8601 format yyyy-MM-ddTHH:mm:ss.SSSZ
where:
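- yyyy-MM-dd are the year, month, and day
- T is the literal character T, separating the date from the time
- HH:mm:ss.SSS are the hours, minutes, seconds, and milliseconds
- Z is the literal character Z, the ISO 8601 designator for UTC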
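To make the target format concrete, here is a minimal Python sketch of the kind of conversion the SMT performs. The function name to_iso8601_millis and the sample value are illustrative assumptions, not part of the connector; the sketch assumes the input is a Unix timestamp in milliseconds and formats it in UTC.

```python
from datetime import datetime, timezone


def to_iso8601_millis(ts_ms: int) -> str:
    """Format a Unix timestamp in milliseconds as yyyy-MM-ddTHH:mm:ss.SSSZ (UTC)."""
    # Split with integer arithmetic to avoid floating-point rounding of the milliseconds.
    seconds, millis = divmod(ts_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # strftime has no millisecond directive, so append the three digits by hand.
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"


print(to_iso8601_millis(1700000000123))  # 2023-11-14T22:13:20.123Z
```

A string in this shape is what the sink should deliver to OpenSearch/Elasticsearch in place of the raw number.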
To produce the ISO 8601 string, add a TimestampConverter SMT to the Kafka Connect sink connector definition with:
"transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.transform-name.field": "ts_ms",
"transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.transform-name.target.type": "string"
Where:
- transform-name is the name of the transformation; the same name must be listed in the connector's transforms parameter
- "transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value" applies the conversion to a field in the message value
- "transforms.transform-name.field": "ts_ms" identifies ts_ms as the field to convert from a Unix timestamp
- "transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" defines the target ISO 8601 format; the single quotes mark T and Z as literal characters
- "transforms.transform-name.target.type": "string" defines the target field type as string (OpenSearch/Elasticsearch will automatically recognize it as a timestamp)
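The link between the transformation name and the transforms parameter is easy to get wrong, so here is a hedged Python sketch that builds the four properties together with the matching transforms entry. The helper name timestamp_converter_config and the transformation name ts_to_iso are hypothetical; the property keys and values are the ones listed above. The rest of the sink connector configuration is not shown and would need to be merged in.

```python
import json


def timestamp_converter_config(transform_name: str, field: str) -> dict:
    """Build the TimestampConverter properties for one field of the message value."""
    prefix = f"transforms.{transform_name}"
    return {
        # The name listed here must match the prefix of the properties below.
        "transforms": transform_name,
        f"{prefix}.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        f"{prefix}.field": field,
        # Single quotes make T and Z literal characters in the pattern.
        f"{prefix}.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
        f"{prefix}.target.type": "string",
    }


config = timestamp_converter_config("ts_to_iso", "ts_ms")
print(json.dumps(config, indent=2))
```

If the connector already applies other transformations, the transforms value is a comma-separated list, and the new name would be appended to it instead of replacing it.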