DEV Community

JackMacWindows
JackMacWindows

Posted on • Originally published at gist.github.com

Implementing coroutines in Swift using Swift Concurrency

One of my favorite features of Lua is its first-class support for coroutines. Recently, I started writing a new project using Swift, and I wanted to be able to use coroutines natively in my Swift code. In most Lua VMs, coroutines are a complex feature that require a lot of environmental support to be able to save and restore function calls. However, Swift includes the async and await keywords for pausing functions built into the language. Because of this, I decided to take a crack at using them to implement coroutines natively.

What are coroutines?

A coroutine is an object that represents a function which can be paused and resumed. This function may pause (or yield) itself at any time, which will return execution to the code that last resumed the coroutine. The coroutine can then be resumed later, and the code will pick up right where it left off.

A coroutine also represents a call stack, or a thread of execution. The main function can call other functions that can yield, at which point the entire coroutine pauses without affecting any of the calling functions. For example, assume a coroutine was initialized with a function foo. If foo calls bar, and at some point bar yields, foo's status will be completely unaffected, and in fact, foo will have no idea that bar even yielded.

In Lua's coroutines, yielding and resuming can also pass values between each coroutine. When yielding, a function can pass values back to its parent as return values from the corresponding resume call, and resuming can pass values to the corresponding yield call as well. This can be used to implement various design patterns, such as iterators, using a single function.

An example of an iterator using coroutines.

local function iterator(array)
    -- Loop over the array normally.
    for i = 1, #array do
        -- Send the index and value back to the function.
        coroutine.yield(i, array[i])
    end
    -- Send back nil to end the iteration.
    return nil
end

local array = {"foo", "bar", "baz"}
-- Loop using a coroutine as an iterator function.
-- The function returned by `coroutine.wrap` is called with `array` for each iteration.
for index, value in coroutine.wrap(iterator), array do
    print(index, value)
end
Enter fullscreen mode Exit fullscreen mode

Coroutines can be used to implement a rudimentary form of cooperative multitasking. A set of tasks can each be placed in their own coroutines, and a master "coroutine manager" can resume each of those coroutines in order until they finish. When one coroutine yields, it lets another one continue its work. This can give the impression that the functions are running in parallel - they can update each other's states between yields, and may process information out-of-order from each other. Using this basic structure, more complex forms of multitasking can be implemented, like OS processes and threads.

A diagram of how execution flows between two parallel coroutines.

Coroutine flowchart

Swift Concurrency

Swift Concurrency is a feature of the Swift language added in Swift 5.5. Its main feature is the async and await keywords, which allow functions to delegate long-running tasks to separate threads while the main code continues. This allows cooperative multitasking in a structured manner. These keywords may be familiar to JavaScript or Python developers, who have likely used this construct in those languages before.

Functions which are designated async may use the await keyword to call another async function with set arguments. This pauses execution of the current function, and allows other asynchronous tasks to complete. Once the awaited function returns, the original function is resumed with the return value (if used) sent back as the result of await.

To call an async function without waiting for a result, a Task object is used. The Task constructor takes a single async function, which may then call other async functions. A Task wraps around a call stack of async functions, representing a single thread of execution which can be paused and resumed. Tasks are created in an execution pool, which schedules when each task will be run. Using await or calling Task.yield() will pause the current task and allows other Tasks to resume.

A model of how async functions can pause and resume in a single thread of execution.

async await

Some of these things may sound like parts of coroutines as discussed above. Both coroutines and async functions are able to be paused and resumed at certain points. Both coroutines and await can pass values to and from a subtask. Both coroutines and Tasks represent a single thread of resumable execution.

However, there is one very important distinction between the two: coroutines have a parent-child relationship, where a coroutine resumes a child and yields to its parent; while Tasks and their main async functions are run in a pool, and thus have no parents. To mitigate this, I decided to implement my own Coroutine class to hold the parent-child relationship.

Initial implementation

To start, I created a Coroutine class with an initializer, a resume method, and a static yield function. The resume and yield methods take and return arrays of any value, which allows passing and returning multiple values. The initializer takes an async function for the body, which takes and returns [Any] arrays as well. To keep track of parent coroutines, I added a static running property, which holds the currently running coroutine.

