Apache Kafka 2.6 included KIP-585, which adds support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages. In combination, these mean that you can conditionally drop messages.
As part of Apache Kafka, Kafka Connect ships with pre-built Single Message Transforms and Predicates, but you can also write your own. The API for each is documented: Transformation / Predicate. The predicates that ship with Apache Kafka are:
- RecordIsTombstone - The value part of the message is null (denoting a tombstone message)
- HasHeaderKey - Matches if a header exists with the name given
- TopicNameMatches - Matches based on the topic name
Predicates can be used in both source and sink connectors, depending on requirements.
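A predicate is attached to a transform through the transform's predicate property, and can be inverted with negate. As a minimal sketch of the configuration pattern (the myTransform / myPredicate labels and the topic pattern are placeholders):

"transforms"                       : "myTransform",
"transforms.myTransform.type"      : "<transform class>",
"transforms.myTransform.predicate" : "myPredicate",
"transforms.myTransform.negate"    : "false",
"predicates"                       : "myPredicate",
"predicates.myPredicate.type"      : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.myPredicate.pattern"   : "some-topic-.*"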
Conditionally renaming fields based on the topic name
Consider two topics holding logically identical entities, but with varying field names. Awful, isn't it? 😱
Topic day11-sys01:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-sys01 | \
  jq '.payload.Gen0'
{
  "txn_date": { "string": "Sun Dec 13 02:18:44 GMT 2020" },
  "amount"  : { "string": "68.32" },
  "product" : { "string": "Yeti Imperial Stout" },
  "units"   : { "string": "56" },
  "source"  : { "string": "SYS01" }
}

Topic day11-systemB:

docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -c1 -o-1 -u -q -J \
  -t day11-systemB | \
  jq '.payload.Gen0'
{
  "txn_date": { "string": "Mon Dec 07 16:22:03 GMT 2020" },
  "cost"    : { "string": "63.39" },
  "units"   : { "string": "27" },
  "item"    : { "string": "St. Bernardus Abt 12" },
  "source"  : { "string": "SYSTEM_B" }
}
Our esteemed data engineers have determined that in these two systems cost is the same as amount, and item the same as product.
With Kafka Connect and Single Message Transforms we can stream this data to a single harmonious home downstream, using just one connector. This is enabled through two Single Message Transforms:
- Conditionally modifying the field names
  - The transform includes the predicate property ("transforms.renameSystemBFields.predicate": "isSystemBTopic") which resolves to the predicates.isSystemBTopic configuration.
  - Predicate configuration follows a similar pattern to transforms: the predicates prefix, followed by a label (isSystemBTopic), a type (org.apache.kafka.connect.transforms.predicates.TopicNameMatches), and then any additional configuration properties required by the particular predicate being used.
- Renaming the topic (and thus the target table name) for all messages to transactions
  - Note that this happens in the transforms list after the field renaming; otherwise the predicate will not match the topic against .*-systemB, since the topic will already have been renamed to transactions.
The sink connector here picks up both topics ("topics.regex" : "day11-.*") but only applies the ReplaceField renames operation to messages from the day11-systemB topic.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics.regex" : "day11-.*",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "renameSystemBFields,renameTargetTopic",
"transforms.renameSystemBFields.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameSystemBFields.renames" : "item:product,cost:amount",
"transforms.renameSystemBFields.predicate": "isSystemBTopic",
"transforms.renameTargetTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTargetTopic.regex" : "day11-.*",
"transforms.renameTargetTopic.replacement": "transactions",
"predicates" : "isSystemBTopic",
"predicates.isSystemBTopic.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isSystemBTopic.pattern" : ".*-systemB"
}'
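Once created, you can check that the connector and its tasks are running by querying the Kafka Connect REST API (assuming, as above, that Connect is listening on localhost:8083):

curl -s http://localhost:8083/connectors/sink-jdbc-mysql-day11-00/status | jq '.'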
The resulting table in the target database looks like this:
mysql> describe transactions;
+----------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+------+------+-----+---------+-------+
| txn_date | text | YES | | NULL | |
| amount | text | YES | | NULL | |
| units | text | YES | | NULL | |
| product | text | YES | | NULL | |
| source | text | YES | | NULL | |
+----------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)
with data from both topics present (identifiable by the different source values):
mysql> SELECT * FROM transactions LIMIT 5;
+------------------------------+----------+--------+-------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+----------+--------+-------------------------------+-------+
| Tue Dec 08 14:27:13 GMT 2020 | SYS01 | 10.03 | Stone IPA | 39 |
| Tue Dec 15 23:09:20 GMT 2020 | SYSTEM_B | 7.24 | Ruination IPA | 72 |
| Wed Dec 09 06:26:34 GMT 2020 | SYS01 | 92.66 | Bells Expedition | 55 |
| Thu Dec 10 19:38:26 GMT 2020 | SYSTEM_B | 65.11 | Sierra Nevada Celebration Ale | 5 |
| Fri Dec 11 01:38:48 GMT 2020 | SYS01 | 55.52 | Sierra Nevada Celebration Ale | 31 |
+------------------------------+----------+--------+-------------------------------+-------+
5 rows in set (0.00 sec)
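A quick way to double-check that records from both source systems are landing in the single table is to aggregate on the source field:

mysql> SELECT source, COUNT(*) FROM transactions GROUP BY source;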
Filtering out null records
Consider a source topic in which there are tombstone (null) records being produced. These may be by design or in error; either way, we want to exclude them from the sink connector pipeline.
docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -C -o-10 -u -q -J \
-t sys02 | \
jq -c '[.offset,.key,.payload]'
[88,"0d011ee6-4424-4cb6-8665-61b46918b3d9",null]
[89,"b859f443-e92e-4599-a426-91c4bc6b1d28",null]
[90,"5633d30f-5b08-4a94-8690-b576e3e3d978",null]
[91,"aa0efeae-9dac-43a9-854b-1da3b589dee7",{"Gen0":{"amount":{"string":"73.66"},"txn_date":{"string":"Sun Dec 13 01:21:10 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Kirin Inchiban"},"units":{"string":"67"}}}]
[92,"4de86341-8165-42ca-bbea-276875cc9585",{"Gen0":{"amount":{"string":"6.86"},"txn_date":{"string":"Tue Dec 08 16:42:27 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Trappistes Rochefort 8"},"units":{"string":"61"}}}]
[93,"478dd272-a0cb-4f36-9dcb-73dd5bba245a",{"Gen0":{"amount":{"string":"30.50"},"txn_date":{"string":"Sun Dec 13 11:03:59 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Edmund Fitzgerald Porter"},"units":{"string":"11"}}}]
[94,"50a2e247-1a2b-4321-bc3e-a3980df83c23",{"Gen0":{"amount":{"string":"19.18"},"txn_date":{"string":"Fri Dec 11 03:48:47 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smithβs Imperial IPA"},"units":{"string":"4"}}}]
[95,"6f2172b7-d3b2-4890-a295-82a889e9a5b7",null]
[96,"fdfc9d85-fe02-4846-86a7-e31d1acdf26c",{"Gen0":{"amount":{"string":"7.27"},"txn_date":{"string":"Thu Dec 10 09:53:55 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Stone IPA"},"units":{"string":"87"}}}]
[97,"2b307e28-ff01-4f01-9a7e-529c60afb8ce",{"Gen0":{"amount":{"string":"53.49"},"txn_date":{"string":"Wed Dec 16 15:05:38 GMT 2020"},"source":{"string":"SYS02"},"product":{"string":"Samuel Smithβs Imperial IPA"},"units":{"string":"3"}}}]
Here's a sink connector similar to the one above, again using a predicate to apply a transform selectively. In this instance it's the Filter transform (which drops records), applied only when the isNullRecord predicate is true.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "sys02",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "dropNullRecords",
"transforms.dropNullRecords.type" : "org.apache.kafka.connect.transforms.Filter",
"transforms.dropNullRecords.predicate": "isNullRecord",
"predicates" : "isNullRecord",
"predicates.isNullRecord.type" : "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}'
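Since the JDBC sink names the target table after the topic by default, you can verify that the tombstones were dropped by counting the rows that arrived (the sys02 table name here is an assumption based on that default):

mysql> SELECT COUNT(*) FROM sys02;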
Filtering based on the contents of a message
Confluent Platform includes its own Filter Single Message Transform. Instead of being intended for use in combination with a predicate (as the org.apache.kafka.connect.transforms.Filter transform is), the Confluent Platform one uses JSONPath to specify a condition based on the data in the message itself.
Here's an example that filters out all messages except those that include Stout in the product field:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "filterStout",
"transforms.filterStout.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
"transforms.filterStout.filter.type" : "include"
}'
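The filter.type property controls whether matching records are kept or dropped. To invert the example and drop the Stout records instead, you would set it to exclude (a sketch against the same connector config):

"transforms.filterStout.filter.condition": "$[?(@.product =~ /.*Stout/)]",
"transforms.filterStout.filter.type"     : "exclude"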
The resulting data in MySQL contains just the expected messages:
mysql> select * from `day11-sys01`;
+------------------------------+--------+--------+------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+--------+--------+------------------------------+-------+
| Fri Dec 11 07:27:51 GMT 2020 | SYS01 | 58.75 | Stone Imperial Russian Stout | 67 |
| Sat Dec 12 05:15:18 GMT 2020 | SYS01 | 28.66 | Oak Aged Yeti Imperial Stout | 43 |
| Tue Dec 15 10:56:00 GMT 2020 | SYS01 | 73.17 | Storm King Stout | 28 |
| Tue Dec 15 12:46:52 GMT 2020 | SYS01 | 55.06 | Stone Imperial Russian Stout | 68 |
| Tue Dec 15 09:04:27 GMT 2020 | SYS01 | 0.34 | Bourbon County Stout | 33 |
| Wed Dec 09 02:12:24 GMT 2020 | SYS01 | 88.97 | Bourbon County Stout | 28 |
| Sun Dec 13 04:18:51 GMT 2020 | SYS01 | 6.29 | Samuel Smiths Oatmeal Stout | 7 |
| Thu Dec 10 10:51:51 GMT 2020 | SYS01 | 6.95 | Samuel Smiths Oatmeal Stout | 1 |
+------------------------------+--------+--------+------------------------------+-------+
8 rows in set (0.00 sec)
If you want to filter on numerics then make sure the data type is correct; use Cast if necessary, as shown here. In this case, the order of the transforms list is important:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day11-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day11-sys01",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "castTypes,filterSmallOrder",
"transforms.filterSmallOrder.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterSmallOrder.filter.condition": "$[?(@.amount < 42)]",
"transforms.filterSmallOrder.filter.type" : "include",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "amount:float32"
}'
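Cast takes a comma-separated list of field:type pairs, so if you also wanted to treat units as an integer (a hypothetical extension of the config above), the spec would be:

"transforms.castTypes.spec" : "amount:float32,units:int32"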
In the resulting data you can see that all the values in amount are less than 42, per the specified filter:
mysql> select * from `day11-sys01` LIMIT 10;
+------------------------------+--------+--------+-------------------------------------------+-------+
| txn_date | source | amount | product | units |
+------------------------------+--------+--------+-------------------------------------------+-------+
| Thu Dec 10 00:57:55 GMT 2020 | SYS01 | 3.53 | Sierra Nevada Celebration Ale | 26 |
| Mon Dec 14 01:01:00 GMT 2020 | SYS01 | 10.19 | Racer 5 India Pale Ale, Bear Republic Bre | 26 |
| Wed Dec 09 13:57:03 GMT 2020 | SYS01 | 20.29 | Hennepin | 32 |
| Wed Dec 09 19:58:35 GMT 2020 | SYS01 | 33.27 | 90 Minute IPA | 44 |
| Fri Dec 11 14:21:57 GMT 2020 | SYS01 | 14.87 | Yeti Imperial Stout | 52 |
| Wed Dec 09 17:19:18 GMT 2020 | SYS01 | 28.58 | Yeti Imperial Stout | 60 |
| Wed Dec 09 18:59:01 GMT 2020 | SYS01 | 34.28 | Two Hearted Ale | 67 |
| Mon Dec 07 18:47:19 GMT 2020 | SYS01 | 14.62 | Shakespeare Oatmeal | 47 |
| Sat Dec 12 23:07:38 GMT 2020 | SYS01 | 35.98 | Samuel Smiths Oatmeal Stout | 31 |
| Fri Dec 11 19:14:25 GMT 2020 | SYS01 | 32.12 | Founders Breakfast Stout | 73 |
+------------------------------+--------+--------+-------------------------------------------+-------+
10 rows in set (0.00 sec)
Try it out!
You can find the full code for trying this out (including a Docker Compose so you can spin it up on your local machine) 💾 here