The Iterators
For anyone having been in programming a couple of years, chances are you've run into an interview question or an assignment like iterator for binary tree.
There are variations like for the "pre-order", "post-order" or "in-order" traversal; or sometimes you got a n-ary tree instead of a bnary tree.
A reason this kind of questions are used in interviews is because they require some meticulous bookkeeping, and there are loads of edge cases to bite you if you aren't careful.
Fine, it's 2024 and we have Stream
now. How about having some fun and creating an infinite stream to generate the Fibonacci sequence?
The one thing in common? They are all boringly painful to write and read. Playing with them for the interview is cool; but having to read them in real life is no fun.
Recursion is easy
Imagine you are writing in-order traversal the most naive way:
void traverseInOrder(Tree tree) {
if (tree.left != null) traverseInOrder(tree.left);
System.out.println(tree.value);
if (tree.right != null) traverseInOrder(tree.right);
}
void fibonacci(int a, int b) {
System.out.println(a);
fibonacci(b, a + b);
}
fibonacci(0, 1);
Except the hardcoding of System.out.println()
, and never mind the stack overflow caused by the infinite recursion, at least it's easy, right?
Why can't we have some meat - scratch that - why can't we keep the simplicity and just fix the hardcoding and the stack overflow?
Design the API
This is my favorite part: to dream about what I would really really want, if everything just magically works.
Let's see... I don't want the hardcoding, so maybe this?
void traverseInOrder(Tree tree) {
if (tree.left != null) traverseInOrder(tree.left);
generate(tree.value);
if (tree.right != null) traverseInOrder(tree.right);
}
void fibonacci(int a, int b) {
generate(a);
fibonacci(b, a + b);
}
fibonacci(0, 1);
The imagined generate()
call will put the elements in the final stream.
It doesn't solve the stack overflow though.
The very nature of an infinite stream means that it has to be lazy. The caller could decide to use .limit(50)
to only print the first 50 numbers and the recursive call shouldn't progress beyond the first 50 numbers.
So I need something that delays the recursion call. The idiomatic way in Java to model laziness is to wrap it in a callback interface. So let's create one:
interface Continuation {
void evaluate();
}
And then I need a method similar to the generate()
call but accepts a Continuation
. The syntax should look like:
void traverseInOrder(Tree tree) {
if (tree.left != null) {
yield(() -> traverseInOrder(tree.left));
}
generate(tree.value);
if (tree.right != null) {
yield(() -> traverseInOrder(tree.right));
}
}
I stole the yield
keyword from other languages like C# with native generator support (it means to yield control to the runtime and also yield the evaluation result into the stream). Recent Java versions have made it a reserved word so you'll likely need to call it using this.yield()
instead.
By wrapping the recursive call in a lambda passed to yield()
, I should get the effect of lazy evaluation. The implementation's job will be to ensure the right order between the directly generated elements and the lazily generated ones.
In summary, the draft API we have come up with so far:
class Iteration<T> {
void generate(T element);
void yield(Continuation continuation);
/** Turns the Iteration into a stream */
Stream<T> iterate();
}
In case it wasn't obvious, we need the iterate()
method to package it all up as a lazy stream.
The final syntax we want:
class InOrderTraversal<T> extends Iteration<T> {
InOrderTraversal<T> traverseInOrder(Tree tree) {
if (tree.left != null) {
this.yield(() -> traverseInOrder(tree.left));
}
generate(tree.value);
if (tree.right != null) {
this.yield(() -> traverseInOrder(tree.right));
}
return this;
}
}
Stream<T> stream =
new InOrderTraversal<T>().traverseInOrder(tree).iterate();
class Fibonacci extends Iteration<Integer> {
Fibonacci startFrom(int a, int b) {
generate(a);
this.yield(() -> startFrom(b, a + b));
return this;
}
}
Stream<Integer> stream =
new Fibonacci().startFrom(0, 1).iterate();
Making it
It's nice to have a dream once in a while. We'll be able to use such API to turn many recursive algorithms trivially into lazy streams.
But aside from polishing up an API surface I like, there is nothing in concrete. How do I make it actually work?
First of all, if the thing only needs to handle generate()
, it'd be a pointlessly trivial wrapper around a Queue<T>
.
But we need to handle yield()
with its lazy evaluation. Imagine we are effectively running this sequence:
generate(1);
generate(2);
yield(() -> { // line 3
generate(3);
generate(4);
});
generate(5); // line 7
At line 3, can we enqueue the Continuation into the same queue before moving on to line 7?
If I do so, after line 7, the queue will look like [1, 2, Continuation, 5]
.
Now if we call iterate()
and start to consume elements from the stream, 1
and 2
will be popped out, and then the Continuation
object, with the number 5
still remaining in the queue.
Once the Continuation
object is consumed, it needs to be evaluated, which will in turn generate 3
and 4
. The question is where to put the two numbers?
We can't keep enqueueing them after the number 5
because it'll be out of order; we can't treat the queue as a stack and push them in FILO order because then we'll get [4, 3, 5]
.
Stack or Queue?
There are several ways to go about this.
One possibility is to create a stack of queues, where each time a Continuation
is to be evaluated, push a new queue onto the stack. The stream will always consume elements from the top queue of the stack.
The downside is that you might end up creating and discarding many instances of ArrayDeque
, which can be wasteful.
With some micro-optimization in mind, another approach is to use the two-stacks-as-a-queue trick.
The are two stacks: the inbox
for writing and outbox
for reading. When either generate()
or yield()
is called, we push into the
inbox
stack; when the stream tries to consume, it flushes everything out of inbox
into the outbox
and then consumes one-by-one from the outbox
.
To put it in context, upon seeing [Continuation, 5]
in the outbox
, the Continuation
is evaluated, which puts [4, 3]
in the inbox
stack.
On the other side, the stream tries to consume. It pops and pushes [4, 3]
onto the outbox
stack, resulting in [3, 4, 5]
.
Implmentation-wise, we get to allocate no more than two ArrayDeque
s.
public class Iteration<T> {
private final Deque<Object> outbox = new ArrayDeque<>();
private final Deque<Object> inbox = new ArrayDeque<>(8);
public final <T> void generate(T element) {
if (element instanceof Continuation) {
throw new IllegalArgumentException("Do not stream Continuation objects");
}
inbox.push(element);
}
public final void yield(Continuation continuation) {
inbox.push(continuation);
}
private T consumeNextOrNull() {
for (; ; ) {
Object top = poll();
if (top instanceof Continuation) {
((Continuation) top).evaluate();
} else {
return (T) top;
}
}
}
private Object poll() {
Object top = inbox.poll();
if (top == null) { // nothing in inbox
return outbox.poll();
}
// flush inbox -> outbox
for (Object second = inbox.poll();
second != null;
second = inbox.poll()) {
outbox.push(top);
top = second;
}
return top;
}
}
The poll()
private method does the "flush inbox into outbox" logic mentioned above.
The consumeNextOrNull()
method consumes the next element from the two-stack-queue, and evaluates Continuation
when it sees one.
Wrap it all up
If you are with me so far, all we are missing is the iterate()
method that wraps it all up as a Stream
.
I'll cheat by just using Mug's whileNotNull()
convenience method. But it's not hard to create your own by implementing a Spliterator
.
public Stream<T> iterate() {
return MoreStreams.whileNotNull(this::consumeNextOrNull);
}
And with that, our little generic recursive stream generator is complete.
Use it for real
Before we call it the day, let's see if we can use it to solve a more realistic problem.
Say, you are calling a ListAssets
API that supports pagination. Request and response definitions are:
ListAssetsRequest {
string userId;
int page_size;
string page_token; // start from this page
}
ListAssetsResponse {
List<Asset> assets;
string next_page_token; // empty if no more
}
If we were to naively fetch all pages and print them, it'll be as simple as sending request over and over again until the response.next_page_token
is empty:
void showAllAssets(String userId) {
ListAssetsRequest.Builder request =
ListAssetsRequest.newBuilder().setUserId(userId);
for (; ;) {
var response = assetsApi.listAssets(request.build());
for (Asset asset : response.getAssets()) {
System.out.println(asset);
}
if (response.getNextPageToken().isEmpty()) {
return; // no more pages
}
request.setPageToken(response.getNextPageToken());
}
}
But we can do better! Let's wrap it up as a lazy Stream<Asset>
to give callers more flexibility. For example they can consume any number of assets and stop when needed without over-fetching pages unnecessarily.
Stream<Asset> listAssets(String userId) {
class Pagination extends Iteration<ListAssetsResponse> {
Pagination startFrom(String pageToken) {
var response = assetsApi.listAssets(
ListAssetsRequest.newBuilder()
.setUserId(userId)
.setPageSize(100)
.setPageToken(pageToken)
.build());
generate(response);
if (!response.getNextPageToken().isEmpty()) {
this.yield(() -> startFrom(response.getNextPageToken()));
}
return this;
}
}
return new Pagination().startFrom("").iterate()
.flatMap(response -> response.getAssets().stream());
}
I trust you can read it alright, my friend. :)
Top comments (0)