Kotlin coroutines provide the combine, zip and flattenMerge operators, which are used to combine emissions from multiple flows.
Combine
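The combine operator takes the latest emission from each of two flows and produces a new result every time either flow emits. It starts producing results only after both flows have emitted at least one value.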
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // f1 emits 1, 2, 3, one value every 300 ms
    val f1 = flowOf(1, 2, 3).onEach { delay(300) }
    // f2 emits "x", "y", "z", one value every 400 ms
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    f1.combine(f2) { num, str -> "$num -> $str" }
        .collect { result ->
            println("$result at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
/**
1 -> x at 424 ms from start
2 -> x at 627 ms from start
2 -> y at 826 ms from start
3 -> y at 927 ms from start
3 -> z at 1226 ms from start
*/
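The "latest value from each flow" rule has one consequence that is easy to miss: combine cannot produce anything until every flow has emitted at least once. A minimal sketch of that, without delays; the helper names `combineOnce` and `combineWithEmpty` are made up for this illustration:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: each flow emits exactly one value,
// so combine has exactly one pair of "latest" values to report.
suspend fun combineOnce(): List<String> =
    flowOf(1)
        .combine(flowOf("x")) { num, str -> "$num -> $str" }
        .toList()

// Hypothetical helper: the second flow never emits,
// so combine never has a "latest" string to pair the numbers with.
suspend fun combineWithEmpty(): List<String> =
    flowOf(1, 2, 3)
        .combine(emptyFlow<String>()) { num, str -> "$num -> $str" }
        .toList()

fun main() = runBlocking<Unit> {
    println(combineOnce())      // [1 -> x]
    println(combineWithEmpty()) // []
}
```

This is also why the first line of the output above appears only after about 400 ms: the number 1 is ready at 300 ms, but there is no letter to combine it with until f2 emits "x".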
Zip
Let's take the same flows as above. The zip operator pairs emissions by position: each time one flow emits, zip waits for the matching emission from the other flow, and once it arrives both values (here a number and a letter) are passed to the lambda expression.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1, 2, 3).onEach { delay(300) }
    val f2 = flowOf("x", "y", "z").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    f1.zip(f2) { num, str -> "$num -> $str" }
        .collect { result ->
            println("$result at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
/*
1 -> x at 437 ms from start
2 -> y at 837 ms from start
3 -> z at 1239 ms from start
*/
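To make the difference between the two operators easier to see, here is a small sketch that runs zip and combine over the same pair of flows and collects the results into lists. The helper functions `numbers`, `letters`, `zipped` and `combined` are names invented for this sketch; the flows are rebuilt on every call because flows are cold.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same timing as the examples above (hypothetical helper names).
fun numbers(): Flow<Int> = flowOf(1, 2, 3).onEach { delay(300) }
fun letters(): Flow<String> = flowOf("x", "y", "z").onEach { delay(400) }

// zip pairs the 1st number with the 1st letter, the 2nd with the 2nd, and so on.
suspend fun zipped(): List<String> =
    numbers().zip(letters()) { num, str -> "$num -> $str" }.toList()

// combine re-runs the lambda with the latest values whenever either flow emits.
suspend fun combined(): List<String> =
    numbers().combine(letters()) { num, str -> "$num -> $str" }.toList()

fun main() = runBlocking<Unit> {
    println("zip:     ${zipped()}")   // zip:     [1 -> x, 2 -> y, 3 -> z]
    println("combine: ${combined()}") // more entries than zip; exact order depends on timing
}
```

With these delays the combine list should normally match the five lines printed in the Combine section, while zip always yields exactly one result per pair regardless of timing.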
zip stops as soon as one of the flows completes, so any remaining values in the longer flow are never paired:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = (1..4).asFlow()
    val f2 = flowOf("Hi", "Hello")
    f1.zip(f2) { a, b -> "$a -> $b" }
        .collect {
            println(it)
        }
}
/*
1 -> Hi
2 -> Hello
*/
flattenMerge
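flattenMerge takes a flow of flows, collects the inner flows concurrently and emits their values as a single flow. It does not pair values the way combine and zip do, and it does not stop when one of the flows completes (unlike zip); it keeps emitting until every flow has finished.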
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    val f1 = flowOf(1, 2, 3, 4).onEach { delay(200) }
    val f2 = flowOf("H", "O", "L", "A").onEach { delay(400) }
    val startTime = System.currentTimeMillis()
    flowOf(f1, f2).flattenMerge().collect {
        println("$it at ${System.currentTimeMillis() - startTime} ms from start")
    }
}
/*
1 at 238 ms from start
H at 438 ms from start
2 at 438 ms from start
3 at 639 ms from start
O at 838 ms from start
4 at 839 ms from start
L at 1239 ms from start
A at 1640 ms from start
*/
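The interleaving in the output above comes from the inner flows being collected at the same time. flattenMerge has a `concurrency` parameter that limits how many inner flows run at once; the sketch below compares the default with `concurrency = 1`, where the flows are collected one after another. The helper names are invented for this sketch, the delays are shortened so it finishes quickly, and depending on the kotlinx.coroutines version the compiler may show an opt-in warning for flattenMerge.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helpers; shorter delays than in the example above.
fun numbers(): Flow<Any> = flowOf(1, 2, 3, 4).onEach { delay(20) }
fun letters(): Flow<Any> = flowOf("H", "O", "L", "A").onEach { delay(40) }

// Default concurrency: both inner flows are collected at the same time,
// so numbers and letters are interleaved according to their timing.
suspend fun mergedConcurrently(): List<Any> =
    flowOf(numbers(), letters()).flattenMerge().toList()

// concurrency = 1: the second flow starts only after the first has completed.
suspend fun mergedSequentially(): List<Any> =
    flowOf(numbers(), letters()).flattenMerge(concurrency = 1).toList()

fun main() = runBlocking<Unit> {
    println(mergedConcurrently()) // all 8 values, interleaved by timing
    println(mergedSequentially()) // [1, 2, 3, 4, H, O, L, A]
}
```

In both cases all eight values arrive; only the order changes.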