Robin Moffatt

Originally published at rmoff.net

🎄 Twelve Days of SMT 🎄 - Day 9: Cast

The Cast Single Message Transform lets you change the data type of fields in a Kafka message, supporting numeric, string, and boolean types.

Cast takes one argument, spec, listing each field that you would like to transform along with its target data type. Multiple fields can be specified by separating each specification with a comma.

"transforms" : "castTypes",
"transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec" : "cost:float32,units:int16"
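
To make the shape of the spec argument concrete, here is a rough Python model of what the transform does to a single record. It is only a sketch: the real SMT is written in Java, the helper names (parse_spec, apply_casts, CASTERS) are hypothetical, and edge cases such as out-of-range numbers or string formatting of booleans will not match Connect exactly. It also handles only the field:type form of the spec.

```python
import struct

def _to_bool(value):
    # Treat the string "true" (any case) as True; otherwise use Python truthiness.
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)

def _to_float32(value):
    # Round-trip through 4 bytes to mimic single-precision storage.
    return struct.unpack("f", struct.pack("f", float(value)))[0]

def _to_int(bits):
    lo, hi = -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    def convert(value):
        number = int(value)
        # Assumption for this sketch: reject values that do not fit.
        if not lo <= number <= hi:
            raise ValueError(f"{value!r} does not fit in int{bits}")
        return number
    return convert

# Python stand-ins for the target type names that Cast accepts.
CASTERS = {
    "int8": _to_int(8), "int16": _to_int(16),
    "int32": _to_int(32), "int64": _to_int(64),
    "float32": _to_float32, "float64": float,
    "boolean": _to_bool, "string": str,
}

def parse_spec(spec):
    """Turn 'cost:float32,units:int16' into {'cost': 'float32', 'units': 'int16'}."""
    casts = {}
    for entry in spec.split(","):
        field, _, type_name = entry.strip().partition(":")
        if type_name not in CASTERS:
            raise ValueError(f"unsupported target type: {type_name!r}")
        casts[field] = type_name
    return casts

def apply_casts(record, spec):
    """Return a copy of record with the fields named in spec converted."""
    result = dict(record)
    for field, type_name in parse_spec(spec).items():
        if result.get(field) is not None:  # nulls and missing fields pass through
            result[field] = CASTERS[type_name](result[field])
    return result

record = {"item": "Ten FIDY", "cost": "60.53", "units": "85"}
casted = apply_casts(record, "cost:float32,units:int16")
print(casted)
```

Note that float32 is single-precision, so a value such as 60.53 is held as the nearest representable approximation rather than exactly.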

👾 Demo code

Changing field data types as they pass through Kafka Connect

Because we’re developers who appreciate the finer things in life, including the importance of schemas, we’re using a suitable serialisation method for our data - Avro, Protobuf, or JSON Schema. The beauty of this is that the full schema is persisted at ingest, and available for any consumer. It also means that we want to make sure that the schema is correct for the data.

Let’s use the REST API of the Schema Registry to take a look at the schema of the data that our source connector is streaming into Kafka:

$ curl -s "http://localhost:8081/subjects/day9-transactions-value/versions/latest" | jq '.schema|fromjson[]'

"fields": [
  {
    "name": "cost",
    "type": ["null", "string"],
  },
  {
    "name": "customer_remarks",
    "type": ["null", "string"],
  },
  {
    "name": "units",
    "type": ["null", "string"],
  },
  {
    "name": "card_type",
    "type": ["null", "string"],
  },
  {
    "name": "txn_date",
    "type": ["null", "string"],
  },
  {
    "name": "item",
    "type": ["null", "string"],
  }
]

(I’ve cut bits of the schema shown above for the sake of space)
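
The fromjson in the jq filter above is needed because the Schema Registry returns the schema as a JSON-encoded string inside the JSON response. The sketch below shows the same two-step decoding in Python. It works on a hard-coded, trimmed response body rather than calling the REST API, and the record name and the id and version values are made up for illustration.

```python
import json

# Hypothetical, trimmed response body in the shape returned by
# GET /subjects/<subject>/versions/latest. The "schema" value is itself
# a string containing JSON, so it has to be decoded a second time.
response_body = json.dumps({
    "subject": "day9-transactions-value",
    "version": 1,   # made-up value
    "id": 1,        # made-up value
    "schema": json.dumps({
        "type": "record",
        "name": "ExampleRecord",  # hypothetical record name
        "fields": [
            {"name": "cost", "type": ["null", "string"], "default": None},
            {"name": "units", "type": ["null", "string"], "default": None},
        ],
    }),
})

def field_types(body):
    """Map each field name to its Avro type from a Schema Registry response."""
    envelope = json.loads(body)               # outer response document
    schema = json.loads(envelope["schema"])   # inner, string-encoded Avro schema
    return {field["name"]: field["type"] for field in schema["fields"]}

types = field_types(response_body)
for name, avro_type in types.items():
    print(f"{name}: {avro_type}")
```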

If this data is streamed as-is to a target system it will remain in its string/text types, unless the target system has something like Elasticsearch’s dynamic field mapping:

mysql> describe `day9-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| cost             | text | YES  |     | NULL    |       |
| customer_remarks | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| units            | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
6 rows in set (0.01 sec)

We saw previously how we handled the txn_date being a string when it should be a timestamp. Now we will use the Cast Single Message Transform to cast the two fields currently held as string to their correct numeric types.

We can do this at ingest or egress. If the conversion is to rectify an incorrect type then logically it should be done at ingest, in the source connector. If it’s to meet a specific requirement of the technology that a sink connector is writing to, then it should be done in the sink connector instead.

Here we’ll apply the fix to the source connector. Note that we’re using a new target topic (day9-01-transactions) because otherwise you’ll quite rightly get an error (io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "day9-transactions-value"; error code: 409).

curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-voluble-datagen-day9-01/config \
    -d '{
        "connector.class" : "io.mdrogalis.voluble.VolubleSourceConnector",
        "genkp.day9-01-transactions.with" : "#{Internet.uuid}",
        "genv.day9-01-transactions.cost.with" : "#{Commerce.price}",
        "genv.day9-01-transactions.units.with" : "#{number.number_between '\''1'\'','\''99'\''}",
        "genv.day9-01-transactions.txn_date.with" : "#{date.past '\''10'\'','\''DAYS'\''}",
        "genv.day9-01-transactions.card_type.with" : "#{Business.creditCardType}",
        "genv.day9-01-transactions.customer_remarks.with": "#{BackToTheFuture.quote}",
        "genv.day9-01-transactions.item.with" : "#{Beer.name}",
        "topic.day9-01-transactions.throttle.ms" : 1000,
        "transforms" : "castTypes",
        "transforms.castTypes.type" : "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castTypes.spec" : "cost:float32,units:int16"
        }'
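
The 409 error mentioned above arises because the Schema Registry checks each new schema against what is already registered for the subject (BACKWARD compatibility is the default), and Avro’s type-promotion rules do not allow data written as a string to be read as a float or an int. The following is a heavily simplified sketch of that check, not the Schema Registry’s actual implementation: the function names are hypothetical, it assumes every field is either a plain type or a union of null with one other type, and it ignores defaults, nested records, and added or removed fields.

```python
# Avro's type-promotion rules: data written with the key type can be read
# using any of the listed types (as well as the identical type).
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}

def base_type(avro_type):
    """Strip 'null' from a union and unwrap {'type': ...} to get a type name."""
    if isinstance(avro_type, list):
        avro_type = [t for t in avro_type if t != "null"][0]
    if isinstance(avro_type, dict):
        avro_type = avro_type["type"]
    return avro_type

def can_read(writer_type, reader_type):
    """True if data written as writer_type can be read as reader_type."""
    writer, reader = base_type(writer_type), base_type(reader_type)
    return writer == reader or reader in PROMOTIONS.get(writer, set())

def incompatible_fields(old_fields, new_fields):
    """Fields in both schemas whose existing data the new schema could not read."""
    return sorted(
        name for name, old_type in old_fields.items()
        if name in new_fields and not can_read(old_type, new_fields[name])
    )

# Field types before and after applying the Cast transform.
old = {
    "cost": ["null", "string"],
    "units": ["null", "string"],
    "item": ["null", "string"],
}
new = {
    "cost": ["null", "float"],
    "units": ["null", {"type": "int", "connect.type": "int16"}],
    "item": ["null", "string"],
}
print(incompatible_fields(old, new))  # prints ['cost', 'units']
```

Writing to a new topic means a new subject with no earlier versions to be compatible with, which is why the connector configuration above avoids the error.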

Now the schema of the data in Kafka is correct for the data:

"fields": [
  {
    "name": "txn_date",
    "type": ["null", "string"],
  },
  {
    "name": "units",
    "type": ["null", { "type": "int", "connect.type": "int16" }],
  },
  {
    "name": "customer_remarks",
    "type": ["null", "string"],
  },
  {
    "name": "cost",
    "type": ["null", "float"],
  },
  {
    "name": "item",
    "type": ["null", "string"],
  },
  {
    "name": "card_type",
    "type": ["null", "string"],
  }
]

and when it’s used in a sink connector the data is correctly stored in the target system:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day9-01/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password": "mysqlpw",
          "topics" : "day9-01-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true"}'

mysql> describe `day9-01-transactions`;
+------------------+----------+------+-----+---------+-------+
| Field            | Type     | Null | Key | Default | Extra |
+------------------+----------+------+-----+---------+-------+
| txn_date         | text     | YES  |     | NULL    |       |
| units            | smallint | YES  |     | NULL    |       |
| customer_remarks | text     | YES  |     | NULL    |       |
| cost             | float    | YES  |     | NULL    |       |
| item             | text     | YES  |     | NULL    |       |
| card_type        | text     | YES  |     | NULL    |       |
+------------------+----------+------+-----+---------+-------+
6 rows in set (0.00 sec)

mysql> select item, units, cost from `day9-01-transactions` LIMIT 5;
+-----------------------+-------+-------+
| item                  | units | cost  |
+-----------------------+-------+-------+
| Alpha King Pale Ale   |    29 | 17.49 |
| Brooklyn Black        |    36 | 92.88 |
| St. Bernardus Abt 12  |    17 | 94.04 |
| Celebrator Doppelbock |    63 | 58.64 |
| Ten FIDY              |    85 | 60.53 |
+-----------------------+-------+-------+
5 rows in set (0.00 sec)


Try it out!

You can find the full code for trying this out (including a Docker Compose file so you can spin it up on your local machine) 👾 here
