Getting rid of Callback hell in KafkaProducer
Kotlin's coroutines provides a nice way to write async code. It makes it easy to write and compose asynchronous computation using a very light-weight model.
This post is not about what coroutines are, the link with the docs have a very deep and easy to read explanation of that. Instead I'm offering a solution to using KafkaProducer.send
method.
The issue is send()
leverages a Callback
strategy, and we all know that there's a special place in hell for those who use callbacks.
Fortunately Kotlin coroutines offer a solution: suspendCoroutine
function, that allows us to transform a callback into a suspend function call.
Receiver functions are also another nice treat of Kotlin language. It allows us to augment regular types with custom functions.
I decided to call the new function dispatch
instead of send
because I find a bit confusing when people decide to extend original function using the same name, and imports can get a bit messy.
So the extension function you need to write is very simple:
suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
suspendCoroutine<RecordMetadata> { continuation ->
val callback = Callback { metadata, exception ->
if (metadata == null) {
continuation.resumeWithException(exception!!)
} else {
continuation.resume(metadata)
}
}
this.send(record, callback)
}
Now you can just use it from your regular KafkaProducer
instance:
val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["key.serializer"] = StringSerializer::class.java
props["value.serializer"] = JacksonSerializer::class.java
val kafkaProducer = KafkaProducer<String, SensorReading>(props)
async {
kafkaProducer.dispatch(ProducerRecord("sample", SensorReading("Bedroom", 72.0, false)))
}
Just remember that you can only call a suspend
function within the boundaries of a coroutine, hence the need for async
, same could be achieved with launch
or runBlocking
for testing.
Happy coding!
Top comments (2)
Did you manage to git rid of Threads in Consumer side as well?
Consumer is not thread safe, so I don't see how, unless you can 'force' a coroutine to always run on the same underlying thread. But that would kind of defeat the purpose for coroutines.