DEV Community

Pritam Kadam
Pritam Kadam

Posted on

Everything you need to know about kotlin coroutines

Everything you need to know about kotlin coroutines

My agenda for this blog post is to get yourself familiar with different terminologies of coroutines and answer the following questions:

  • Difference between Job and Deferred, launch and async

  • Which coroutine builder you should use?

  • What happens when a exception is thrown in coroutine?

  • At a very high level, how structured concurrency is achieved?

  • A thread-safe shared mutable state with a single-threaded dispatcher

  • IO and CPU bound operations with coroutines

To understand how coroutines work and use them effectively in real-world applications, you need to first understand its core concepts.

We are going to cover the following topics in the rest of the write-up:

Introduction

Coroutines are light-weight threads and the construction of coroutine is very cheap. They do not directly map to native os threads, because of that they are very faster to create and destroy compared to threads. There is no additional overhead of switching context between threads. Practically you can have thousands of or even tens of thousands of coroutines. There might be only one thread having thousands of coroutines.

The two most important building blocks to create/start/run new coroutines are coroutine scope and coroutine builders.

Coroutine scope consists of all the machinery required to run coroutine, for example, it knows where (on which thread) to run coroutine and coroutine builders are used to create a new coroutine.

If I have to give an analogy with threads, coroutine scope can be seen as Java’s ExecutorService and coroutine builders are factories to create Runnable instances.

CoroutineScope is more than just ExecutorService!

To demonstrate that coroutines are light-weight, let’s look at the following examples where we create 100 thousand coroutines vs 100 thousand threads and print(".") in each one of them.

100_000 coroutines:

The above application does not crash and prints 100_000 dots.

Note: Do not worry about launch and runBlocking used in the above example, this is explained in detail in later sections of this article. Just know that runBlocking creates coroutine scope which can be seen as setting up an environment to execute coroutine and launch is a coroutine builder that spawns new coroutine, schedules it for execution in the environment.

100_000 threads:

Above application crashes with java.lang.OutOfMemoryError: unable to create new native thread

Coroutine Scope Builders

CoroutineScope is an interface that has a single abstract property called coroutineContext.

    public interface CoroutineScope {
        public val coroutineContext: CoroutineContext
    }
Enter fullscreen mode Exit fullscreen mode

CoroutineScope is nothing but a CoroutineContext, the only difference is in their intended use. Roman Elizarov explains this in detail in following blog post

The important thing you need to know is that whenever a new coroutine scope is created, a new job gets created and associated with it.

Every coroutine created using this scope becomes the child of this job. This is how a parent-child relationship gets created between coroutines. If any of the coroutines throws an unhandled exception, it’s parent job gets canceled which ultimately cancels all its children. This is called structured concurrency, I highly recommend this blog to know more on this topic.

How to create CoroutineScope

MainScope

This is useful for UI components, it creates SupervisorJob and all the coroutines created with this scope runs on the Main thread. Because it creates SupervisorJob, failure of any coroutine does not trigger the termination of others unlike I have mentioned previously in the context of structured concurrency. Failures of child coroutines can be handled using CoroutineExceptionHandler.

    public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
Enter fullscreen mode Exit fullscreen mode

Note: To work with the Main dispatcher, following additional platform specific runtime dependencies required to be added to your project:

  • kotlinx-coroutines-android — for Android Main thread dispatcher

  • kotlinx-coroutines-javafx — for JavaFx Application thread dispatcher

  • kotlinx-coroutines-swing — for Swing EDT dispatcher

CoroutineScope(ctx)

This creates a coroutine scope from provided coroutine context and makes sure that a job is associated with this scope.

    public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
        ContextScope(if (context[Job] != null) context else context + Job())
Enter fullscreen mode Exit fullscreen mode

The following example demonstrates creating a new CorotuineScope without any Job passed externally but internally it creates one. It also shows a glimpse of structured concurrency. Child coroutine (C*hild-A*) throws exceptions which result in the cancellation of another coroutine (Child-B).

coroutineScope(block)

This is a suspending function that creates a coroutine scope, new Job and calls provided suspending block with this scope. This inherits coroutine scope from the outer scope and overrides the context’s Job. Note that, this is a suspending function and will return when given block and all its child coroutines are completed even though you do not join/await on coroutines.

    public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
Enter fullscreen mode Exit fullscreen mode

Let’s say you have some asynchronous suspending tasks and you want to wait for results from all of them and then perform some action. In this scenario, you can use coroutineScope, fire all the tasks parallelly and then wait for their results.

