TL;DR: go to Use Configuration
Another too fast, too furious post. I have spent a few hours trying to make my event processor multi-threaded, and it's so damn easy that I don't want anyone to spend more than a few minutes.
We are using the Spring Cloud Stream layer to configure our Kafka consumers.
For example, a configuration for a processor named 'reservations-input' connected to a Kafka topic 'reservations-topic' would be similar to this:
spring.cloud.stream:
bindings:
reservations-input:
content-type: application/json
destination: reservations-topic
group: consumer-service-group
And your class to start processing those events:
@EnableBinding({
MessagingConfiguration.ReservationTopic.class})
public class MessagingConfiguration {
public interface ReservationTopic {
String INPUT = "reservations-channel";
@Input(INPUT)
SubscribableChannel input();
}
}
@Service
public class ReservationProcessor {
@StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
// your stuff
}
Easy peasy. Only problem here is concurrency.
If you have used Kafka before, you would know that the number of partitions in your topic limits the concurrency.
Each partition have 1 single consumer.
I don't know whether (or where) I read that, but I assumed that my application would generate as many threads/consumers as partitions my topic has. But I was wrong. By default, Spring's only generates 1-threaded processor.
Solutions? Get more instances of your application or configure ConcurrentKafkaListenerContainerFactory
to be able to throw more threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).
Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.
The only hint I found in the documentation or stackoverflow but to instance a bean of type ConcurrentKafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
@Nonnull ConsumerFactory<String, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(5);
return factory;
}
I am not very prone to instance my own beans to configure things that seems too obvious. It is easy to overwrite some Spring default values that I am already expecting to use, it is more code to maintain...
There has to be a way through configuration.
Option 2: use configuration
Getting back to configuration, what we write under spring.cloud.stream.bindings.channel-name.consumer
ends in the configuration of Kafka. So that I tried to configure the property concurrency. That is:
spring.cloud.stream:
bindings:
reservations-input:
content-type: application/json
consumer.concurrency: 3
destination: reservations-topic
group: consumer-service-group
Starting our application, we see that we have 3 binders.
Profit!
December 17th 2019, 14:22:57.274 2019-12-17 13:22:57.274 INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-1]
December 17th 2019, 14:22:57.259 2019-12-17 13:22:57.259 INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-2]
December 17th 2019, 14:22:57.256 2019-12-17 13:22:57.256 INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-3]
Top comments (0)