If you want to make sure your expected String key is what you think it is, use BytesDeserializer with your console consumers instead of StringDeserializer.
Introduction
The Confluent Avro serializer and deserializer work by storing the unique ID of the schema in each message. When unexpected printable characters show up in a string, a type mismatch is fairly obvious. But what about non-printable characters? How do they show up? Will the issue be obvious then?
Demonstration
A simple demonstration can be done with the Datagen Source Connector. Create a connector with Avro as the key converter. The key for the Datagen users quickstart is a string, and the Avro serializer writes it as an Avro primitive. Typically, when Avro is used, the top-level object is a Record, but the serializer has custom code for supporting primitives.
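To make that concrete, here is a minimal sketch of what the serializer does with a plain String key, assuming Confluent's serializer library is on the classpath; the class name is illustrative and the registry URL matches the demo environment:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

public class PrimitiveKeyDemo {
    public static void main(String[] args) {
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        // isKey = true; the registry URL is the demo's host-mapped address
        serializer.configure(Map.of("schema.registry.url", "http://localhost:8081"), true);

        // A plain String is serialized against the primitive schema
        // {"type":"string"} -- no wrapping Record is involved.
        byte[] bytes = serializer.serialize("users", "User_9");

        // The payload starts with the magic byte (0x00) and a 4-byte schema ID,
        // followed by the Avro binary encoding of the string.
        System.out.printf("first byte: 0x%02X, total length: %d%n", bytes[0], bytes.length);
    }
}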
The Configuration
The Datagen connector is configured with the key represented as Avro.
{
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"tasks.max": "1",
"kafka.topic": "users",
"quickstart": "users",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081",
"key.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schemas.enable": "true",
"max.interval": 100,
"iterations": 10000000
}
Scenario
You write a Kafka Streams application where you read the key as Serdes.String(), the default you used for your application. You forget to change the serde for reading users from that default to an Avro serde. You now join your stream of orders with users, and none of the joins succeed.
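A minimal sketch of the broken topology might look like this; the class name and the orders topic are illustrative, not from the demo code:

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Map;

public class BrokenJoinTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();

        GenericAvroSerde valueSerde = new GenericAvroSerde();
        valueSerde.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

        // BUG: the users key is read with the default String serde, so the
        // Confluent header bytes (magic byte, schema ID, Avro length) are
        // decoded as part of the string and the key is never plain "User_9".
        KTable<String, GenericRecord> users = builder.table("users",
                Consumed.with(Serdes.String(), valueSerde));

        KStream<String, GenericRecord> orders = builder.stream("orders",
                Consumed.with(Serdes.String(), valueSerde));

        // Every lookup misses because the keys never match; no joins succeed.
        orders.join(users, (order, user) -> user)
              .to("orders-enriched");

        return builder;
    }
}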
Investigation...
If you are me, the first thing you do is use kafka-avro-console-consumer to see what is going on.
kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--topic users
The result has content that looks pretty normal and expected:
User_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
User_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
Extra blank lines could show up if the non-printable bytes trigger them, but that does not always stick out as an obvious issue (at least not obvious to me).
What if your key deserializer were BytesDeserializer? What would you have seen?
kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--topic users
The serializer's magic byte (0x00) and the bytes for the schema ID show up as printable hex escapes:
\x00\x00\x00\x00\x03\x0CUser_9|{"registertime":1489457902486,"userid":"User_9","regionid":"Region_1","gender":"OTHER"}
\x00\x00\x00\x00\x03\x0CUser_1|{"registertime":1500277798184,"userid":"User_1","regionid":"Region_2","gender":"OTHER"}
Now it is easy to see the issue: the key is Avro (a primitive Avro string, as written by the serializer). The first byte is the magic byte, the next four are the big-endian schema ID (3 here), and 0x0C is the zigzag-encoded Avro string length (6 characters, User_9). Solution: update the connector to use a String, or update the streams application to re-key the data.
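The first fix is a one-line connector change: set key.converter to org.apache.kafka.connect.storage.StringConverter. For the streams side, here is a minimal sketch of one way to handle it, deserializing the Avro primitive string key so it matches the orders keys again; the class name and the registry URL are assumptions matching the demo environment:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Map;

public class FixedUsersTable {
    public static KTable<String, GenericRecord> users(StreamsBuilder builder,
                                                      Serde<GenericRecord> valueSerde) {
        KafkaAvroDeserializer inner = new KafkaAvroDeserializer();
        inner.configure(Map.of("schema.registry.url", "http://localhost:8081"), true); // isKey

        // Wrap the Confluent deserializer so the primitive Avro string key
        // comes out as a plain String that matches the orders keys.
        Serde<String> avroStringKey = Serdes.serdeFrom(
                new StringSerializer(),
                (topic, bytes) -> String.valueOf(inner.deserialize(topic, bytes)));

        return builder.table("users", Consumed.with(avroStringKey, valueSerde));
    }
}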
NOTE
Running containers for demonstrations is great, but the mismatch of URLs can be confusing. localhost:port is used for connecting to services from the host machine (your laptop) via port mapping. The actual hostname is used when you are accessing the service from another container. Therefore, you will see http://schema-registry:8081 within the connect configuration, and http://localhost:8081 for commands running from the host machine. I do not translate here, as these scripts align with the demo code.
Useful Shell Aliases
I have these defined in my .zshrc.
alias kcc='kafka-console-consumer \
--bootstrap-server localhost:19092 \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--topic'
alias kacc='kafka-avro-console-consumer \
--bootstrap-server localhost:19092 \
--property schema.registry.url="http://localhost:8081" \
--property print.key=true \
--property key.separator="|" \
--from-beginning \
--skip-message-on-error \
--key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer \
--topic'
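With these in place, kacc users reproduces the bytes-keyed output shown above with far less typing, and kcc does the same for topics whose values are not Avro.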
Takeaways
While this may seem obvious to you, and you would immediately inspect the connector configuration and uncover the problem, you want to make things easy for everyone on your team. Allowing them to troubleshoot and find issues easily is a win for you and a win for them.
This demonstration is available as the key-mismatch demo within dev-local-demos.
Reach out
I would enjoy hearing more about the development improvements you use. Reach out via the contact us page.