GlobalScope

This is a singleton and not associated with any Job. This launches top-level coroutines and highly discouraged to use because if you launch coroutines in the global scope, you lose all the benefits you get from structured concurrency.

    public object GlobalScope : CoroutineScope {
        override val coroutineContext: CoroutineContext get() = EmptyCoroutineContext
    }
Enter fullscreen mode Exit fullscreen mode

Following blog post explains more reasons to why you should avoid using GlobalScope

The following example demonstrates that there is no Job associated with GlobalScope and the exception is thrown in one of the coroutines launched using GlobalScope does not affect other coroutines hence we loose structured concurrency.

CoroutineContext

It is simply a map between Key and Element (Key -> Element) where

  • Key: Key for the elements of type CoroutineContext

  • Element: Subtype of CoroutineContext, for example, Job, Deferred, CoroutineDispacher, ActorCoroutine, etc.

Problem Statement

Let’s say we have mutable state accessed from different coroutines concurrently and we want this state to be thread-safe.

There are multiple ways to achieve this but I am going to show using single-threaded executor service.

In the following example, 1000 coroutines are created concurrently and each coroutine increments counter value. In the end, asserting that counters value is 1000 proves that there are no race conditions in the solution. You can try printing thread name in the incrementAsyncfunction and verify that all running on the same thread.

Job & Structured Concurrency

Background job responsible for solely doing side-effects. Conceptually Job is a cancellable thing with lifecycle associated with it.

Coroutines implement Job interface which is responsible for maintaining the lifecycle of a coroutine.

Jobs are designed to have a parent-child relationship and this helps a lot in achieving structured concurrency.

Let’s say we have three jobs A, B, and C. Job B and C are children of Job A. Now if for example Job C fails with an exception other than CancellationException, parent Job A gets notified and A immediately sends termination to other children in this case to Job B. This way structured concurrency is achieved.

Note: If the cause of the cancellation is CancellationExcpetion then it is considered to be canceled normally hence such cancellations need not start sending termination signals to other siblings via parent.

A parent-child relation has the following effect:

  • Cancellation of a parent with [cancel] or its exceptional completion (failure)immediately cancels all its children.

  • Parent cannot complete until all its children are complete.

  • Parent waits for all its children to complete in completing or canceling state.

  • Uncaught exception in a child, by default, cancels parent. In particular, this applies to children created with [launch] [CoroutineScope.launch] coroutine builder.

  • Note that [async][CoroutineScope.async] and other future-like coroutine builders do not have uncaught exceptions by definition since all their exceptions are caught and are encapsulated in their result.

Deferred

From the docs: Deferred value is a non-blocking cancellable future; it is a Job with a result.

Deferred has few additional methods than Job to get completed results either successful or exceptional.

Coroutine Builders

There are multiple possible ways to create coroutines based on different requirements. In this section, let’s take a look at a few of them.

The two most commonly used coroutine builders are launch and async.

Note that these are extensions to CoroutineScope which means, without coroutine scope, you can not create new coroutine.

Launch

This creates new coroutine and returns a reference to coroutine as Job. Using this handle, you can manually cancel launched coroutine using the cancel method available on Job.

Launch is used to perform asynchronous fire and forget type of operations where you are not interested in the result of the operation.

    public fun CoroutineScope.launch(
      context: CoroutineContext = EmptyCoroutineContext,
      start: CoroutineStart = CoroutineStart.DEFAULT,
      block: suspend CoroutineScope.() -> Unit
    ): Job
Enter fullscreen mode Exit fullscreen mode

Async

This creates new coroutine and returns a reference to coroutine as Deferred. Using this handle, you can manually cancel launched coroutine using the cancel method available on Deferred.

Async is used to perform asynchronous computation where you expect a result of the computation in the future. Once the result is available, you want to perform other operations using this result.

    public fun <T> CoroutineScope.async(
      context: CoroutineContext = EmptyCoroutineContext,
      start: CoroutineStart = CoroutineStart.DEFAULT,
      block: suspend CoroutineScope.() -> T
    ): Deferred<T>
Enter fullscreen mode Exit fullscreen mode

It is very important to note that, when you lunch a coroutine with these constructs, the new job gets created. It inherits the coroutine context and corresponding parent job from where this was launched. The newly created job gets attached to the parent as a child node.

