I have a Spring Boot application that uses Kafka-Streams. In details, there is a stream that filters the messages it receives using the results of a query performed in MongoDB. The code is something similar to the following.
final KStream<String, String> stream =
kStreamBuilder.stream(Serdes.String(), Serdes.String(), inputTopic)
.filter((s, message) -> service.hasSomeProperty(message))
…
Top comments (0)