Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.
It offers an abstraction (the binding) that works the same regardless of the underlying implementation we use (the binder):
- Apache Kafka
- Rabbit MQ
- Kafka Streams
- Amazon Kinesis
- ...
In a previous post, "Spring Cloud Stream Kafka Streams first steps", I got a simple example working using the Kafka Streams binder.
In this one the goal is to use the Kafka Streams binder and the Kafka Streams Processor API to implement the following scenario:
We receive messages with key = userId and value = { userId: string, token: number } from topic pub.user.token
For every userId for which we receive tokens 1, 2, 3, 4 and 5 in under 1 minute, we send a completed event to topic pub.user.state
For every userId for which we receive at least one token but not the complete 1, 2, 3, 4 and 5 sequence in under 1 minute, we send an expired event to topic pub.user.state
Ready? Let's code! 🤓
rogervinas / spring-cloud-stream-kafka-streams-processor
🍀 Spring Cloud Stream & Kafka Streams Binder + Processor API
If you run this demo don't forget to read this information about caching in the state stores
- Test-first using kafka-streams-test-utils
- UserStateStream implementation
- Kafka Streams binder configuration
- UserStateStream bean
- Integration Test
Test-first using kafka-streams-test-utils
Once kafka-streams-test-utils is properly set up in our @BeforeEach we can implement this test:
/** Incoming message value: a numbered token received for a user. */
data class UserTokenEvent(
    val userId: String,
    val token: Int
)

/** Possible outcomes published for a user. */
enum class UserStateEventType {
    COMPLETED,
    EXPIRED
}

/** Outgoing message value: the outcome reached by a user. */
data class UserStateEvent(
    val userId: String,
    val state: UserStateEventType
)
/** Tokens 1 to 5 piped for one user, with the clock advanced to just under EXPIRATION, yield a single COMPLETED event. */
@Test
fun `should publish completed event for one user`() {
    (1..5).forEach { token ->
        topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, token))
    }
    topologyTestDriver.advanceWallClockTime(EXPIRATION.minusMillis(10))

    val published = topicOut.readKeyValuesToList()
    assertThat(published).singleElement().satisfies(Consumer { message ->
        assertThat(message.key).isEqualTo(USERNAME_1)
        assertThat(message.value).isEqualTo(UserStateEvent(USERNAME_1, COMPLETED))
    })
}
/** Only tokens 1 and 2 piped for one user, with the clock advanced past EXPIRATION plus two SCHEDULE periods, yield a single EXPIRED event. */
@Test
fun `should publish expired event for one user`() {
    (1..2).forEach { token ->
        topicIn.pipeInput(USERNAME_1, UserTokenEvent(USERNAME_1, token))
    }
    topologyTestDriver.advanceWallClockTime(EXPIRATION.plus(SCHEDULE).plus(SCHEDULE))

    val published = topicOut.readKeyValuesToList()
    assertThat(published).singleElement().satisfies(Consumer { message ->
        assertThat(message.key).isEqualTo(USERNAME_1)
        assertThat(message.value).isEqualTo(UserStateEvent(USERNAME_1, EXPIRED))
    })
}
UserStateStream implementation
We start first with our UserStateStream implementation as a Function:
- Whose input is a KStream<String, UserTokenEvent>, as we want a String as the Kafka message's key and a UserTokenEvent as the Kafka message's value
- Whose output is a KStream<String, UserStateEvent>, same here, String as the key and UserStateEvent as the value
/**
 * Stream function from user token events to user state events.
 * This first version is only a skeleton: [apply] is not implemented yet.
 */