public class Coroutine {
    public static var running: Coroutine? = nil
    public static func yield(with args: [Any]) async -> [Any] {}

    public init(_ body: ([Any]) async -> [Any]) {}
    public func resume(with args: [Any]) async -> [Any] {}
}
Enter fullscreen mode Exit fullscreen mode

Each coroutine holds a single Task variable, which is what runs the functions and holds the call stack. To keep track of whether the coroutine is paused, running, normal (running but waiting on another coroutine), or dead, it also has a state property, which is an enum type.

    public enum State {
        case suspended
        case running
        case normal
        case dead
    }
    public var state: State = .suspended
    private var task: Task<Void, Never>! = nil // no return value, never throws
Enter fullscreen mode Exit fullscreen mode

To implement the resuming and yielding functionality, I used the state variable to determine whether a task should continue. Resuming a coroutine involved setting the coroutine's status to running, and then waiting until it was no longer running. Likewise, yielding a coroutine would set the coroutine's status to suspended, and then waited until it was no longer suspended. This would ensure that only one coroutine was running at a time. An additional private member held the return values on each end.

    private var results: [Any] = [Any]()

    public func resume(with args: [Any] = [Any]()) async -> [Any] {
        // Set the currently running coroutine, and make the previous coroutine have normal status.
        let old = Coroutine.running
        old?.state = .normal
        Coroutine.running = self

        // Set the coroutine to running, pass the return values, and wait for its task to yield.
        results = args
        state = .running
        while state == .running {
            await Task.yield()
        }

        // Reset the running coroutine to its previous value, and return with the yield's return values.
        Coroutine.running = old
        old?.state = .running
        return results
    }

    public static func yield(with args: [Any] = [Any]()) async -> [Any] {
        if let coro = Coroutine.running {
            coro.results = args
            coro.state = .suspended
            while coro.state != .running {
                await Task.yield()
            }
            return coro.results
        }
    }
Enter fullscreen mode Exit fullscreen mode

The initializer function simply created a new Task using a small wrapper to handle the first resume and last return.

    public init(_ body: ([Any]) async -> [Any]) {
        task = Task {
            // Wait for the task to be resumed for the first time.
            while self.state != .running {
                await Task.yield()
            }
            // Call the body function.
            let res = await body(self.results)
            // Set the coroutine as dead and set return values.
            self.state = .dead
            self.results = res
        }
    }
Enter fullscreen mode Exit fullscreen mode

This approach worked, and my small test suite passed properly. However, astute readers will notice a huge hole in this approach. Task.yield() does not wait for anything - it simply lets other tasks step forward, and then resumes itself, which is why the while loop is required. This means that every coroutine is consuming 100% CPU until they get resumed, and because tasks can be delegated to multiple CPU cores, this can quickly overload the system.

Obviously, this isn't a suitable approach for a complete application. But luckily, there's a mechanism included in the concurrency features that helps fix this issue.

Continuations

In JavaScript, many older asynchronous functions use a callback parameter to specify what code to run once the async task completes. The function itself would return immediately, but the callback function would be called (often with result parameters) after the asynchronous task was finished, which would continue the program's execution. But this often led to callback hell, a situation where a program gets extremely deeply nested because it used multiple asynchronous functions in series:

fooAsyncCallback(a, b => {
    barAsyncCallback(b, c => {
        bazAsyncCallback(c, d => {
            d.processCallback(res => {
                console.log(res)
            })
        })
    })
})
Enter fullscreen mode Exit fullscreen mode

To fix this, JavaScript introduced the Promise type, which allowed async functions to be called in a serial manner using chains of .then calls:

fooAsyncPromise(a)
    .then(b => barAsyncPromise(b))
    .then(c => bazAsyncPromise(c))
    .then(d => d.processPromise())
    .then(res => console.log(res))
Enter fullscreen mode Exit fullscreen mode

Later on, async/await wrapped around this functionality by automatically breaking an async function into Promise callbacks during compilation, allowing true structured programming:

