I had to do this recently and found little to no documentation on what was required to consume avro logs from kafka and deserialize them with the Glue schema registry so I'll document my findings here. For reference, Confluent has one that just works out of the box.
Requirements/Pre-requisites/Assumptions/?:
-
Apache Kafka
- I think I was using
3.3.1
with Scala2.13
- I think I was using
- Kafka cluster with topics that have avro messages
- We are using an AWS MSK cluster with kafka
v2.6.x
- This also assumes the messages are being serialized using the following headers
- We are using an AWS MSK cluster with kafka
- AWS Glue Registry with avro schema you're using
- You can find documentation on how to set this up with some schemas
- jars from https://repo1.maven.org/maven2/org/apache/
- this also assumes you have your iam role permissions/policies configured
Caveat: This is what worked with our setup and the jars we needed. I don't have any examples or screenshots.
Download required jars into <location of kafka>/libs/
:
List of jars:
Group Id | Artifact Id | Version |
---|---|---|
software.amazon.glue |
schema-registry-common |
1.1.14 |
software.amazon.glue |
schema-registry-serde |
1.1.14 |
software.amazon.awssdk |
glue |
2.17.12 |
software.amazon.awssdk |
arns |
2.17.12 |
software.amazon.awssdk |
url-connection-client |
2.17.12 |
org.apache.avro |
avro |
1.11.0 |
com.google.guava |
guava |
30.0-jre |
com.google.guava |
failureaccess |
1.0.1 |
Configuring kafka-console-consumer.sh
:
You'll need to supply some properties to the glue deserializer; I've used a wrapper script kafka-glue-avro-console-consumer.sh
:
#!/bin/sh
kafka-console-consumer.sh --value-deserializer com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer \
--property value.deserializer.region=<hardcoded-aws-region> \
--property value.deserializer.dataFormat=AVRO \
--property value.deserializer.avroRecordType=GENERIC_RECORD "$@"
You'll need to supply other properties such as --bootstrap-server <list of servers:port>
, --topic <topic-name>
, --consumer.config <config-file>
.
Example:
$ ./kafka-glue-avro-console-consumer.sh --bootstrap-server broker-01:9098 --topic my-topic --consumer.config my-config --max-message 1 --from-beginning
If you want to specify the glue registry and schema you can do that with:
--property value.deserializer.registry.name=<registryName>
--property value.deserializer.schemaName=<schemaName>
You can check the source code for additional properties you can use: https://github.com/awslabs/aws-glue-schema-registry/blob/v1.1.14/common/src/main/java/com/amazonaws/services/schemaregistry/common/configs/GlueSchemaRegistryConfiguration.java
Here is a snippet of ansible
code to grab all the jars:
- name: install artifacts from maven to consume AVRO records
maven_artifact:
group_id: "{{ item.group_id }}"
artifact_id: "{{ item.artifact_id }}"
version: "{{ item.version }}"
dest: /home/ubuntu/kafka_{{ kafka_client_version }}/libs/{{ item.artifact_id }}-{{ item.version }}.jar
group: ubuntu
owner: ubuntu
mode: 0664
loop:
- { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-common', version: 1.1.14 }
- { group_id: 'software.amazon.glue', artifact_id: 'schema-registry-serde', version: 1.1.14 }
- { group_id: 'software.amazon.awssdk', artifact_id: 'glue', version: 2.17.122 }
- { group_id: 'software.amazon.awssdk', artifact_id: 'arns', version: 2.17.122 }
- { group_id: 'software.amazon.awssdk', artifact_id: 'url-connection-client', version: 2.17.122 }
- { group_id: 'org.apache.avro', artifact_id: 'avro', version: 1.11.0 }
- { group_id: 'com.google.guava', artifact_id: 'guava', version: 30.0-jre }
- { group_id: 'com.google.guava', artifact_id: 'failureaccess', version: 1.0.1 }
(we are testing some things which is why everything is in the user's home directory!)
Top comments (1)
Hi, I got below error. I tried to add software.amazon.awssdk.utils package to libs, but it didn't solve the issue.
Exception in thread "main" java.lang.NoClassDefFoundError: software/amazon/awssdk/utils/internal/EnumUtils
at software.amazon.awssdk.services.glue.model.Compatibility.(Compatibility.java:42)
at com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.(AWSSchemaRegistryConstants.java:114)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.validateAndSetCompatibility(GlueSchemaRegistryConfiguration.java:168)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildSchemaRegistryConfigs(GlueSchemaRegistryConfiguration.java:93)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.buildConfigs(GlueSchemaRegistryConfiguration.java:82)
at com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration.(GlueSchemaRegistryConfiguration.java:74)
at com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer.configure(GlueSchemaRegistryKafkaDeserializer.java:89)
at kafka.tools.DefaultMessageFormatter.getDeserializerProperty(ConsoleConsumer.scala:586)
at kafka.tools.DefaultMessageFormatter.$anonfun$configure$22(ConsoleConsumer.scala:492)
at kafka.tools.DefaultMessageFormatter.configure(ConsoleConsumer.scala:592)
at kafka.tools.ConsoleConsumer$ConsumerConfig.(ConsoleConsumer.scala:317)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.utils.internal.EnumUtils
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more