What is the default behavior of launch and async?

  • The execution of coroutine starts immediately

  • You can override this behavior by passing the different CoroutineStart argument while launching coroutine, for example, start = CoroutineStart.LAZY

  • Exceptions in coroutine cancel the parent job in the context which in turn cancels all other coroutines in the same scope

  • You can override this behavior by providing explicit SupervisorJob while creating CoroutineScope but this is out of scope for this write-up

  • Coroutines get executed on Default CoroutineDispatcher. This is backed by a shared pool of threads and the maximum number of threads is equal to the number of CPU cores (at least two). For example, if you want to perform some IO operation then you can override this behavior by passing custom CoroutineContext like context = Dispatchers.IO

withContext

We often come across a scenario where we want to perform both CPU and IO-bound operations. And it is very important to perform IO-bound operations on a different thread pool (possibly unbounded) than the CPU bound (threads = number of CPU cores). You can find more details on why should we have such segregation in this article

withContext construct is exactly designed for this purpose. Following example demonstrate the usage of this:

Above example, withContext uses dispatcher from the new context, shifting execution of the block into the different thread if a new dispatcher is specified, and back to the original dispatcher when it completes.

But if you look at the first two lines of output, both got executed on the same thread but the third line got executed on a different thread. This is because IO dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) does not lead to an actual switching to another thread & typically execution continues in the same thread. This avoids thread switching costs. But that thread gets marked as IO thread and gets removed from a thread pool. Hence you see line three got executed on a different thread.

    ========= Output =======
    DefaultDispatcher-worker-1 [@coroutine#1] doing CPU work
    DefaultDispatcher-worker-1 [@coroutine#1] doing IO work
    DefaultDispatcher-worker-2 [@coroutine#1] back to doing CPU work
Enter fullscreen mode Exit fullscreen mode

runBlocking

runBlocking has very limited use cases and highly discouraged. Avoid using this unless it is the last solution to your problem and you know what you are doing.

runBlocking starts a new coroutine and blocks the current thread until it’s completion. It is designed to bridge regular blocking code and libraries that are written in suspending/non-blocking style. It has very limited use cases, for example,

  • you can use runBlocking to block main application so that it will not terminate until all the coroutines launched within the application are completed either successfully or exceptionally

  • runBlocking is very useful in tests, you can wrap your tests in runBlocking. This will make sure your test code execute sequentially on the same thread and will not terminate until all coroutines are completed. You do not need to explicitly join or await them. Your tests look similar to the tests for synchronous code. In the following example, we want to test the increment function which increments a counter by 1 in async and non-blocking style. Test-should able to increment a counter, calls increment function 50 times and asserts that counter == 50

Find more details on this here.

Suspend

We have already come across suspend keyword many times in this post. Let’s go deep into what suspending functions are!

suspend is a keyword in kotlin which indicates function can be paused and resumed later point in time. You can use suspending functions to call long-running computations without blocking the main thread.

Rules for calling suspending functions:

  • from other suspending functions

  • from coroutine (suspending functions inherits coroutine context from coroutine from where it is invoked)

Eventually, Kotlin code gets converted to JVM bytecode and there is no notion of suspend keyword on JVM. Under the hood, the kotlin compiler converts suspend functions to another function without the suspend keyword, which takes an additional parameter of type Continuation which is nothing but a callback.

Following example shows kotlin suspending function and its compiled version in JVM

    // kotlin
    suspend fun updateUserInfo(name: String, id: Long): User

    // JVM
    public final Object updateUserInfo(String name, long id, Continuation<User> $completion)
Enter fullscreen mode Exit fullscreen mode

Continuation

Continuation is a simple interface defined in kotlin standard library which has only one method resumeWith(result)

    public interface Continuation<in T> {
        public val context: CoroutineContext
        public fun resumeWith(result: Result<T>)
    }
Enter fullscreen mode Exit fullscreen mode

Since v1.3, Continuation has only one method resumeWith(result: Result) , earlier it used to have two methods resume(value: T) & resumeWithException(exception: Throwable)
Result models eithersuccess with value of typeT or failure with the exception of type Throwable

You can very well relate this with callback style programming where you have methods like onSuccess and onFailure

If you are really interested to know how all these things work behind the scenes, I highly recommend you to watch Roman Elizarov talk — Deep Dive into Coroutines.

In this talk, Roman Elizarov mentions:

Continuation is a generic callback interface.
Whenever you invoke suspending function, you actually invoke callback, its just callback is implicit and you don't see it in code. You just code it in nice direct style with callback behind the scenes.

References

Top comments (0)