DEV Community

Nested Software

Originally published at nestedsoftware.com

Asynchronous Generators and Pipelines in JavaScript

Introducing Asynchronous Generators

Both this article and the last one, The Iterators Are Coming, which deals with asynchronous iterators, were motivated by a question that occurred to me as I was programming with some async functions: Would it be possible to yield in an async function? In other words, can we combine an async function with a generator function?

To explore this question, let’s start with a normal synchronous generator function, numberGenerator:

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    return random(1,10)
}

const numberGenerator = function* () {
    for (let i=0; i<5; i++) {
        const value = getValue() 
        yield value**2
    }
}

const main = () => {
    const numbers = numberGenerator()
    for (const v of numbers) {
        console.log('number = ' + v)
    }
}

main()

This code produces the expected squares of 5 random numbers:

C:\dev>node gen.js
number = 1
number = 64
number = 36
number = 25
number = 49

My idea was to change getValue to return a promise and to modify numberGenerator to await this promise, then yield a value. I tried something like the following:

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    //return promise instead of value
    return new Promise(resolve=>{
        setTimeout(()=>resolve(random(1,10)), 1000)
    })
}

const numberGenerator = function* () {
    for (let i=0; i<5; i++) {
        const value = await getValue() //await promise
        yield value**2
    }
}

const main = () => {
    const numbers = numberGenerator()
    for (const v of numbers) {
        console.log('number = ' + v)
    }
}

main()

Let's see what happens:

C:\dev\gen.js:12
                const value = await getValue() //await promise
                              ^^^^^

SyntaxError: await is only valid in async function
    at new Script (vm.js:51:7)

Okay, that makes sense: We need to make our numberGenerator function async. Let's try that!

const numberGenerator = async function* () { //added async

Does it work?

C:\dev\gen.js:10
const numberGenerator = async function* () { //added async
                                      ^

SyntaxError: Unexpected token *
    at new Script (vm.js:51:7)

Ouch, it didn't work. This led me to do some searching online. It turns out that this functionality, asynchronous iteration, is part of ES2018, and at the time of writing we could already use it in a recent version of Node.js by passing the --harmony-async-iteration flag. (Node.js 10 and later support async generators and for await...of without any flag.)

Let's see this in action:

const timer = () => setInterval(()=>console.log('tick'), 1000)

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const getValue = () => {
    //return promise instead of value
    return new Promise(resolve=>{
        setTimeout(()=>resolve(random(1,10)), 1000)
    })
}

const numberGenerator = async function* () { //added async
    for (let i=0; i<5; i++) {
        const value = await getValue() //await promise
        yield value**2
    }
}

//main is 'async'
const main = async () => {
    const t = timer()
    const numbers = numberGenerator()

    //use 'for await...of' instead of 'for...of'
    for await (const v of numbers) {
        console.log('number = ' + v)
    }

    clearInterval(t)
}

main()

There are a few small changes from the previous version of the code:

  • The main function's for...of loop becomes a for await...of loop.
  • Since main now uses for await...of, it has to be marked as async.

A timer was also added so we can confirm that the generator is indeed asynchronous.

Let's take a look at the results:

C:\dev>node --harmony-async-iteration gen.js
tick
number = 16
tick
number = 1
tick
number = 100
tick
number = 100
tick
number = 49

It worked!

The yield in an async generator function is similar to the yield in a normal (synchronous) generator function. The difference is in what the consumer receives: calling next() on a regular generator returns a {value, done} object directly, whereas calling next() on an asynchronous generator returns a promise that resolves to a {value, done} object.
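To make the difference concrete, the sketch below calls next() by hand on a synchronous generator and on an asynchronous one. The names syncSquares, asyncSquares, and compareNextResults are made up for this illustration. The manual loop at the end is roughly what a for await...of loop does for us: call next(), await the result, and stop once done is true.

```javascript
// Synchronous generator: next() returns a plain { value, done } object
const syncSquares = function* () {
    yield 1
    yield 4
}

// Asynchronous generator: next() returns a Promise of { value, done }
const asyncSquares = async function* () {
    yield 1
    yield 4
}

const compareNextResults = async () => {
    const s = syncSquares()
    console.log(s.next())                     // { value: 1, done: false }

    const a = asyncSquares()
    const pending = a.next()
    console.log(pending instanceof Promise)   // true
    console.log(await pending)                // { value: 1, done: false }

    // Roughly what for await...of does: call next(), await it, stop when done
    const collected = []
    const it = asyncSquares()
    for (let r = await it.next(); !r.done; r = await it.next()) {
        collected.push(r.value)
    }
    return collected
}

compareNextResults().then(values => console.log(values))   // [ 1, 4 ]
```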

If you yield a promise, the JavaScript runtime does something a bit sneaky: it waits for your promise to resolve and still produces its own promise that resolves to a {value, done} object, but the value attribute in that object will be whatever your promise resolved to, not the promise itself.
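A quick way to confirm this behavior is to yield a promise and inspect what next() resolves to. The generator below is a hypothetical example; the consumer receives 42 rather than a promise, because the async generator awaits the yielded promise before handing its value on.

```javascript
// Hypothetical generator that yields a promise instead of a plain value
const yieldsPromise = async function* () {
    // The async generator awaits this promise before the value reaches the consumer
    yield new Promise(resolve => setTimeout(() => resolve(42), 10))
}

const inspectYieldedValue = async () => {
    const result = await yieldsPromise().next()
    console.log(result)                          // { value: 42, done: false }
    console.log(result.value instanceof Promise) // false
    return result
}

inspectYieldedValue()
```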

Pipelining Asynchronous Generators Together

Let's look at a neat little application of this technology: We will create an asynchronous generator function that drives another one to produce statistics on an asynchronous stream of numbers.

This kind of pipeline can be used to perform arbitrary transformations on asynchronous data streams.
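As an illustration of what "arbitrary transformations" can mean, here is a minimal sketch of a generic mapping stage. asyncMap and countTo are hypothetical helpers introduced only for this example, not part of any library. Each stage consumes an async iterable and is itself an async iterable, so stages can be chained one after another.

```javascript
// Hypothetical source: yields 1..n
const countTo = async function* (n) {
    for (let i = 1; i <= n; i++) {
        yield i
    }
}

// Hypothetical generic stage: applies fn (sync or async) to each incoming item
const asyncMap = async function* (iterable, fn) {
    for await (const item of iterable) {
        yield await fn(item)
    }
}

const runMapPipeline = async () => {
    // Chain two stages: square each number, then format it asynchronously
    const squared = asyncMap(countTo(3), x => x * x)
    const labelled = asyncMap(squared, async x => `value ${x}`)

    const results = []
    for await (const v of labelled) {
        results.push(v)
    }
    return results
}

runMapPipeline().then(r => console.log(r))   // [ 'value 1', 'value 4', 'value 9' ]
```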

First we'll write an asynchronous generator that produces an endless stream of values. Every second it generates a random value between 0 and 100:

const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

const asyncNumberGenerator = async function* () {
    while (true) {
        const randomValue = random(0,100)

        const p = new Promise(resolve=>{
            setTimeout(()=>resolve(randomValue), 1000)
        })      

        yield p
    }
}
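Since asyncNumberGenerator never finishes, it can be handy when experimenting to cap how many values are pulled from it. The take helper below is a hypothetical sketch, not part of the original code. Returning early from its for await...of loop calls return() on the source generator, which also ends the source's while (true) loop. The delayMs parameter of delayedNumbers is an assumption that replaces the fixed 1000 ms so the snippet finishes quickly.

```javascript
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

// Variant of asyncNumberGenerator with a configurable delay (assumption)
const delayedNumbers = async function* (delayMs) {
    while (true) {
        yield new Promise(resolve => setTimeout(() => resolve(random(0, 100)), delayMs))
    }
}

// Hypothetical helper: yields at most n items from an async iterable, then stops
const take = async function* (iterable, n) {
    if (n <= 0) return
    let count = 0
    for await (const item of iterable) {
        yield item
        count++
        if (count >= n) return   // leaving the loop calls iterable.return()
    }
}

const collectFive = async () => {
    const source = delayedNumbers(10)
    const values = []
    for await (const v of take(source, 5)) {
        values.push(v)
    }
    // The source generator was closed when take() returned
    const after = await source.next()
    console.log(values, after.done)   // five integers from 0 to 100, then true
    return { values, after }
}

collectFive()
```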

Now we'll write a function, createStatsReducer. It returns a callback function, exponentialStatsReducer, that we will use to iteratively calculate the exponential moving average and the exponentially weighted variance of this stream of data:

const createStatsReducer = alpha => { 
    const beta = 1 - alpha

    const exponentialStatsReducer = (newValue, accumulator) => {
        const redistributedMean = beta * accumulator.mean

        const meanIncrement = alpha * newValue

        const newMean = redistributedMean + meanIncrement

        const varianceIncrement = alpha * (newValue - accumulator.mean)**2

        const newVariance = beta * (accumulator.variance + varianceIncrement)

        return {
            lastValue: newValue,
            mean: newMean,
            variance: newVariance
        }
    }

    return exponentialStatsReducer
}
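For reference, here is the same update written as equations. For each new value x_n, with α = alpha and β = 1 - alpha = beta, exponentialStatsReducer computes the new mean μ and variance σ² as follows:

```latex
\begin{aligned}
\mu_n &= \beta\,\mu_{n-1} + \alpha\,x_n \;=\; \mu_{n-1} + \alpha\,(x_n - \mu_{n-1}) \\
\sigma_n^2 &= \beta\left(\sigma_{n-1}^2 + \alpha\,(x_n - \mu_{n-1})^2\right)
\end{aligned}
```

Both lines use the previous mean μ_{n-1} (accumulator.mean). The rewritten form of the first line shows that each update moves the mean a fraction alpha of the way toward the new value.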

Next up we have a second asynchronous generator function, asyncReduce. This one applies a reducer to an asynchronous iterable. It works much like JavaScript's built-in Array.prototype.reduce (note, though, that our reducer takes the new value first and the accumulator second, the reverse of the built-in callback's order). However, the standard version goes through an entire array to produce a single final value, whereas our version applies the reduction lazily and yields every intermediate result. This allows us to use an infinite sequence of values (our asynchronous number generator above) as the data source:

const asyncReduce = async function* (iterable, reducer, accumulator) {
    for await (const item of iterable) {
        const reductionResult = reducer(item, accumulator)

        accumulator = reductionResult

        yield reductionResult
    }
}
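To make the difference from the built-in reduce concrete, the sketch below feeds the same numbers to both. Array.prototype.reduce returns only the final total, while asyncReduce (copied from above so the snippet runs on its own) yields a running total after every item. The fromArray helper and the summing reducer are illustrative assumptions.

```javascript
// asyncReduce, copied from the article
const asyncReduce = async function* (iterable, reducer, accumulator) {
    for await (const item of iterable) {
        const reductionResult = reducer(item, accumulator)
        accumulator = reductionResult
        yield reductionResult
    }
}

// Illustrative async source built from an array
const fromArray = async function* (items) {
    for (const item of items) {
        yield item
    }
}

const compareReductions = async () => {
    const numbers = [3, 1, 4, 1, 5]

    // Built-in: callback is (accumulator, value); returns only the final result
    const total = numbers.reduce((acc, x) => acc + x, 0)

    // asyncReduce: reducer is (value, accumulator); yields every intermediate result
    const runningTotals = []
    for await (const t of asyncReduce(fromArray(numbers), (x, acc) => acc + x, 0)) {
        runningTotals.push(t)
    }

    console.log(total)          // 14
    console.log(runningTotals)  // [ 3, 4, 8, 9, 14 ]
    return { total, runningTotals }
}

compareReductions()
```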

Let's tie this all together. The code below will pipe an endless sequence of asynchronously-generated numbers into our asynchronous reduce. We will loop through the resulting values (forever), obtaining the updated mean, variance, and standard deviation as new values arrive:

const timer = () => setInterval(()=>console.log('tick'), 1000)

const main = async () => {
    const t = timer()

    const numbers = asyncNumberGenerator()

    const firstValue = await numbers.next()

    //initialize the mean to the first value
    const initialValue = { mean: firstValue.value, variance: 0 }

    console.log('first value = ' + firstValue.value)

    const statsReducer = createStatsReducer(0.1)

    const reducedValues = asyncReduce(numbers, statsReducer, initialValue)

    for await (const v of reducedValues) {
        const lastValue = v.lastValue
        const mean = v.mean.toFixed(2)
        const variance = v.variance.toFixed(2)
        const stdev = Math.sqrt(v.variance).toFixed(2)

        console.log(`last value = ${lastValue}, stats = { mean: ${mean}`
            + `, variance: ${variance}, stdev: ${stdev} }`)
    }

    clearInterval(t)
}

main()
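Because the for await...of loop in main never ends, the clearInterval(t) call after it is never reached, and the program runs until it is interrupted (the ^C in the output). If you would rather stop after a fixed number of updates, one option, sketched here under our own assumptions, is to break out of the loop and clear the timer in a finally block. boundedMain and its maxUpdates and delayMs parameters are hypothetical, and a plain running mean stands in for createStatsReducer to keep the snippet short.

```javascript
const random = (min, max) => Math.floor(Math.random() * (max - min + 1)) + min

// Condensed versions of the article's generator and asyncReduce;
// delayMs replaces the fixed 1000 ms (assumption, for quick runs)
const asyncNumberGenerator = async function* (delayMs) {
    while (true) {
        yield new Promise(resolve => setTimeout(() => resolve(random(0, 100)), delayMs))
    }
}

const asyncReduce = async function* (iterable, reducer, accumulator) {
    for await (const item of iterable) {
        accumulator = reducer(item, accumulator)
        yield accumulator
    }
}

// Stand-in reducer (assumption): incremental running mean
const meanReducer = (x, acc) => ({
    count: acc.count + 1,
    mean: acc.mean + (x - acc.mean) / (acc.count + 1)
})

const boundedMain = async (maxUpdates, delayMs) => {
    const t = setInterval(() => console.log('tick'), delayMs)
    const seen = []
    try {
        const numbers = asyncNumberGenerator(delayMs)
        const reduced = asyncReduce(numbers, meanReducer, { count: 0, mean: 0 })
        for await (const v of reduced) {
            seen.push(v)
            console.log(`mean after ${v.count} values = ${v.mean.toFixed(2)}`)
            if (seen.length >= maxUpdates) break   // break closes both generators
        }
    } finally {
        clearInterval(t)   // runs whether the loop breaks, finishes, or throws
    }
    return seen
}

boundedMain(3, 10).then(seen => console.log(`stopped after ${seen.length} updates`))
```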

Let's take a look at some sample output:

C:\dev>node --harmony-async-iteration async_stats.js
tick
first value = 51
tick
last value = 97, stats = { mean: 55.60, variance: 190.44, stdev: 13.80 }
tick
last value = 73, stats = { mean: 57.34, variance: 198.64, stdev: 14.09 }
tick
last value = 11, stats = { mean: 52.71, variance: 372.05, stdev: 19.29 }
tick
last value = 42, stats = { mean: 51.64, variance: 345.16, stdev: 18.58 }
tick
last value = 42, stats = { mean: 50.67, variance: 319.00, stdev: 17.86 }
tick
last value = 60, stats = { mean: 51.60, variance: 294.93, stdev: 17.17 }
^C
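Because exponentialStatsReducer is an ordinary synchronous function, it can be checked without any asynchronous machinery. The sketch below replays the values from the sample run above (51 as the first value, then 97, 73, 11, 42, 42, 60) through the reducer with alpha = 0.1, and the means and variances it prints match the sample output. createStatsReducer is a condensed copy of the article's version (same arithmetic) so the snippet is self-contained; replaySample is a hypothetical name.

```javascript
// Condensed copy of the article's createStatsReducer (same arithmetic)
const createStatsReducer = alpha => {
    const beta = 1 - alpha
    return (newValue, accumulator) => {
        const newMean = beta * accumulator.mean + alpha * newValue
        const varianceIncrement = alpha * (newValue - accumulator.mean)**2
        const newVariance = beta * (accumulator.variance + varianceIncrement)
        return { lastValue: newValue, mean: newMean, variance: newVariance }
    }
}

// Values from the sample run: 51 was the first value, the rest were reduced
const sampleValues = [97, 73, 11, 42, 42, 60]
const statsReducer = createStatsReducer(0.1)

const replaySample = () => {
    const lines = []
    let acc = { mean: 51, variance: 0 }
    for (const x of sampleValues) {
        acc = statsReducer(x, acc)
        lines.push(`mean: ${acc.mean.toFixed(2)}, variance: ${acc.variance.toFixed(2)}`)
    }
    return lines
}

replaySample().forEach(line => console.log(line))   // first line: mean: 55.60, variance: 190.44

// Array.prototype.reduce gives only the final state (note the swapped argument order)
const finalStats = sampleValues.reduce((acc, x) => statsReducer(x, acc), { mean: 51, variance: 0 })
console.log(finalStats.mean.toFixed(2), finalStats.variance.toFixed(2))   // 51.60 294.93
```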

We now get continually updating statistics on our asynchronous stream of values. Neat!

I think asynchronous generator functions will be especially useful for processing asynchronous data sources along these lines.

Let me know what you think, or if you have ideas for other ways asynchronous generators and iterators can be used!
