In this short article I want to discuss two different patterns for reactive subscriptions in Kotlin: callbacks and Flow.
Let's assume that you have some kind of object or structure and you want to observe changes done to this structure.
The classic way to do this is the following:
fun MyStructure.onChange(block: MyStructure.(Key) -> Unit)
and then use it like this:
myStructure.onChange{ key ->
println(get(key))
}
I've intentionally used Kotlin's function-with-receiver feature here because it makes the classic layout even neater. The callback receives not only the place that changed but also a reference to the changed object, so you can pass a pre-defined block function.
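To make the callback pattern concrete, here is a minimal sketch of what such a structure could look like. The article does not define `MyStructure` or `Key`, so the map-backed class, the `String` key type, the `set` method and the `callbackDemo` function are all assumptions made for illustration. `onChange` is also written as a member rather than an extension to keep the sketch short.

```kotlin
// Hypothetical stand-ins: the article does not define Key or MyStructure.
typealias Key = String

class MyStructure {
    private val values = mutableMapOf<Key, Any?>()
    private val listeners = mutableListOf<MyStructure.(Key) -> Unit>()

    fun get(key: Key): Any? = values[key]

    fun set(key: Key, value: Any?) {
        values[key] = value
        // Invoke every callback with this structure as its receiver.
        listeners.forEach { listener -> listener(this, key) }
    }

    fun onChange(block: MyStructure.(Key) -> Unit) {
        listeners += block
    }
}

// Registers one callback and records what it observes.
fun callbackDemo(): List<String> {
    val seen = mutableListOf<String>()
    val myStructure = MyStructure()
    // Inside the lambda, `get` resolves against the MyStructure receiver.
    myStructure.onChange { key -> seen += "$key=${get(key)}" }
    myStructure.set("a", 1)
    myStructure.set("b", "two")
    return seen // [a=1, b=two]
}
```

Note that this sketch has no way to remove a callback once it is registered.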
Now let's see how it looks using Kotlin coroutines Flow API:
val MyStructure.changes: Flow<Key>
myStructure.changes.onEach{ key ->
println(myStructure.get(key))
}.launchIn(scope)
It seems like Flow is much harder to use. It also requires a CoroutineScope to be used. And since you should not use GlobalScope for this, you need to think about the scope lifecycle.
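One possible way to provide such a `changes` property is to back it with a `MutableSharedFlow` from kotlinx.coroutines. The sketch below is an assumption about the implementation, not something the article prescribes. The buffer size of 64, the `String` key type, the member (rather than extension) property and the `flowDemo` function are illustrative choices, and `runBlocking` stands in for whatever lifecycle-bound scope a real application would use.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-ins: the article does not define Key or MyStructure.
typealias Key = String

class MyStructure {
    private val values = mutableMapOf<Key, Any?>()

    // Buffer size is an arbitrary choice for this sketch.
    private val _changes = MutableSharedFlow<Key>(extraBufferCapacity = 64)
    val changes: Flow<Key> get() = _changes

    fun get(key: Key): Any? = values[key]

    fun set(key: Key, value: Any?) {
        values[key] = value
        // tryEmit never suspends; it returns false (the event is dropped)
        // if a slow subscriber has filled the buffer.
        _changes.tryEmit(key)
    }
}

fun flowDemo(): List<String> = runBlocking {
    val seen = mutableListOf<String>()
    val myStructure = MyStructure()
    val job = myStructure.changes.onEach { key ->
        seen += "$key=${myStructure.get(key)}"
    }.launchIn(this) // runBlocking's scope stands in for a real lifecycle scope

    yield() // let the collector subscribe before anything is emitted
    myStructure.set("a", 1)
    myStructure.set("b", "two")
    yield() // let the collector process the buffered events
    job.cancel() // otherwise runBlocking would wait for the collector forever
    seen
}
```

A shared flow without replay only delivers events to collectors that are already subscribed, which is why the sketch yields once before calling `set`.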
I do not want to talk about performance issues here. If you have so many subscribers that performance becomes a problem, you need to solve that problem elsewhere. That said, the overhead of Flow is probably larger than that of plain callbacks.
On the other hand, you can easily transform the events coming from a Flow, like this:
myStructure.changes
.filter{ key -> key.startsWith("my")}
.map{ key -> myStructure.get(key)}
.onEach{ value -> println(value)}
.launchIn(scope)
But there is a problem people frequently forget about
If you make a subscription, you probably want to unsubscribe as well. Otherwise you get memory leaks (in the VM sense): some objects escape garbage collection because somebody still holds their subscription handle.
So you write code like this:
fun MyStructure.onChange(block: MyStructure.(Key) -> Unit)
fun MyStructure.removeChangeListener(block: MyStructure.(Key) -> Unit)
Actually, please do not write code like this. In this case you rely on function equality to remove the correct listener, and function equality is not reliable. Moreover, you will not be able to pass the callback as an inline lambda like I did above. You need to create it separately and store it somewhere.
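The following sketch illustrates the equality problem. `MyStructure` here is a hypothetical stand-in that keeps its callbacks in a list, and `removeChangeListener` returns a `Boolean` only so that the outcome is visible. The behaviour shown is what I would expect on the JVM, where two lambda expressions with identical text are still distinct objects.

```kotlin
// Hypothetical stand-ins: the article does not define Key or MyStructure.
typealias Key = String

class MyStructure {
    private val listeners = mutableListOf<MyStructure.(Key) -> Unit>()

    fun onChange(block: MyStructure.(Key) -> Unit) {
        listeners += block
    }

    // Returns true if a listener equal to `block` was found and removed.
    fun removeChangeListener(block: MyStructure.(Key) -> Unit): Boolean =
        listeners.remove(block)
}

fun equalityDemo(): Pair<Boolean, Boolean> {
    val myStructure = MyStructure()

    // Registering and removing with two textually identical lambdas:
    // they are different objects, so nothing is removed.
    myStructure.onChange { key -> println(key) }
    val removedInline = myStructure.removeChangeListener { key -> println(key) }

    // Removal only works if the very same function object was stored.
    val stored: MyStructure.(Key) -> Unit = { key -> println(key) }
    myStructure.onChange(stored)
    val removedStored = myStructure.removeChangeListener(stored)

    return removedInline to removedStored // (false, true)
}
```

After this runs, the first inline lambda is still registered and can no longer be removed, which is exactly the kind of leak described above.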
I usually do it this way:
fun MyStructure.onChange(owner: Any, block: MyStructure.(Key) -> Unit)
fun MyStructure.removeChangeListener(owner: Any)
In this case, I use the owner object to check equality of the callback holder. If a single owner holds several callbacks, all of them are removed at once, which has its uses. The only problem now is not to forget to remove the listener when it is no longer used (which means that you need to think about lifetimes, just like with Flow).
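A minimal sketch of the owner-based variant could look like this. The private `Listener` holder class, the map-backed storage, the `String` key type and the `ownerDemo` function are assumptions for illustration. Owners are compared with `==`, following the description above; an implementation that wants identity semantics could use `===` instead.

```kotlin
// Hypothetical stand-ins: the article does not define Key or MyStructure.
typealias Key = String

class MyStructure {
    // Each callback is stored together with the owner that registered it.
    private class Listener(val owner: Any, val block: MyStructure.(Key) -> Unit)

    private val values = mutableMapOf<Key, Any?>()
    private val listeners = mutableListOf<Listener>()

    fun get(key: Key): Any? = values[key]

    fun set(key: Key, value: Any?) {
        values[key] = value
        // Iterate over a copy so a callback may remove listeners safely.
        listeners.toList().forEach { it.block(this, key) }
    }

    fun onChange(owner: Any, block: MyStructure.(Key) -> Unit) {
        listeners += Listener(owner, block)
    }

    // Removes every callback registered by the given owner.
    fun removeChangeListener(owner: Any) {
        listeners.removeAll { it.owner == owner }
    }
}

fun ownerDemo(): List<String> {
    val seen = mutableListOf<String>()
    val myStructure = MyStructure()
    val owner = Any()

    myStructure.onChange(owner) { key -> seen += "first:$key" }
    myStructure.onChange(owner) { key -> seen += "second:$key" }

    myStructure.set("a", 1)                  // both callbacks fire
    myStructure.removeChangeListener(owner)  // both callbacks are removed
    myStructure.set("b", 2)                  // nobody is listening anymore
    return seen // [first:a, second:a]
}
```

The callbacks can now be passed as inline lambdas again, because removal no longer depends on the function objects themselves.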
Flow does not require specific unsubscription methods. Usually, once you stop consuming a Flow, it is not used anymore. You can also unsubscribe manually by canceling the subscription job:
val job = myStructure.changes.onEach{ key ->
println(myStructure.get(key))
}.launchIn(scope)
job.cancel()
Due to structured concurrency, you usually do not need to think about canceling the job manually. When the external scope is closed, all child jobs are canceled automatically. So the lifetime management is automatic.
Still, you need to manage the lifetime via coroutine scope (which is quite convenient).
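As a small illustration of scope-driven cleanup, the sketch below launches two subscriptions in one scope and then cancels only the scope. It assumes a hypothetical `MyStructure` whose `changes` is backed by a `MutableSharedFlow`; `CoroutineScope(Job())` stands in for whatever lifecycle-bound scope your application provides, and `scopeDemo` is an illustrative name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins: the article does not define Key or MyStructure.
typealias Key = String

class MyStructure {
    // Buffer size is an arbitrary choice for this sketch.
    private val _changes = MutableSharedFlow<Key>(extraBufferCapacity = 64)
    val changes: Flow<Key> get() = _changes
}

fun scopeDemo(): List<Boolean> = runBlocking {
    val myStructure = MyStructure()
    // Stands in for a lifecycle-bound scope (a screen, a service, a component).
    val scope = CoroutineScope(Job())

    val first = myStructure.changes
        .onEach { key -> println("first: $key") }
        .launchIn(scope)
    val second = myStructure.changes
        .onEach { key -> println("second: $key") }
        .launchIn(scope)

    scope.cancel()          // closing the scope cancels both subscriptions
    joinAll(first, second)  // wait until the cancellation has completed
    listOf(first.isCancelled, second.isCancelled)
}
```

Neither job is canceled individually here; both end because their parent scope was canceled.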
Both ways are valid, and I can't say that one of them should always be used instead of the other. I think I would prefer the Flow way for complex systems because there is no risk of forgetting to unsubscribe. But in some cases you do not want to bother with scopes.
Cover image photo by Chris Stenger on Unsplash