The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or a Date or Time), or into a Unix epoch value, and the same in reverse too.
This is really useful for making sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one), and it also enables you to write a Timestamp out to a sink connector in a string format of your choosing.
The TimestampConverter takes three arguments: the name of the field holding the timestamp, the data type to which you want to convert it, and the format of the timestamp. The format is used for parsing if you're reading a string and target.type is unix, Timestamp, Date, or Time, and for writing if you're producing a string and target.type is string. The timestamp format is based on Java's SimpleDateFormat.
"transforms" : "convertTS",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"
Changing a string-based timestamp into a Timestamp
In the payload the txn_date field looks like a timestamp:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
jq '.payload.Gen0.txn_date.string'
"Thu Dec 10 17:06:59 GMT 2020"
"Sat Dec 05 16:39:40 GMT 2020"
"Sat Dec 05 21:43:46 GMT 2020"
"Sun Dec 13 20:30:21 GMT 2020"
"Wed Dec 09 06:18:31 GMT 2020"
But if we have a look at the actual schema for the value (since it's serialised as Avro; the same would apply for Protobuf or JSON Schema) we can see that it might look like a timestamp, quack like a timestamp, but is in fact not a timestamp per se, but a string:
$ curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'
"null"
{
"type": "record",
"name": "Gen0",
"namespace": "io.mdrogalis",
"fields": [
[…]
{
"name": "txn_date",
"type": [
"null",
"string"
]
[…]
This means that when the data is used by a consumer, such as a Kafka Connect sink, it's still handled as a string, with the result that the target object type will usually inherit the same. Here's an example using the JDBC sink connector (see also 🎥 Kafka Connect in Action: JDBC Sink (💾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (💾 demo code)):
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
mysql> describe `day8-transactions`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | text | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)
Note that txn_date is a text field, which is no use for anyone who wants to use it as the timestamp that it is.
This is where the TimestampConverter comes in. In our example it's going to parse the string, using the supplied date and time format, and cast it to a timestamp as it passes through Kafka Connect. You can also use it to convert to and from epoch timestamp values, and to target a string, epoch, date, or time (as well as an actual timestamp).
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withTS"
}'
Hereβs the resulting table in MySQL:
mysql> describe `day8-transactions_withTS`;
+------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+-------------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | datetime(3) | YES | | NULL | |
+------------------+-------------+------+-----+---------+-------+
5 rows in set (0.00 sec)
As mentioned above, you can also extract just the date or time components of the timestamp by changing the target.type.
Date only
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Date",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withDate"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withDate`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | date | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)
mysql> select txn_date from `day8-transactions_withDate` LIMIT 5;
+------------+
| txn_date |
+------------+
| 2020-01-04 |
| 2020-01-04 |
| 2019-12-29 |
| 2020-01-01 |
| 2019-12-29 |
+------------+
5 rows in set (0.00 sec)
Time only
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-03/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Time",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withTime"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withTime`;
+------------------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+---------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | time(3) | YES | | NULL | |
+------------------+---------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select txn_date from `day8-transactions_withTime` LIMIT 5;
+--------------+
| txn_date |
+--------------+
| 14:05:19.000 |
| 14:09:11.000 |
| 19:18:25.000 |
| 03:22:06.000 |
| 09:57:44.000 |
+--------------+
5 rows in set (0.00 sec)
Unix Epoch
You can also write a Unix epoch value:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-04/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "unix",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withUnixEpoch"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withUnixEpoch`;
+------------------+--------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | bigint | YES | | NULL | |
+------------------+--------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select txn_date from `day8-transactions_withUnixEpoch` LIMIT 5;
+---------------+
| txn_date |
+---------------+
| 1577973919000 |
| 1577714951000 |
| 1577819905000 |
| 1577762526000 |
| 1577786264000 |
+---------------+
5 rows in set (0.00 sec)
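Note that the epoch values written here are milliseconds since epoch, hence the trailing 000. As a quick sanity check (assuming you have GNU date available), you can convert the first value back by hand:

$ date -u -d @$((1577973919000/1000))
Thu Jan  2 14:05:19 UTC 2020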
If you have a timestamp in Unix epoch form (a bigint) as the source, you can use the TimestampConverter to write it as a timestamp, date, or time, and also as a string; if you do the latter then the format configuration applies to the format in which the string will be written.
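For example, here's a sketch of the relevant transform configuration for writing an epoch source field out as a formatted string (the field name txn_date and the output pattern here are illustrative):

"transforms"                      : "convertTS",
"transforms.convertTS.type"       : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field"      : "txn_date",
"transforms.convertTS.target.type": "string",
"transforms.convertTS.format"     : "yyyy-MM-dd HH:mm:ss"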
Accessing timestamps in nested fields
Unfortunately the TimestampConverter only works on root-level elements; it can't be used on timestamp fields that are nested within other fields. You'd need to either use Flatten first (a sketch of which follows), or write your own transformation.
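As a sketch of the first approach, you could chain Flatten ahead of TimestampConverter in the same connector. The nested field txn_details.txn_date here is hypothetical; Flatten's delimiter determines the name of the resulting root-level field that TimestampConverter then operates on:

"transforms"                       : "flatten,convertTS",
"transforms.flatten.type"          : "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter"     : "_",
"transforms.convertTS.type"        : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field"       : "txn_details_txn_date",
"transforms.convertTS.format"      : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type" : "Timestamp"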
Try it out!
You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 💾 here