Aarav Joshi

Master Python Coroutines: Create Custom Async Tools for Powerful Concurrent Apps

Coroutines are Python's core tool for writing asynchronous code. They let a single thread interleave many I/O-bound operations, which makes it easier to build scalable and efficient applications. I've spent a lot of time working with coroutines, and I'm excited to share some insights on creating custom asynchronous primitives.

Let's start with the basics. Coroutines are special functions that can be paused and resumed, allowing for cooperative multitasking. They're the foundation of Python's async/await syntax. When you define a coroutine, you're essentially creating a function that can yield control back to the event loop, allowing other tasks to run.
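
To make "paused and resumed" concrete, here is a minimal sketch that drives a coroutine by hand, with no event loop at all. The names pause, greet, and drive are my own for illustration; a real event loop does essentially what drive does, only for many coroutines at once.

```python
import types

@types.coroutine
def pause():
    # Generator-based awaitable: the bare yield suspends the awaiting
    # coroutine and hands control back to whoever called send().
    yield

async def greet(log):
    log.append("step 1")
    await pause()          # suspension point
    log.append("step 2")
    return "finished"

def drive(coro):
    """Run a coroutine to completion by hand, counting its suspensions."""
    suspensions = 0
    while True:
        try:
            coro.send(None)        # resume until the next suspension point
            suspensions += 1
        except StopIteration as stop:
            # The coroutine's return value travels on StopIteration
            return stop.value, suspensions

log = []
result, suspensions = drive(greet(log))
print(log, result, suspensions)  # ['step 1', 'step 2'] finished 1
```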

To create a custom awaitable object, you need to implement the __await__ method. This method must return an iterator; writing it as a generator function is the simplest way to do that. Here's a simple example:

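import asyncio

class CustomAwaitable:
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # A bare yield hands control back to the event loop for one iteration
        yield
        # The generator's return value becomes the result of the await expression
        return self.value

async def use_custom_awaitable():
    result = await CustomAwaitable(42)
    print(result)  # Output: 42

asyncio.run(use_custom_awaitable())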

This CustomAwaitable class can be used with the await keyword, just like built-in awaitables. When awaited, it yields control once, then returns its value.
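
In practice, a custom awaitable usually delegates to an existing awaitable rather than yielding on its own. The sketch below assumes a hypothetical Delayed class (not part of asyncio) whose __await__ hands the waiting over to asyncio.sleep and then returns a value.

```python
import asyncio

class Delayed:
    """Hypothetical awaitable: resolves to `value` after `delay` seconds."""

    def __init__(self, value, delay):
        self.value = value
        self.delay = delay

    def __await__(self):
        # Delegate to asyncio.sleep's own iterator so the event loop
        # handles the actual waiting
        yield from asyncio.sleep(self.delay).__await__()
        return self.value

async def demo():
    # gather accepts any awaitable and preserves argument order in its result
    return await asyncio.gather(Delayed("a", 0.02), Delayed("b", 0.01))

results = asyncio.run(demo())
print(results)  # ['a', 'b']
```

Because the object plugs into the normal awaitable protocol, it works with helpers such as asyncio.gather without any extra glue.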

But what if we want to create more complex asynchronous primitives? Let's look at implementing a custom semaphore. Semaphores are used to control access to a shared resource by multiple coroutines:

import asyncio

class CustomSemaphore:
    def __init__(self, value=1):
        self._value = value
        self._waiters = []

    async def acquire(self):
        while self._value <= 0:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        self._value -= 1

    def release(self):
        self._value += 1
        # Wake the longest-waiting coroutine (FIFO) and skip cancelled waiters
        while self._waiters:
            fut = self._waiters.pop(0)
            if not fut.done():
                fut.set_result(None)
                break

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.release()

async def worker(semaphore, num):
    async with semaphore:
        print(f"Worker {num} acquired the semaphore")
        await asyncio.sleep(1)
    print(f"Worker {num} released the semaphore")

async def main():
    semaphore = CustomSemaphore(2)
    tasks = [asyncio.create_task(worker(semaphore, i)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

This CustomSemaphore class implements the acquire and release methods, as well as the async context manager protocol (__aenter__ and __aexit__). Because main creates it with an initial value of 2, at most two workers hold the semaphore at any time.
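
One way to check a semaphore's limit is to record the peak number of workers inside the protected section. This is a sketch only: measure_peak is a helper name I made up, and it uses the built-in asyncio.Semaphore as a stand-in so the snippet is self-contained. Any object that supports async with in the same way should work in its place.

```python
import asyncio

async def measure_peak(semaphore, workers=5, hold=0.01):
    """Return the highest number of workers that held the semaphore at once."""
    active = 0
    peak = 0

    async def worker():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(hold)   # hold the slot briefly so workers overlap
            active -= 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return peak

async def main():
    return await measure_peak(asyncio.Semaphore(2))

peak = asyncio.run(main())
print(peak)  # 2
```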

Now, let's talk about creating efficient event loops. While Python's asyncio provides a robust event loop implementation, there might be cases where you need a custom one. Here's a basic example of a custom event loop:

import time
from collections import deque

class CustomEventLoop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run_forever(self):
        while not self._stopping:
            self._run_once()

    def _run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            callback, args = self._ready.popleft()
            callback(*args)

    def stop(self):
        self._stopping = True

    def run_until_complete(self, coro):
        def _done_callback(fut):
            self.stop()

        task = self.create_task(coro)
        task.add_done_callback(_done_callback)
        self.run_forever()
        return task.result()

    def create_task(self, coro):
        task = Task(coro, self)
        self.call_soon(task._step)
        return task

class Task:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def _step(self):
        try:
            if self._done:
                return
            result = self._coro.send(None)
            if isinstance(result, SleepHandle):
                result._task = self
                self._loop.call_soon(result._wake_up)
            else:
                self._loop.call_soon(self._step)
        except StopIteration as e:
            self.set_result(e.value)

    def set_result(self, result):
        self._result = result
        self._done = True
        for callback in self._callbacks:
            self._loop.call_soon(callback, self)

    def add_done_callback(self, callback):
        if self._done:
            self._loop.call_soon(callback, self)
        else:
            self._callbacks.append(callback)

    def result(self):
        if not self._done:
            raise RuntimeError('Task is not done')
        return self._result

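class SleepHandle:
    def __init__(self, duration):
        self._duration = duration
        self._task = None
        # A monotonic clock is unaffected by wall-clock adjustments
        self._start_time = time.monotonic()

    def __await__(self):
        # Yield this handle to Task._step so the loop can schedule the wake-up
        yield self

    def _wake_up(self):
        # Re-queue this check until the deadline passes (busy polling)
        if time.monotonic() - self._start_time >= self._duration:
            self._task._loop.call_soon(self._task._step)
        else:
            self._task._loop.call_soon(self._wake_up)

def sleep(duration):
    # Must return an awaitable that yields to the loop; an async def that
    # merely returned the handle would finish without ever suspending
    return SleepHandle(duration)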

async def example():
    print("Start")
    await sleep(1)
    print("After 1 second")
    await sleep(2)
    print("After 2 more seconds")
    return "Done"

loop = CustomEventLoop()
result = loop.run_until_complete(example())
print(result)

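This custom event loop implements basic functionality like running tasks, handling coroutines, and even a simple sleep function. Note that the sleep busy-polls: _wake_up reschedules itself until the deadline passes, so the loop keeps the CPU spinning while it waits. It's not as feature-rich as Python's built-in event loop, but it demonstrates the core concepts.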
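
A common alternative to re-queuing a wake-up check is to keep pending timers in a heap and block until the earliest deadline. The following is a minimal standalone sketch of that idea, not a drop-in change to the loop above; Sleep, run, and ticker are names I chose for illustration.

```python
import heapq
import itertools
import time

class Sleep:
    """Awaitable that asks the scheduler to resume the coroutine after `delay` seconds."""

    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield self   # hand the request to the scheduler

def run(*coros):
    """Run coroutines to completion, blocking until the next timer is due."""
    tie = itertools.count()   # tie-breaker so coroutines are never compared
    timers = [(time.monotonic(), next(tie), coro) for coro in coros]
    heapq.heapify(timers)
    results = {}
    while timers:
        deadline, _, coro = heapq.heappop(timers)
        delay = deadline - time.monotonic()
        if delay > 0:
            time.sleep(delay)   # block instead of spinning
        try:
            request = coro.send(None)
        except StopIteration as stop:
            results[coro] = stop.value
        else:
            heapq.heappush(timers, (time.monotonic() + request.delay, next(tie), coro))
    return [results[coro] for coro in coros]

async def ticker(name, delay, log):
    for i in range(2):
        await Sleep(delay)
        log.append(f"{name}{i}")
    return name

log = []
finished = run(ticker("slow", 0.1, log), ticker("fast", 0.01, log))
print(log, finished)
```

The trade-off is that time.sleep blocks the whole thread, which is fine here because the scheduler has nothing else to do until the next deadline.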

One of the challenges in writing asynchronous code is managing task priorities. Python's asyncio event loop runs ready callbacks in FIFO order and has no notion of task priority (asyncio.PriorityQueue orders queue items, not tasks), but we can implement our own:

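import asyncio
import heapq
import itertools

class PriorityEventLoop(asyncio.AbstractEventLoop):
    """Toy loop with a virtual clock: callbacks run in order of scheduled time."""

    def __init__(self):
        self._ready = []               # heap of (when, sequence, handle)
        self._seq = itertools.count()  # tie-breaker so handles are never compared
        self._stopping = False
        self._running = False
        self._clock = 0.0

    def get_debug(self):
        # asyncio.Handle, Future and Task all query this on creation
        return False

    def call_at(self, when, callback, *args, context=None):
        handle = asyncio.Handle(callback, args, self, context)
        heapq.heappush(self._ready, (when, next(self._seq), handle))
        return handle

    def call_later(self, delay, callback, *args, context=None):
        return self.call_at(self._clock + delay, callback, *args, context=context)

    def call_soon(self, callback, *args, context=None):
        return self.call_at(self._clock, callback, *args, context=context)

    def time(self):
        return self._clock

    def stop(self):
        self._stopping = True

    def is_running(self):
        return self._running

    def run_forever(self):
        self._stopping = False
        self._running = True
        # Low-level hook so asyncio.get_running_loop() finds this loop
        asyncio._set_running_loop(self)
        try:
            while self._ready and not self._stopping:
                self._run_once()
        finally:
            asyncio._set_running_loop(None)
            self._running = False

    def _run_once(self):
        when, _, handle = heapq.heappop(self._ready)
        self._clock = when  # jump the virtual clock; no real waiting happens
        if not handle.cancelled():
            handle._run()

    def create_task(self, coro, **kwargs):
        return asyncio.Task(coro, loop=self, **kwargs)

    def create_future(self):
        return asyncio.Future(loop=self)

    def run_until_complete(self, future):
        # Accept either a coroutine or a future, then stop once it finishes
        future = asyncio.ensure_future(future, loop=self)
        future.add_done_callback(lambda _: self.stop())
        self.run_forever()
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return future.result()

async def low_priority_task():
    print("Low priority task started")
    await asyncio.sleep(2)
    print("Low priority task finished")

async def high_priority_task():
    print("High priority task started")
    await asyncio.sleep(1)
    print("High priority task finished")

async def main():
    loop = asyncio.get_running_loop()
    # A smaller delay means an earlier slot in the heap, i.e. a higher priority
    loop.call_later(0.1, loop.create_task, low_priority_task())
    loop.call_later(0, loop.create_task, high_priority_task())
    await asyncio.sleep(3)

# asyncio.run() would create the default loop, so drive the custom loop directly
PriorityEventLoop().run_until_complete(main())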

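This PriorityEventLoop keeps scheduled callbacks in a heap ordered by execution time and advances a virtual clock to each callback's timestamp, so sleeps complete without any real waiting. You can assign priorities by scheduling tasks with different delays: the smaller the delay, the sooner the task runs.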
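
If you only need prioritized work items rather than a prioritized loop, the standard asyncio.PriorityQueue is often enough. Here is a minimal sketch with a single worker; run_jobs and the job names are made up for illustration, and lower numbers are treated as higher priority.

```python
import asyncio

async def run_jobs(jobs):
    """Process (priority, name) pairs in priority order; lower numbers run first."""
    queue = asyncio.PriorityQueue()
    for priority, name in jobs:
        queue.put_nowait((priority, name))

    order = []

    async def worker():
        while True:
            priority, name = await queue.get()
            order.append(name)
            await asyncio.sleep(0)   # stand-in for real work
            queue.task_done()

    task = asyncio.create_task(worker())
    await queue.join()               # wait until every job is marked done
    task.cancel()                    # the worker loops forever, so stop it explicitly
    await asyncio.gather(task, return_exceptions=True)
    return order

order = asyncio.run(run_jobs([(5, "cleanup"), (1, "urgent"), (3, "report")]))
print(order)  # ['urgent', 'report', 'cleanup']
```

With several workers the jobs would still be dequeued in priority order, but they could finish out of order.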

Handling cancellation gracefully is another important aspect of working with coroutines. Here's an example of how to implement cancellable tasks:

import asyncio

async def cancellable_operation():
    try:
        print("Operation started")
        await asyncio.sleep(5)
        print("Operation completed")
    except asyncio.CancelledError:
        print("Operation was cancelled")
        # Perform any necessary cleanup
        raise  # Re-raise the CancelledError

async def main():
    task = asyncio.create_task(cancellable_operation())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

In this example, the cancellable_operation catches the CancelledError, performs any necessary cleanup, and then re-raises the exception. This allows for graceful handling of cancellation while still propagating the cancellation status.
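
Cancellation also happens implicitly when a timeout expires. The sketch below uses asyncio.wait_for, which cancels the inner coroutine on timeout, and puts the cleanup in a finally block so it runs whether the operation completes or is cancelled. The fetch_with_cleanup name and the event strings are illustrative only.

```python
import asyncio

async def fetch_with_cleanup(events):
    events.append("opened")
    try:
        await asyncio.sleep(10)      # stand-in for a slow operation
        events.append("completed")
    finally:
        # Runs on normal completion and on cancellation alike
        events.append("closed")

async def main():
    events = []
    try:
        await asyncio.wait_for(fetch_with_cleanup(events), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timed out")
    return events

events = asyncio.run(main())
print(events)  # ['opened', 'closed', 'timed out']
```

A finally block does not swallow the CancelledError, so there is nothing to re-raise here.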

Let's explore implementing custom async iterators. These are useful for creating sequences that can be iterated over asynchronously:

import asyncio

class AsyncRange:
    def __init__(self, start, stop, step=1):
        self.start = start
        self.stop = stop
        self.step = step

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.start >= self.stop:
            raise StopAsyncIteration
        value = self.start
        self.start += self.step
        await asyncio.sleep(0.1)  # Simulate some async work
        return value

async def main():
    async for i in AsyncRange(0, 5):
        print(i)

asyncio.run(main())

This AsyncRange class implements the async iterator protocol, allowing it to be used in async for loops.
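
For simple cases, an async generator gives you the same protocol with less code, because Python supplies __aiter__ and __anext__ for you. This is a sketch of a roughly equivalent async_range function (a name I picked); like the class above, it only handles positive steps.

```python
import asyncio

async def async_range(start, stop, step=1):
    current = start
    while current < stop:
        await asyncio.sleep(0)   # simulate some async work
        yield current
        current += step

async def main():
    # Async comprehensions consume async iterators directly
    return [i async for i in async_range(0, 5)]

values = asyncio.run(main())
print(values)  # [0, 1, 2, 3, 4]
```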

Finally, let's look at implementing custom async context managers. These are useful for managing resources that need to be acquired and released asynchronously:

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async acquisition
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async release

async def main():
    async with AsyncResource() as resource:
        print("Using resource")
        await asyncio.sleep(1)

asyncio.run(main())

This AsyncResource class implements the __aenter__ and __aexit__ methods, allowing it to be used with the async with statement.
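
The standard library's contextlib.asynccontextmanager decorator builds the same protocol from an async generator. The sketch below (async_resource and the log entries are illustrative names) also shows that the release step still runs when the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(log):
    log.append("acquire")
    await asyncio.sleep(0)       # simulate async acquisition
    try:
        yield "resource"         # value bound by "as" in the async with statement
    finally:
        log.append("release")
        await asyncio.sleep(0)   # simulate async release

async def main():
    log = []
    try:
        async with async_resource(log) as res:
            log.append(f"using {res}")
            raise ValueError("boom")
    except ValueError:
        log.append("error handled")
    return log

log = asyncio.run(main())
print(log)  # ['acquire', 'using resource', 'release', 'error handled']
```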

In conclusion, Python's coroutine system provides a powerful foundation for building custom asynchronous primitives. By understanding the underlying mechanisms and protocols, you can create tailored solutions for specific asynchronous challenges, optimize performance in complex concurrent scenarios, and extend Python's async capabilities. Remember, while these custom implementations are great for learning and specific use cases, Python's built-in asyncio library is highly optimized and should be your go-to for most scenarios. Happy coding!

