DEV Community

Red John

Asynchronously Iterating Over Event Emitters in TypeScript with Async Generators

Introduction

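In modern web development, we often deal with events, whether that means handling incoming WebSocket messages, server-sent events (SSE), or data streams from services like Redis Pub/Sub. Node.js is event-driven at its core, and its built-in events.on() helper can turn a single EventEmitter event into an async iterator, but there is no general-purpose, out-of-the-box way to consume arbitrary event sources with for await...of loops while controlling buffering, cancellation, and cleanup yourself.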

In this post, I'll walk you through a simple yet powerful way to create an asynchronous event iterator using TypeScript and AsyncGenerator. This approach is designed to allow you to consume events from any kind of event emitter in a clean and predictable way, with full control over cancellation and cleanup logic.

The Use Case: Redis Pub/Sub

In one of my recent projects, I needed to listen to Redis Pub/Sub channels and dispatch server-sent events (SSE) asynchronously to connected clients. The challenge was handling incoming events without overwhelming the system while allowing the consumer to cancel the event stream at any time.

The solution? An event iterator that converts any event emitter (such as Redis Pub/Sub) into an asynchronous iterable. This allows us to process events in a controlled manner and gracefully handle cancellation when necessary.

Let’s dive into the implementation.

The Code

export type Context<T> = {
    emit: (value: T) => void;
    cancel: () => void;
};

export type CleanupFn = () => void | Promise<void>;

export type Subscriber<T> = (
    context: Context<T>,
) => void | CleanupFn | Promise<CleanupFn | void>;

export async function* createEventIterator<T>(
    subscriber: Subscriber<T>,
): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;

    // Resolver for the promise the generator awaits while the events array is empty
    let resolveNext: (() => void) | null = null;

    const emit = (event: T) => {
        events.push(event);
        // If the generator is waiting for a new event, resolve the promise
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

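    const cancel = () => {
        cancelled = true;
        // Wake the generator if it is waiting on an empty queue;
        // otherwise the loop would not exit until the next emit
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };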

    const unsubscribe = await subscriber({ emit, cancel });

    try {
        while (!cancelled) {
            // If there are events in the queue, yield the next event
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                // Wait for the next event
                await new Promise<void>((resolve) => {
                    resolveNext = resolve;
                });
            }
        }

        // Process any remaining events that were emitted before cancellation.
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        await unsubscribe?.();
    }
}

How It Works

This function accepts a subscriber function that you can hook into any event emitter or pub/sub system. The subscriber receives a context object with two essential methods:

  1. emit: Allows the subscriber to push new events into the iterator.
  2. cancel: Provides a way to signal that the iteration should stop.

The function returns an AsyncGenerator<T>, allowing you to iterate over events using a for await...of loop.

Breaking Down the Code

  1. Context Object:
    The Context<T> type provides an interface to emit new events or cancel the subscription. The subscriber uses this context to control the flow of events.

  2. Event Queue:
    The events: T[] array serves as a buffer to store emitted events. The generator will process these events one by one. If there are no events in the queue, it will wait for the next event to be emitted.

  3. Emit Logic:
    The emit function adds new events to the queue and resolves any pending promise (i.e., if the generator is waiting for new events).

  4. Cancellation:
    If the cancel function is called, it sets a flag (cancelled = true) to signal that the loop should exit. Any remaining events in the queue will still be processed before the generator completes.

  5. Cleanup:
    The unsubscribe function (if provided) is invoked from a finally block, so it runs whenever the generator finishes: after cancellation, when the consumer breaks out of the loop, or when the consumer throws. This is especially important for unsubscribing from external systems like Redis or cleaning up resources.
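
To see these pieces interact, here is a small sketch in which the source itself ends the stream by calling cancel. It includes a condensed copy of createEventIterator so the snippet runs on its own (in this copy, cancel also wakes a waiting generator so the loop exits promptly). The ticks helper, its parameters, and main are hypothetical names made up for this illustration.

```typescript
type Context<T> = { emit: (value: T) => void; cancel: () => void };
type CleanupFn = () => void | Promise<void>;
type Subscriber<T> = (context: Context<T>) => void | CleanupFn | Promise<CleanupFn | void>;

// Condensed copy of createEventIterator so this snippet is self-contained
async function* createEventIterator<T>(subscriber: Subscriber<T>): AsyncGenerator<T> {
    const events: T[] = [];
    let cancelled = false;
    let resolveNext: (() => void) | null = null;

    // Resolve the pending promise, if the generator is parked on an empty queue
    const wake = () => {
        if (resolveNext) {
            resolveNext();
            resolveNext = null;
        }
    };

    const unsubscribe = await subscriber({
        emit: (event) => { events.push(event); wake(); },
        cancel: () => { cancelled = true; wake(); },
    });

    try {
        while (!cancelled) {
            if (events.length > 0) {
                yield events.shift()!;
            } else {
                await new Promise<void>((resolve) => { resolveNext = resolve; });
            }
        }
        // Drain events that were queued before cancellation
        while (events.length > 0) {
            yield events.shift()!;
        }
    } finally {
        if (typeof unsubscribe === 'function') {
            await unsubscribe();
        }
    }
}

// Hypothetical source: emits `count` ticks, then ends the stream via cancel()
function ticks(count: number, intervalMs: number): AsyncGenerator<number> {
    return createEventIterator<number>(({ emit, cancel }) => {
        let sent = 0;
        const timer = setInterval(() => {
            emit(++sent);
            if (sent === count) cancel();
        }, intervalMs);
        // Cleanup: runs from the generator's finally block
        return () => clearInterval(timer);
    });
}

async function main(): Promise<number[]> {
    const seen: number[] = [];
    for await (const n of ticks(3, 10)) {
        seen.push(n);
    }
    return seen; // the loop ends by itself once the source cancels
}

main().then((seen) => console.log(seen)); // logs [ 1, 2, 3 ]
```

The last tick is emitted in the same callback that calls cancel, and it still reaches the consumer because the queue is drained before the generator completes.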

Example: Listening to Redis Pub/Sub

Let’s see how we can use this event iterator to listen to Redis Pub/Sub and asynchronously iterate over the incoming messages.

import Redis from 'ioredis';

function redisEventIterator(channel: string) {
    return createEventIterator<string>(async ({ emit }) => {
        // Create the client only once iteration actually starts
        const client = new Redis();

        // 'message' fires for every subscribed channel, so filter by channel name
        const messageHandler = (receivedChannel: string, message: string) => {
            if (receivedChannel === channel) {
                emit(message);
            }
        };

        // Attach the handler first, then wait for Redis to confirm the subscription
        client.on('message', messageHandler);
        await client.subscribe(channel);

        // Cleanup function to unsubscribe and disconnect
        return async () => {
            client.off('message', messageHandler);
            await client.unsubscribe(channel);
            await client.quit();
        };
    });
}

// Usage
(async () => {
    for await (const message of redisEventIterator('my-channel')) {
        console.log('New message:', message);

        // You can cancel the event stream if needed
        if (message === 'STOP') {
            break;
        }
    }
})();

In this example, we use createEventIterator to subscribe to a Redis Pub/Sub channel and asynchronously iterate over the messages. Each time a new message arrives, it is emitted into the generator, where we can process it as it comes in. If a specific message (e.g., "STOP") is received, we break out of the loop; that ends the generator, which runs the cleanup function and unsubscribes from Redis.
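
The cleanup-on-break behaviour comes from how for await...of treats generators: leaving the loop early calls the generator's return() method, which runs any pending finally block before the code after the loop continues. Here is a minimal sketch of just that mechanism, using a stand-in generator instead of Redis (numbers and consume are hypothetical names for this illustration):

```typescript
// Stand-in for an event iterator: yields forever and records when its cleanup runs
async function* numbers(log: string[]): AsyncGenerator<number> {
    let n = 0;
    try {
        while (true) {
            yield n++;
        }
    } finally {
        // Reached when the consumer breaks, returns, or throws inside the loop
        log.push('cleanup');
    }
}

async function consume(): Promise<string[]> {
    const log: string[] = [];
    for await (const n of numbers(log)) {
        log.push(`value ${n}`);
        if (n === 2) break; // triggers the generator's return(), then its finally block
    }
    log.push('after loop');
    return log;
}

consume().then((log) => console.log(log.join(', ')));
// logs: value 0, value 1, value 2, cleanup, after loop
```

Note that the cleanup entry appears before the code after the loop runs, because the loop waits for the generator to finish closing.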

Example: Using EventEmitter

Here's how you can use createEventIterator with Node.js's EventEmitter:

import { EventEmitter } from 'events';

function eventEmitterIterator(emitter: EventEmitter, eventName: string) {
    return createEventIterator<string>(({ emit, cancel }) => {
        const eventHandler = (data: string) => emit(data);

        emitter.on(eventName, eventHandler);

        // Cleanup function to remove the listener
        return () => {
            emitter.off(eventName, eventHandler);
        };
    });
}

// Usage
(async () => {
    const emitter = new EventEmitter();

    // Simulate event emissions
    setTimeout(() => emitter.emit('data', 'First event'), 1000);
    setTimeout(() => emitter.emit('data', 'Second event'), 2000);
    setTimeout(() => emitter.emit('data', 'STOP'), 3000);

    for await (const event of eventEmitterIterator(emitter, 'data')) {
        console.log('Received event:', event);

        if (event === 'STOP') {
            break;
        }
    }
})();

In this example:

  • We use EventEmitter to emit events, which are captured by createEventIterator.
  • The iterator listens for the 'data' event and processes it asynchronously.
  • Similar to the Redis example, we can stop the iteration when a specific event ('STOP') is received.
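
For comparison, if all you need is a single event on a plain EventEmitter, Node.js ships a built-in helper, events.on(), that returns an async iterator directly. The sketch below shows roughly how it is used; collectUntilStop is a hypothetical helper name. Note that on() yields the array of arguments passed to emit, hence the destructuring, and that it removes its listeners when the loop exits.

```typescript
import { EventEmitter, on } from 'events';

async function collectUntilStop(emitter: EventEmitter, eventName: string): Promise<string[]> {
    const received: string[] = [];
    // on() starts listening as soon as it is called and buffers events internally
    for await (const [value] of on(emitter, eventName)) {
        received.push(String(value));
        if (value === 'STOP') break;
    }
    return received;
}

const source = new EventEmitter();
const done = collectUntilStop(source, 'data');
source.emit('data', 'First event');
source.emit('data', 'STOP');
done.then((received) => console.log(received)); // logs [ 'First event', 'STOP' ]
```

The custom iterator from this post is still useful when the source is not an EventEmitter, when the subscription itself is asynchronous, or when you want your own cleanup logic.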

Benefits of This Approach

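  • Asynchronous Control: By leveraging the AsyncGenerator, we can process events at our own pace; events that arrive while the consumer is busy wait in the queue. Keep in mind that the queue is unbounded, so a consumer that is consistently slower than the producer will accumulate events in memory.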

  • Cancellation: The ability to cancel the event stream at any time makes this approach flexible, especially in real-world scenarios where connections may need to be closed gracefully.

  • General-Purpose: This iterator can be used for any event emitter or Pub/Sub system, making it versatile for different applications.

Conclusion

Event-driven architectures are a cornerstone of many modern web applications, but they can become tricky to manage when we need to control the flow of events asynchronously. With the power of AsyncGenerator in TypeScript, you can build elegant solutions like this event iterator, making your event-handling code cleaner and easier to maintain.

I hope this post helps you get started with async iterators for your own event emitters. If you have any questions or thoughts, feel free to share them in the comments!
