Introduction to Node.js streams
Streams are a fundamental building block of many Node.js features. They provide a way to consume data as soon as it's available and to send output as soon as the application produces it.
Think about data-intensive applications that can't afford to accumulate all their data in memory and process it at once. Such applications must process data in chunks so they don't exhaust memory, which is exactly what streams do.
A crucial example of stream usage in Node.js is the HTTP server.
The HTTP server uses streams so that it never has to buffer entire requests or responses, which lets the user stream chunked data.
There are four fundamental stream types within Node.js:
- Writable: streams to which data can be written
- Readable: streams from which data can be consumed
- Duplex: streams that are both Readable and Writable
- Transform: Duplex streams that can transform the data as it is written and read
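To make the Transform type a bit more concrete, here is a minimal sketch. The class name UpperCaseTransform is hypothetical and not part of this project; it only assumes the Transform base class from the built-in stream module. Data written to the Writable side comes out upper-cased on the Readable side:

```javascript
const { Transform } = require('stream');

// Hypothetical example class (not part of the moon phases project):
// upper-cases every chunk that passes through it.
class UpperCaseTransform extends Transform {
  _transform (chunk, encoding, callback) {
    // By default the chunk arrives as a Buffer; the second callback
    // argument is pushed to the Readable side of the stream.
    callback(null, chunk.toString().toUpperCase());
  }
}

const upperCase = new UpperCaseTransform();
upperCase.write('full moon'); // Writable side: data goes in
const transformed = upperCase.read().toString(); // Readable side: data comes out
console.log(transformed);
```

In a real application you would usually place a Transform in the middle of a chain of streams rather than calling write() and read() by hand.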
In this first blog post, we will explore Readable and Writable streams by reproducing moon phases over an HTTP server.
🌕 🌖 🌗 🌘 🌑 🌒 🌓 🌔
Let's code
The goal is to create a simple server which continuously shows moon phases with emojis. So we need to define two streams:
- A Readable stream from which we can read the emojis
- A Writable stream which is represented by the HTTP server response
Let's start by initializing the project and installing the required dependencies.
For this project, I'm using Node.js v16.17.1.
npm init -y
npm install node-emoji
The node-emoji library adds emoji support to Node.js.
We need only one file, so let's create an index.js file. First, we focus on building the Readable stream, which is the core task of this project:
const { Readable } = require('stream');
const emoji = require('node-emoji');

class MoonPhasesStream extends Readable {
  constructor (options) {
    super(options);
    this.moonPhases = [];
    this.index = 0;
    // Fill the array with the eight moon phases, in display order
    this.moonPhases.push(emoji.get(':full_moon:'));
    this.moonPhases.push(emoji.get(':waning_gibbous_moon:'));
    this.moonPhases.push(emoji.get(':last_quarter_moon:'));
    this.moonPhases.push(emoji.get(':waning_crescent_moon:'));
    this.moonPhases.push(emoji.get(':new_moon:'));
    this.moonPhases.push(emoji.get(':waxing_crescent_moon:'));
    this.moonPhases.push(emoji.get(':first_quarter_moon:'));
    this.moonPhases.push(emoji.get(':moon:'));
  }

  // Called whenever the consumer is ready for more data
  _read () {
    const LAST_MOON_PHASE = 8;
    const FIRST_MOON_PHASE = 0;
    this.push(this.moonPhases[this.index++]);
    // Wrap around so the cycle starts again
    if (this.index === LAST_MOON_PHASE) {
      this.index = FIRST_MOON_PHASE;
    }
  }
}
We've just created a custom Readable stream; wow.
Let's break it down:
- First things first: to define a custom Readable stream, we need to create a class that extends the Readable base class.
- In the constructor, we initialize an empty array and fill it with the eight moon phase emojis. We also need an index to restart the cycle when it ends.
- Now we can override the _read() method, which is responsible for producing data (emojis in this case). In this method, we just 'produce' data using the push method of the Readable stream. When the moon phases cycle is over, we restart it by setting the index back to 0.
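As a side note, the same idea can be sketched without a class by passing a read function to the Readable constructor. This is only an alternative sketch, not the code used in this project: the createMoonPhasesStream helper is hypothetical, and the emojis are written as literals (an assumption) so the snippet runs without node-emoji. Calling read() by hand shows that the stream produces one emoji each time it is asked for data:

```javascript
const { Readable } = require('stream');

// Assumption: literal emojis instead of node-emoji lookups
const MOON_PHASES = ['🌕', '🌖', '🌗', '🌘', '🌑', '🌒', '🌓', '🌔'];

// Hypothetical helper: builds an endless moon phases stream
function createMoonPhasesStream () {
  let index = 0;
  return new Readable({
    // Plays the role of _read() in the class-based version
    read () {
      this.push(MOON_PHASES[index]);
      // The modulo restarts the cycle after the last phase
      index = (index + 1) % MOON_PHASES.length;
    }
  });
}

// Pull ten chunks by hand; each read() triggers the read function above
const moonStream = createMoonPhasesStream();
const firstChunks = [];
for (let i = 0; i < 10; i++) {
  firstChunks.push(moonStream.read().toString());
}
console.log(firstChunks.join(' '));
```

Whether you prefer the class or the options object is mostly a matter of taste; the class is handy when the stream needs more state or helper methods.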
Okay, so now that we have our MoonPhasesStream, it's time to define the consumer of this stream of emojis.
Let's define our Writable stream using the Node.js HTTP server:
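const http = require('http');
const { pipeline } = require('stream');

const server = http.createServer((req, res) => {
  const moonPhasesStream = new MoonPhasesStream();
  // res is the Writable stream; the callback receives any pipeline error
  pipeline(moonPhasesStream, res, (error) => console.log(error));
});
server.listen(3000);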
Very simple, no?
Let's explain:
- In Node.js, the request object is a Readable stream, so we can consume data from it. But the object we are interested in is the response, which is the Writable stream on which the emojis will flow. So we won't need to build a custom Writable stream as we did for the Readable one.
- There are several ways to combine streams, and the most used are pipe() and pipeline(). The difference is that with pipe(), if there's an error, we need to manually end and destroy each stream (to avoid memory leaks):
readableFileStream
  .on('error', (err) => readableFileStream.destroy())
  .pipe(decompressStream)
  .on('error', (err) => decompressStream.destroy())
  .pipe(decryptStream)
  .on('error', (err) => decryptStream.destroy());
pipeline() solves this problem by taking care of the stream destruction:
pipeline(
  readableFileStream,
  decompressStream,
  decryptStream,
  (err) => {
    if (err) {
      console.error(err);
    } else {
      console.log('Process completed');
    }
  }
);
That's why we also use pipeline() to compose our Readable and Writable streams:
const moonPhasesStream = new MoonPhasesStream();
pipeline(moonPhasesStream, res, (error) => console.log(error));
Now we're ready to try our server. Just start it:
node index.js
and then open a terminal and run:
curl localhost:3000
You will see an infinite moon phases cycle.
If you want to show just one moon phases cycle, you can end the stream by pushing a null value.
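Here is a minimal sketch of that idea. The class name SingleCycleMoonStream is hypothetical, and the emojis are written as literals (an assumption) so the snippet runs without node-emoji. Once all eight phases have been pushed, the stream pushes null to signal that there is no more data:

```javascript
const { Readable } = require('stream');

// Assumption: literal emojis instead of node-emoji lookups
const ONE_CYCLE = ['🌕', '🌖', '🌗', '🌘', '🌑', '🌒', '🌓', '🌔'];

// Hypothetical variant of MoonPhasesStream that ends after one cycle
class SingleCycleMoonStream extends Readable {
  constructor (options) {
    super(options);
    this.index = 0;
  }

  _read () {
    if (this.index < ONE_CYCLE.length) {
      this.push(ONE_CYCLE[this.index++]);
    } else {
      // Pushing null marks the end of the stream
      this.push(null);
    }
  }
}

// read() returns null once the stream has ended and its buffer is empty
const singleCycle = new SingleCycleMoonStream();
const phases = [];
let chunk;
while ((chunk = singleCycle.read()) !== null) {
  phases.push(chunk.toString());
}
console.log(phases.join(' '));
```

Used in the server in place of MoonPhasesStream, a stream like this should let pipeline() finish the response after the last emoji, so curl would exit instead of printing forever.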
Conclusion
In this first blog post, we created a custom Readable stream to produce data and used the HTTP server response, a Writable stream, to consume it.
Here is my GitHub repository: https://github.com/fabrilallo/moon-phases-stream
Stay tuned for the next blog posts.
Follow me on Twitter if you enjoyed this blog post @fabri.lallo