This miniseries aims to teach how to use ZIO at a very relaxed pace. Today we are going to build a word count app.
This means that, given a text, we want to know how many times each word appears.
Sneak Peek
Suppose you have the text
How much wood would a woodpecker peck if a woodpecker would peck wood?
The program should output
(a,2), (peck,2), (woodpecker,2), (would,2), (if,1), (wood,1), (much,1), (How,1), (wood?,1)
Setup
Like last time we are going to use Mill instead of SBT. Why? Because it's easier and takes five seconds.
- Create a folder zio-word-counter
- Inside it create a build.sc file with the following content
import mill._, scalalib._

object WordCounter extends ScalaModule {
  def scalaVersion = "2.13.0"
  val zioVersion = "1.0.0-RC17"
  def ivyDeps = Agg(
    ivy"dev.zio::zio:$zioVersion",
    ivy"dev.zio::zio-streams:$zioVersion"
  )
}
And we are done. We will need zio-streams as well since we are going to treat our text like a stream of words.
A simple stream of words
Create a folder named WordCounter (it has to match the object name we defined earlier) and a src folder nested inside. From there we can create a file named Main.scala with the following empty canvas.
// WordCounter/src/Main.scala
import zio._
import zio.console._
import zio.stream._

object Main extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    putStrLn("hello world").map(_ => 0)
}
The first thing that we want is to have a decent text source. A string variable will do.
val someText = "How much wood would a woodpecker peck if a woodpecker would peck wood?"
And the run function can become like this
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    chunk <- Stream(someText).run(ZSink.splitOn(" "))
    _ <- putStrLn(chunk.mkString(", "))
  } yield ()).map(_ => 0)
You can run this with the command mill --watch WordCounter.run (the watch flag will recompile and rerun when you save the file).
You should see the output
How, much, wood, would, a, woodpecker, peck, if, a, woodpecker, would, peck
The last word wood? is not printed because it doesn't end with a space. You can add a space at the end to include it if you want; I will add one before the question mark.
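For reference, this is the adjusted text (the trailing ? becomes its own token, which still gets dropped, so wood is now counted twice):

val someText = "How much wood would a woodpecker peck if a woodpecker would peck wood ?"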
Counting the words
We have our stream of words; now we need to do two simple operations. The first is to group together words that are the same, the second is to count them.
The plan is to create one stream per word: since new streams are created and tracked by their key, setting the key to the word itself will do the grouping.
Counting will be done by collecting all the words of each stream into a list. At the end, the length of that list tells us how many times that particular word appeared.
streamOfWords.groupByKey(identity) { case (word, stream) =>
  stream.aggregate(ZSink.collectAll[String]).map(list => (list.size, word))
}
The identity function tells groupByKey to use the word itself as the key. Then, for every stream we create a tuple of type (Int, String) that contains the count of the word and the word itself.
The complete run function looks like this.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    chunk <- Stream(someText).run(ZSink.splitOn(" "))
    counts <- Stream.fromChunk(chunk).groupByKey(identity) { case (word, stream) =>
      stream.aggregate(ZSink.collectAll[String]).map(list => (list.size, word))
    }.runCollect
    _ <- putStrLn(counts.mkString(", "))
  } yield ()).map(_ => 0)
The only difference is that we created a stream from the chunk of words and printed the counts instead.
Running this will print
(2,would), (2,woodpecker), (2,peck), (1,How), (2,a), (2,wood), (1,much), (1,if)
Simple to Complex
Admittedly we didn't accomplish a lot. Just counting a few words is not really impressive. How about a word count of a sizeable file like the Divina Commedia? Let's stash it in our tmp folder for later.
wget http://www.gutenberg.org/files/1012/1012-0.txt --output-document /tmp/divina.txt
First we need to read the file. To do it correctly we will take the name of the file from the command line like so
args.headOption
  .map(filePath => IO(Paths.get(filePath)))
  .getOrElse(IO.fail("No file specified"))
This means that if the argument is not passed, the entire program will fail with an error represented by the string No file specified. We are not printing the error anywhere yet. To do it we need to adapt the code after the yield.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    // ... other code ...
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)
Note that we turned the map into a fold that will return 1 to the operating system in case of failure.
After getting the file we create a new InputStream, which is the preferred way to read binary files.
inputStream <- IO(Files.newInputStream(file))
But this is a text file! 😱 We will have to turn the pesky bytes into UTF-8 characters.
counts <- ZStream
  .fromInputStream(inputStream)
  .chunks
  .aggregate(ZSink.utf8DecodeChunk)
// ...
Notes on the notes on the code
Warning: from version 1.0.0-RC19 on, all streams work by chunk, so the code above and the notes below are outdated. Just delete chunks and use a ZTransducer instead of a ZSink and it should just work.
Notes on the code
Now I need your attention for a second, as it's not completely clear what is happening in the above code. ZStream.fromInputStream returns a StreamEffectChunk type. This means that it's a stream (the Stream part, duh! 🤪) that emits thunks (the Effect part) and works with chunks of bytes instead of single bytes. By default the size of the chunks is 4096, but it can be set manually.
The reason it emits thunks (lazily evaluated functions) is that it reads the next chunk of bytes only when the downstream requests them. This allows reading files that are bigger than the computer's memory, chunk by chunk.
Finally, the reason it uses chunks instead of single bytes is speed. Reading 4096 bytes all at once is orders of magnitude faster than reading them one by one. (Why 4096? Usually the file system can't access less than that, so if you set it to 1, for example, you are really accessing 4096 bytes and throwing away 4095 of them.)
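To set it manually, fromInputStream takes the chunk size as a second parameter; a sketch, assuming the RC-era parameter is named chunkSize:

// Read 8 KiB at a time instead of the default 4096 bytes
// (the parameter name chunkSize is an assumption from memory of the API).
ZStream.fromInputStream(inputStream, chunkSize = 8192)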
Next we have a choice to make: we can either work with the chunks or with the single bytes. By writing .chunks we cast to a StreamEffect with Chunk[Byte] as the emitted type. The alternative is .flattenChunks, which also casts to StreamEffect, but with Byte as the emitted type.
We go with .chunks both for performance reasons and because there are some built-in functions that only work on chunks. The first one is aggregation with ZSink.utf8DecodeChunk.
Counting Words for serial now!
So now we are at a point where we can generate Strings from the chunks of bytes. The next step is to separate the words. To do it we write
counts <- ZStream
  .fromInputStream(inputStream)
  .chunks
  .aggregate(ZSink.utf8DecodeChunk)
  .aggregate(ZSink.splitOn(" "))
  .flatMap(ZStream.fromChunk)
  .filter(_.matches("[\\w]+"))
// ...
The splitOn unfortunately returns chunks, so we "unchunk" them with .flatMap(ZStream.fromChunk). What this does is concatenate all the chunks in strict order and emit the strings inside them one by one.
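As a conceptual sketch of the unchunking (hypothetical values, not part of the program):

// A stream of two chunks...
val chunked = ZStream(Chunk("How", "much"), Chunk("wood"))
// ...flattened into a stream that emits "How", "much", "wood" one by one.
val flat = chunked.flatMap(ZStream.fromChunk)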
Next we filter the strings to make sure they are actual words. The [\w]+ regex means "word characters (letters, digits, and underscore) and nothing else", which is enough to drop any token carrying punctuation.
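A quick way to convince yourself of what the filter keeps, as a hypothetical REPL check:

// Tokens containing punctuation are dropped; plain word characters survive.
List("wood", "wood?", "can't", "42").filter(_.matches("[\\w]+"))
// => List(wood, 42)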
The last thing we need to do is count the words! By recycling the previous code we get the following program
// also needs: import java.nio.file.{Files, Paths}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    inputStream <- IO(Files.newInputStream(file))
    counts <- ZStream
      .fromInputStream(inputStream)
      .chunks
      .aggregate(ZSink.utf8DecodeChunk)
      .aggregate(ZSink.splitOn(" "))
      .flatMap(ZStream.fromChunk)
      .filter(_.matches("[\\w]+"))
      .groupByKey(identity) { case (word, stream) =>
        stream
          .aggregate(ZSink.collectAll[String])
          .map(list => (list.size, word))
      }
      .runCollect
    _ <- putStrLn(counts.sortBy(_._1).reverse.mkString(", "))
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)
The only thing I added is sorting the results by occurrence.
Launching this (mill --watch WordCounter.run /tmp/divina.txt) will take about a minute ☕️ and will print something like:
(3616,e), (3535,che), (2260,la), (1859,a), (1826,di), (1339,non), (1319,per), (1123,in), (1042,si), (777,le), (752,li), (736,mi), (652,il), (623,con), ...
Yes, it's a lot of work, but a minute seems really long. I couldn't figure out why it was so slow, but I suspect groupByKey is optimized to work with a few streams, not thousands.
A faster alternative
A small modification will yield a much faster result (⚡️about 50x faster⚡️)
// also needs: import scala.collection.immutable.HashMap
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  (for {
    file <- args.headOption
      .map(filePath => IO(Paths.get(filePath)))
      .getOrElse(IO.fail("No file specified"))
    inputStream <- IO(Files.newInputStream(file))
    counts <- ZStream
      .fromInputStream(inputStream)
      .chunks
      .aggregate(ZSink.utf8DecodeChunk)
      .aggregate(ZSink.splitOn(" "))
      .flatMap(ZStream.fromChunk)
      .filter(_.matches("[\\w]+"))
      .fold(HashMap.empty[String, Int])((map, word) =>
        map.updatedWith(word)(maybeCounter => maybeCounter.map(_ + 1).orElse(Some(1)))
      )
      .map(_.toList)
    _ <- putStrLn(counts.sortBy(_._2).reverse.mkString(", "))
  } yield ())
    .onError(failure => putStrLn(failure.toString))
    .fold(_ => 1, _ => 0)
I swapped the groupByKey with a fold and sorted by occurrence (which is now _2).
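The updatedWith combinator (standard library, Scala 2.13+) does the heavy lifting there; here is its behavior on a tiny map:

import scala.collection.immutable.HashMap

val m = HashMap("wood" -> 1)
// Existing key: the callback receives Some(1) and bumps it to Some(2).
m.updatedWith("wood")(_.map(_ + 1).orElse(Some(1))) // HashMap(wood -> 2)
// Missing key: the callback receives None, so orElse inserts Some(1).
m.updatedWith("peck")(_.map(_ + 1).orElse(Some(1))) // HashMap(wood -> 1, peck -> 1)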
Maybe with older versions of Scala it would be even faster with mutable hash maps, but that would not be functional and thus not cool nor hip. I'm not even gonna try.
Conclusion
This was a short journey into very simple streaming applications. The zio-stream code is still a bit clunky in my opinion, but it clearly shows a lot of potential and will surely be more powerful than both fs2 and Akka Streams. In the next tutorial we are going to do some concurrent functional programming.