In the last two blogs we learned about two modules: OS and FS. Today we will learn about a new module that can be used with the fs module: streams.
What are streams?
Let me ask a question: in the fs blog we saw that the fs module is perfect for handling small files. But what counts as small?
Answer: a file is small when your system can handle it in one go and hold it entirely in RAM.
Now, if your data is larger than your system's RAM, how are we going to handle it?
We need to use streams. Streams are flows of data, mainly chunks of binary data, that can be read, written, paused, and resumed. Instead of caching the whole data set and serving the cached copy, streams let us deliver data to users and applications in real time.
All streams are instances of EventEmitter. They emit events, and we can use these events to read and write data.
Types of streams:
1. Writable
2. Readable
3. Duplex
4. Transform
Where to use streams?
Streams are used for handling large data or live data, e.g. live streaming.
Relationship between fs and streams
There is a strong relationship between fs and streams. One can use both in combination to accomplish a task, e.g. open a writable stream, write data to a file using the fs module, and then start a readable stream to read the data back.
Writable
A writable stream is an abstraction of a destination to which data can be written. All writable streams have a write and an end method.
All writable streams buffer data internally. We can use cork() to force the data to stay buffered until we call uncork() or end(), which flush the buffered data. destroy() accepts an optional error message that causes the writable stream to emit the error event; this method also causes the stream to emit the close event. (A sketch of cork(), uncork(), and destroy() follows the write example below.)
Methods
write
end
cork
uncork
destroy
Important events
drain
finish
pipe/unpipe
error
close
const fs = require('fs');

// open a writable stream to the destination file
const writeStream = fs.createWriteStream('streamData.txt');

// write a chunk, then signal that no more data will be written
writeStream.write('data', 'utf-8');
writeStream.end();

// 'finish' fires after end() once all the data has been flushed
writeStream.on('finish', () => {
  console.log('success!!');
});
writeStream.on('error', (err) => {
  console.log(err);
});
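And here is a minimal sketch of cork(), uncork(), and destroy() (the file name corkedData.txt is just a hypothetical example):

const fs = require('fs');

const stream = fs.createWriteStream('corkedData.txt');

// cork() forces all writes to stay buffered in memory
stream.cork();
stream.write('first chunk ');
stream.write('second chunk');

// uncork() (or end()) flushes the buffered chunks to the destination;
// Node recommends deferring uncork() with process.nextTick()
process.nextTick(() => {
  stream.uncork();
  stream.end();
});

// destroy() tears the stream down and emits 'close';
// passing an error also makes the stream emit 'error'
stream.on('close', () => console.log('stream closed'));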
Readable
A readable stream is an abstraction for a source from which data can be read.
A readable stream can be in two states:
flowing
paused
The default state is paused. We can resume a paused stream (move it to the flowing state) with the resume() method, and we can pause a stream with pause().
A stream in flowing mode delivers data to the application through the EventEmitter interface (on). A stream in paused mode can only be read by calling the read() method.
To check the state of the stream we can use readable.readableFlowing. The possible values are:
null: there is no consumer of the data yet
true: the stream is in the flowing state, e.g. after we attach a 'data' listener or call pipe()
false: the stream is in the paused state, e.g. after we call pause() or unpipe()
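A minimal sketch of pausing and resuming a stream and checking readableFlowing (assuming a hypothetical logs.txt input file):

const fs = require('fs');

const readStream = fs.createReadStream('logs.txt');
console.log(readStream.readableFlowing); // null - no consumer attached yet

// attaching a 'data' listener switches the stream to flowing mode
readStream.on('data', (chunk) => {
  console.log(`received ${chunk.length} bytes`);

  // pause() switches the stream to the paused state
  readStream.pause();
  console.log(readStream.readableFlowing); // false

  // resume() switches it back to flowing after one second
  setTimeout(() => readStream.resume(), 1000);
});

readStream.on('end', () => console.log('no more data'));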
Important events
data
end
error
close
readable
const fs = require('fs');

const readStream = fs.createReadStream(`${FILE_NAME}`);
readStream.setEncoding('utf8');

let data = '';

// 'data' fires for every chunk read from the source
readStream.on('data', (chunk) => {
  data += chunk;
});

// 'end' fires when there is no more data to consume
readStream.on('end', function () {
  console.log(data);
});
readStream.on('error', function (err) {
  console.log(err.stack);
});
Duplex
A duplex stream is both a readable and a writable stream. It has two internal buffers, one for the readable side and one for the writable side, which lets it read and write data concurrently.
Example: a chat application, a socket server
const net = require('net');
// echo server: pipe the socket's readable side back into its writable side
net.createServer(socket => {
  socket.pipe(socket);
}).listen(8001);
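If you run this and connect with a TCP client (for example nc localhost 8001), whatever you type should be echoed back, because the socket is piped into itself.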
Transform
A transform stream is a duplex stream with one addition: the data can be transformed or modified as it is written or read. This is a powerful way of doing advanced work, e.g. you can read data and convert it to uppercase before writing, translate it to a different language, or zip it and write it to a destination.
const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
// read the file, gzip it on the fly, and write the compressed copy
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'));
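A minimal sketch of a custom transform stream that uppercases data as it passes through (input.txt and output.txt are hypothetical file names):

const fs = require('fs');
const { Transform } = require('stream');

// a transform stream that uppercases every chunk passing through it
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

fs.createReadStream('input.txt')
  .pipe(upperCase)
  .pipe(fs.createWriteStream('output.txt'));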
pipe() and pipeline()
pipe is a useful and important feature. Through pipe we can connect streams: we can start a readable stream and, while it is being read, write the data to a file at the same time. pipe also reduces the internal buffering of data by readable streams. Readable streams are called pipeable streams. A small sketch follows.
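A minimal sketch of connecting two streams with pipe() (input.txt and output.txt are hypothetical file names):

const fs = require('fs');

// pipe() connects the readable stream to the writable stream:
// chunks are written to output.txt as they are read from input.txt
fs.createReadStream('input.txt').pipe(fs.createWriteStream('output.txt'));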
However, do not use pipe() in production, as pipe can introduce memory leaks:
If one of the piped streams is closed or throws an error, pipe() will not automatically destroy the connected streams.
pipe() does not automatically forward errors across streams so they can be handled in one place.
Hence, use pipeline() to address the above issues. pipeline() accepts a callback function as its last parameter; errors forwarded from any of the piped streams invoke that callback, so it is easier to handle errors for all streams in one place. It also handles end-of-file and the cases where one stream is slower or faster than the other.
Think of what a pipe does in the real world: it connects things and lets water flow. We are doing the same here with streams.
const fs = require('fs');
const { pipeline } = require('stream');

const readerPipe = fs.createReadStream(`${FILE_NAME}`);
const writerPipe = fs.createWriteStream('output.txt');

// pipeline connects the streams and forwards errors from any of them
// to the callback passed as the last argument
pipeline(readerPipe, writerPipe, (err) => {
  if (err) {
    console.log(err);
  } else {
    console.log('success!!');
  }
});
Backpressuring
Let's take an example: assume you are reading a big file and writing to another file using streams and pipe. There could be a situation where the data flows in too fast and is consumed too slowly. In this situation the data keeps buffering, which leads to:
High memory usage
Poor garbage collector performance
It may even cause the application to crash.
So, what can we do in such a situation?
We read that both readable and writable streams have an internal buffer, but the size of that buffer is limited by the available memory of the system. So if we want an indicator that tells us to stop the flow of data and resume it later, how can we do it?
We can set a highWaterMark, which puts a threshold on the amount of data that will be buffered; once the threshold is crossed we can pause the stream and resume it later.
PS: this is just a threshold indicator.
const fs = require('fs');

// highWaterMark sets the buffer threshold to 10 bytes, so the file is read in small chunks
const readableStream = fs.createReadStream('./logs.txt', {
  highWaterMark: 10
});

// 'readable' fires when there is data in the internal buffer to consume
readableStream.on('readable', () => {
  process.stdout.write(`[${readableStream.read()}]`);
});
readableStream.on('end', () => {
  console.log('DONE');
});
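On the writable side, write() returns false once the internal buffer crosses highWaterMark. A common pattern is to pause the readable stream and resume it on the writable stream's 'drain' event; here is a minimal sketch, assuming a hypothetical copy from source.txt to dest.txt:

const fs = require('fs');

const reader = fs.createReadStream('source.txt');
const writer = fs.createWriteStream('dest.txt');

reader.on('data', (chunk) => {
  // write() returns false when the internal buffer is above highWaterMark
  const canContinue = writer.write(chunk);
  if (!canContinue) {
    // stop reading until the writable buffer has been flushed
    reader.pause();
    writer.once('drain', () => reader.resume());
  }
});

reader.on('end', () => writer.end());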
Important
Use streams where the data is large.
Use pipe to have the write and read happen in parallel.
Readable streams are pipeable streams.
Do not use pipe() in production, as it could introduce memory leaks. Use pipeline() instead.
A readable stream has two states: flowing and paused. By default the stream is paused.
highWaterMark puts a threshold on the internal buffer to control the flow of data.
Stream chunks are binary data (Buffers); convert them to strings (for example with toString() or setEncoding()) when you need text.
Happy Learning!!