class UserStateStream(
    private val schedule: Duration,
    private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {

    override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> =
        TODO()
}
Now step by step ...
1. Aggregation by userId
/** Name of the state store that holds the aggregated [UserState] per user. */
private const val USER_STATE_STORE = "user-state"

/** Tokens accumulated so far for a user. */
data class UserState(
    val userId: String = "",
    val tokens: List<Int> = emptyList()
) {
    /** Returns a new state carrying the event's userId and the event's token appended to [tokens]. */
    operator fun plus(event: UserTokenEvent): UserState =
        copy(userId = event.userId, tokens = tokens + event.token)
}
/**
 * Step 1: aggregates incoming token events by userId into a [UserState]
 * materialized in the "user-state" store.
 */
class UserStateStream(
    private val schedule: Duration,
    private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {

    override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        // Store definition: String keys, JSON-serialized UserState values
        val userStateStore = Materialized.`as`<String, UserState, KeyValueStore<Bytes, ByteArray>>(USER_STATE_STORE)
            .withKeySerde(Serdes.StringSerde())
            .withValueSerde(JsonSerde(UserState::class.java))

        val userStates = input
            .selectKey { _, tokenEvent -> tokenEvent.userId } // just in case but the key should be userId already
            .groupByKey()
            .aggregate(
                { UserState() },
                { userId, tokenEvent, currentState ->
                    logger.info("Aggregate $userId ${currentState.tokens} + ${tokenEvent.token}")
                    currentState + tokenEvent // we use the UserState's plus operator
                },
                userStateStore
            )

        // From here down it is just to avoid compilation errors
        return userStates
            .toStream()
            .mapValues { userId, _ -> UserStateEvent(userId, COMPLETED) }
    }
}
2. Completed UserStateEvents
We can generate completed UserStateEvents straightaway once we receive the last UserTokenEvent:
data class UserState(
    val userId: String = "",
    val tokens: List<Int> = emptyList()
) {
    // ...

    /** True when [tokens] contains every value from 1 to 5, in any order. */
    val completed = (1..5).all { it in tokens }
}
/**
 * Step 2: publishes a COMPLETED event as soon as the aggregated state is completed;
 * states that are not completed produce nothing.
 */
class UserStateStream(
    private val schedule: Duration,
    private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {

    override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        return input
            // ...
            .toStream()
            .mapValues { userState ->
                logger.info("State $userState")
                if (userState.completed) UserStateEvent(userState.userId, COMPLETED) else null
            }
            .filter { _, candidate -> candidate != null }
            .mapValues { stateEvent ->
                logger.info("Publish $stateEvent")
                stateEvent!! // nulls were dropped by the filter just above
            }
    }
}
3. UserStateProcessor implementation
Our UserStateProcessor will periodically scan the "user-state" store and apply our expiration logic to every UserState:
/**
 * Processor that scans the "user-state" store on a wall-clock schedule.
 *
 * It does nothing per record; all the work happens in the punctuation
 * registered in [init].
 *
 * @param schedule interval between two scans of the store
 * @param expiration expiration threshold; not used yet in this first version
 */
class UserStateProcessor(
    private val schedule: Duration,
    private val expiration: Duration
) : Processor<String, UserState, Void, Void> {

    override fun init(context: ProcessorContext<Void, Void>) {
        context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { _ ->
            val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
            // all() returns a KeyValueIterator that must be closed after usage;
            // use {} closes it even if the body throws
            stateStore.all().use { entries ->
                entries.forEachRemaining { entry: KeyValue<String, ValueAndTimestamp<UserState>> ->
                    logger.info("Do something with $entry!!") // TODO
                }
            }
        }
    }

    override fun process(record: Record<String, UserState>?) {
        // we do not need to do anything here
    }
}
Just apply the expiration logic this way:
data class UserState(
    val userId: String = "",
    val tokens: List<Int> = emptyList(),
    val expired: Boolean = false
) {
    // ...

    /** Returns a copy of this state with [expired] set to true. */
    fun expire() = copy(expired = true)
}
/**
 * Processor that scans the "user-state" store on a wall-clock schedule and
 * applies the expiration logic to every stored [UserState].
 *
 * An entry older than [expiration] is handled in two passes: the first scan
 * marks it as expired, and a later scan deletes it.
 *
 * @param schedule interval between two scans of the store
 * @param expiration maximum age of an entry, measured from its stored timestamp
 */
class UserStateProcessor(
    private val schedule: Duration,
    private val expiration: Duration
) : Processor<String, UserState, Void, Void> {

    override fun init(context: ProcessorContext<Void, Void>) {
        context.schedule(schedule, PunctuationType.WALL_CLOCK_TIME) { time ->
            val stateStore = context.getStateStore<KeyValueStore<String, ValueAndTimestamp<UserState>>>(USER_STATE_STORE)
            // all() returns a KeyValueIterator that must be closed after usage;
            // use {} closes it even if the body throws
            stateStore.all().use { entries ->
                entries.forEachRemaining { entry ->
                    val userId = entry.key
                    val stored = entry.value
                    val age = Duration.ofMillis(time - stored.timestamp())
                    if (age > expiration) {
                        if (stored.value().expired) {
                            // if it is already expired from a previous execution, we delete it
                            logger.info("Delete $userId")
                            stateStore.delete(userId)
                        } else {
                            // if it has expired right now, we mark it as expired and we update it;
                            // the original timestamp is kept so the entry still counts as expired on the next scan
                            logger.info("Expire $userId")
                            stateStore.put(userId, ValueAndTimestamp.make(stored.value().expire(), stored.timestamp()))
                        }
                    }
                }
            }
        }
    }

    override fun process(record: Record<String, UserState>?) {
        // nothing to do per record: the expiration logic runs in the scheduled punctuation
    }
}
4. UserStateStream and UserStateProcessor integration
/**
 * Step 4: attaches the UserStateProcessor to the stream and maps the resulting
 * states to COMPLETED or EXPIRED events; everything else is dropped.
 */
class UserStateStream(
    private val schedule: Duration,
    private val expiration: Duration
) : Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {

    override fun apply(input: KStream<String, UserTokenEvent>): KStream<String, UserStateEvent> {
        return input
            // ...
            .toStream()
            // we add the UserStateProcessor
            .also { stream ->
                stream.process(ProcessorSupplier { UserStateProcessor(schedule, expiration) }, USER_STATE_STORE)
            }
            // downstream we will receive both the upstream realtime values and the ones "generated" by the UserStateProcessor
            .mapValues { userState ->
                logger.info("State $userState")
                // null states are sent downstream by UserStateProcessor when deleting entries from the store
                userState?.let { current ->
                    when {
                        // completed states are sent downstream from upstream
                        current.completed -> UserStateEvent(current.userId, COMPLETED)
                        // expired states are sent downstream by UserStateProcessor when updating entries from the store
                        current.expired -> UserStateEvent(current.userId, EXPIRED)
                        else -> null
                    }
                }
            }
            .filter { _, candidate -> candidate != null }
            .mapValues { stateEvent ->
                logger.info("Publish $stateEvent")
                stateEvent!! // nulls were dropped by the filter just above
            }
    }
}
And just at this point our UserStreamTest should pass 🟩 👌
Kafka Streams binder configuration
Easy!
spring:
  application:
    name: "spring-cloud-stream-kafka-streams-processor"
  cloud:
    stream:
      function:
        # name of the Function bean to bind
        definition: userStateStream
        # rename the default binding names to the topic names
        bindings:
          userStateStream-in-0: "pub.user.token"
          userStateStream-out-0: "pub.user.state"
      kafka:
        streams:
          binder:
            applicationId: "${spring.application.name}"
            brokers: "localhost:9094"
            configuration:
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
With this configuration:
- Spring Cloud Stream will create a Kafka Streams binder connected to localhost:9094
- We need to create a @Bean named userStateStream that implements the Function interface
- This @Bean will connect a KStream subscribed to pub.user.token topic to another KStream publishing to pub.user.state topic
You can find all the available configuration properties documented in Kafka Streams Properties.
UserStateStream bean
As required by our configuration we need to create a @Bean named userStateStream:
/** Spring configuration declaring the stream function bean. */
@Configuration
class ApplicationConfiguration {

    /**
     * Creates the `userStateStream` function bean, with its schedule and expiration
     * read from the `user.schedule` and `user.expiration` properties.
     */
    @Bean
    fun userStateStream(
        @Value("\${user.schedule}") schedule: Duration,
        @Value("\${user.expiration}") expiration: Duration
    ): Function<KStream<String, UserTokenEvent>, KStream<String, UserStateEvent>> {
        return UserStateStream(schedule, expiration)
    }
}
Integration Test
We already "unit test" our UserStateStream with kafka-streams-test-utils but we also need an integration test using a Kafka container ... Testcontainers to the rescue!
1. Kafka helpers
First we need utility classes to produce to Kafka and consume from Kafka using kafka-clients library:
/**
 * Test utility to consume from Kafka using the kafka-clients library.
 * Method bodies are elided in this snippet.
 */
class KafkaConsumerHelper(bootstrapServers: String, topic: String) {
// Returns consumed records with String key and value.
// NOTE(review): presumably drains everything currently available on the topic - body elided, confirm.
fun consumeAll(): List<ConsumerRecord<String, String>> {
// ...
}
// Returns consumed records with String key and value.
// NOTE(review): presumably waits up to `timeout` for at least `numberOfRecords` records - body elided, confirm.
fun consumeAtLeast(numberOfRecords: Int, timeout: Duration): List<ConsumerRecord<String, String>> {
// ...
}
}
/**
 * Test utility to produce to Kafka using the kafka-clients library.
 * The method body is elided in this snippet.
 */
class KafkaProducerHelper(bootstrapServers: String) {
// Sends a message with the given key and body to the given topic.
// NOTE(review): `topic` is nullable; how a null topic is handled is not visible here - confirm.
fun send(topic: String?, key: String, body: String) {
// ...
}
}
2. DockerCompose Testcontainer
As described in Testcontainers + Junit5 we can use the @Testcontainers annotation:
/** Integration test running the Spring Boot application against a Testcontainers-managed container. */
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
class ApplicationIntegrationTest {

    // ...

    companion object {
        /** Container created by DockerComposeContainerHelper and annotated for Testcontainers. */
        @Container
        val container = DockerComposeContainerHelper().createContainer()
    }
}
3. Tests
And finally the tests, using Awaitility as we are testing asynchronous stuff:
class ApplicationIntegrationTest {
    // ...

    /** Sending tokens 1 to 5 for a fresh user eventually produces a single COMPLETED record. */
    @Test
    fun `should publish completed event`() {
        val username = UUID.randomUUID().toString()
        (1..5).forEach { token ->
            kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": $token}""")
        }

        await().atMost(ONE_MINUTE).untilAsserted {
            val records = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
            assertThat(records).singleElement().satisfies(Consumer { consumed ->
                assertThat(consumed.key()).isEqualTo(username)
                JSONAssert.assertEquals("""{"userId": "$username", "state": "COMPLETED"}""", consumed.value(), true)
            })
        }
    }

    /** Sending only tokens 1 to 4 for a fresh user eventually produces a single EXPIRED record. */
    @Test
    fun `should publish expired event`() {
        val username = UUID.randomUUID().toString()
        (1..4).forEach { token ->
            kafkaProducerHelper.send(TOPIC_USER_TOKEN, username, """{"userId": "$username", "token": $token}""")
        }

        await().atMost(ONE_MINUTE).untilAsserted {
            val records = kafkaConsumerHelper.consumeAtLeast(1, ONE_SECOND)
            assertThat(records).singleElement().satisfies(Consumer { consumed ->
                assertThat(consumed.key()).isEqualTo(username)
                JSONAssert.assertEquals("""{"userId": "$username", "state": "EXPIRED"}""", consumed.value(), true)
            })
        }
    }
}
And just at this point all our tests should pass 🟩 👏
That's it, happy coding! 💙
Top comments (0)