let b = await fooAsyncPromise(a)
let c = await barAsyncPromise(b)
let d = await bazAsyncPromise(c)
let res = await d.processPromise()
console.log(res)
Enter fullscreen mode Exit fullscreen mode

But this requires functions to implement a function that returns Promises. If you're stuck with an old callback-based function, you normally have to break the chain and start a new one inside the callback. This is where the Promise constructor comes in. It takes another callback as an argument - but this callback is used to call the async function. The callback receives an argument called the resolver, which is used as the callback for the async function. This allows using callback-based functions with Promises and async.

let b = await fooAsyncPromise(a)
let c = await barAsyncPromise(b)
let d = await bazAsyncPromise(c)
let res = await new Promise(resolve => {
    d.processCallback(resolve) // call the function using the resolver function
    // calling resolve() will cause the await statement to continue
})
console.log(res)
Enter fullscreen mode Exit fullscreen mode

Like JavaScript, Swift also has a procedure for using callback-style functions with async/await. A continuation represents the same thing as a JavaScript Promise, and works in a similar way.

To create a continuation, you use one of the with*Continuation global functions. There are four different functions, depending on whether you want a checked or unsafe continuation (more on that later), and a throwing or non-throwing callback function. These functions take a single block/closure, which takes a continuation object, which is then resumed inside the async function's callback.

Here's a translation of the above JavaScript code into Swift using closures:

let b = await fooAsync(a)
let c = await barAsync(b)
let d = await bazAsync(c)
let res = await withCheckedContinuation { continuation in
    d.process { result in
        continuation.resume(returning: result)
    }
}
print(res)
Enter fullscreen mode Exit fullscreen mode

One useful feature of the continuation functions is that the currently running task gets paused until the continuation is resumed. This is super handy when we're looking for a way to pause a task unconditionally. But a drawback of continuations is that they need to be resumed exactly once - the task can't just wait on the same continuation multiple times for multiple yields, and it also can't just leave the task hanging if the continuation's no longer needed, or the task will leak resources.

(This is where the checked/unsafe variants come into play - checked continuations have built-in checks to make sure they are resumed exactly once, while unsafe continuations don't. Checked continuations are usually used while debugging, and can be migrated to unsafe continuations to optimize for speed.)

Throwing continuations can also have errors passed as resume values using the resume(throwing: Error) method, which propagates the error back to the with*ThrowingContinuation function. The coroutine can use this to send errors back to the parent coroutine.

The coroutine will store a property that holds a continuation for later use. To pause a task, the coroutine will create a new continuation, and store it in the continuation property. After that, it'll resume the old continuation with the results to send. Finally, the with*Continuation function's block returns, which pauses the task, and waits for the continuation to be resumed.

Putting it together

First, we'll update the resume function with continuations. We'll use withCheckedThrowingContinuation to create a checked, throwable continuation, which'll allow us to propagate errors back to the resume call.

    public enum CoroutineError: Error {
        case notSuspended
        case noCoroutine
        case cancel
    }

    private var continuation: CheckedContinuation<Void, Error>!
    public func resume(with args: [Any] = [Any]()) async throws -> [Any] {
        // Error if the coroutine isn't currently suspended.
        if state != .suspended {
            throw CoroutineError.notSuspended
        }

        // Set the currently running coroutine, and make the previous coroutine have normal status.
        let old = Coroutine.running
        old?.state = .normal
        Coroutine.running = self

        // NEW: Create a continuation, resume the coroutine, and wait for the coroutine to finish.
        self.state = .running
        let res = try await withCheckedThrowingContinuation {nextContinuation in
            let c = continuation!
            continuation = nextContinuation
            c.resume(returning: args)
        }

        // Reset the running coroutine to its previous value, and return with the yield's return values.
        Coroutine.running = old
        old?.state = .running
        return res
    }
Enter fullscreen mode Exit fullscreen mode

The yield function will work in a similar way. We'll take advantage of throwing errors later on.

    public static func yield(with args: [Any] = [Any]()) async throws -> [Any] {
        // Yielding does not work if there is no currently running coroutine.
        if Coroutine.running == nil {
            throw CoroutineError.noCoroutine
        }
        // Set the currently running coroutine as suspended.
        let coro = Coroutine.running
        coro!.state = .suspended
        // Create a new continuation, and wait for its response.
        return try await withCheckedThrowingContinuation {continuation in
            let c = coro!.continuation!
            coro!.continuation = continuation
            c.resume(returning: args)
        }
    }
Enter fullscreen mode Exit fullscreen mode

The initializer function will also be modified to use a continuation, instead of busy waiting for the first resume. But we need to wait a little bit for the task to store the coroutine - otherwise, the resume method could be called before the continuation is set.

    public init(for body: @escaping ([Any]) async throws -> [Any]) async {
        // Create the task.
        task = Task {
            // Create the continuation for the first resume.
            let args = try await withCheckedThrowingContinuation {continuation in
                self.continuation = continuation
            }
            do {
                // Call the body function.
                let res = try await body(args)

                // Set the coroutine as dead, and send the result back as the final yield.
                self.state = .dead
                self.continuation.resume(returning: res)
            } catch {
                // Catch any thrown errors, and throw them back to the parent resume.
                self.state = .dead
                self.continuation.resume(throwing: error)
            }
        }
        // Wait for the continuation to be created in the other task.
        while continuation == nil {
            await Task.yield()
        }
    }
Enter fullscreen mode Exit fullscreen mode

This code will work fine for running coroutines normally. However, if a coroutine is deleted before its body returns or errors, the task will be left hanging because the continuation was never resumed. This will also print a warning message to the console, since we're using checked continuations.

To resolve this, we'll add a deinitializer to resume the continuation. We'll resume it with an error to make the function exit as quickly as possible. This means that body functions will need to make sure to propagate the .cancel error up to the main function, which is a bit annoying, but I haven't figured out to get around this yet.

    deinit {
        if _state == .suspended {
            continuation.resume(throwing: CoroutineError.cancel)
        }
    }
Enter fullscreen mode Exit fullscreen mode

Finally, we need to tweak the initializer and yield to use a weak reference to the coroutine, as these will make the task enter a retain cycle until the task completes.

    public init(for body: @escaping ([Any]) async throws -> [Any]) async {
        // Create the task.
        // NEW: Use a weak self to avoid retaining the coroutine inside itself.
        task = Task { [weak self] in
            // Create the continuation for the first resume.
            let args = try await withCheckedThrowingContinuation {continuation in
                self!.continuation = continuation
            }
            do {
                // Call the body function.
                let res = try await body(args)

                // Set the coroutine as dead, and send the result back as the final yield.
                self?.state = .dead
                self?.continuation.resume(returning: res)
            } catch {
                // Catch any thrown errors, and throw them back to the parent resume.
                self?.state = .dead
                self?.continuation.resume(throwing: error)
            }
        }
        // Wait for the continuation to be created in the other task.
        while continuation == nil {
            await Task.yield()
        }
    }

    public static func yield(with args: [Any] = [Any]()) async throws -> [Any] {
        // Yielding does not work if there is no currently running coroutine.
        if Coroutine.running == nil {
            throw CoroutineError.noCoroutine
        }
        // Set the currently running coroutine as suspended.
        // NEW: Use an unowned reference to avoid retaining the coroutine after yielding.
        unowned let coro = Coroutine.running
        coro!.state = .suspended
        // Create a new continuation, and wait for its response.
        return try await withCheckedThrowingContinuation {continuation in
            let c = coro!.continuation!
            coro!.continuation = continuation
            c.resume(returning: args)
        }
    }
Enter fullscreen mode Exit fullscreen mode

Wrapping up

Coroutines are a useful primitive for various tasks both synchronous and asynchronous. Using Swift's comprehensive concurrency model, we can implement a coroutine object in less than 100 lines of code. This approach can also be used in other languages that have similar constructs, including JavaScript.

The complete library source is listed in this Gist. This version includes a couple of additions, like the ability to call the coroutine directly to resume it. It's donated to the public domain, so feel free to use it in any project, but I'd appreciate a link back to this article as a reference.

Top comments (0)