Enrique Zamudio

Streaming large queries in Java

I've been using the JdbcTemplate class since version 1.0, and it's evolved nicely, but I was hoping that for version 5 it would include some streaming capabilities for queries with large results. Alas, that didn't happen.

Still, sometimes I need to perform queries that return millions of rows, and I can't use the JdbcTemplate methods that return lists for that. A RowCallbackHandler is perfect for the job, but it would be so much nicer to just receive a Stream, wouldn't it? Especially if you have custom RowMappers...
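
For reference, the callback-based approach looks something like this (the table and the per-row logic here are just illustrative placeholders, not from the original code):

//Baseline: process millions of rows without materializing a List,
//using Spring's RowCallbackHandler (the process(...) call is made up)
jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows", rs -> {
    //Called once per row; rs is already positioned on the current row
    process(rs.getLong("id"), rs.getString("name"));
});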

So, I decided to write my own Stream generator to use with a JdbcTemplate. In the process, I ended up creating a more generic Stream generator, which I think is good, and so I want to share it with anyone who needs something similar. I don't think it's enough material for a library, though, so I decided to write a post about it instead.

The challenge

First of all, we need to consider that streams are lazy: when you get a stream and define the operations to be performed on it, nothing happens until you invoke a terminal operation, which actually has to traverse the elements and apply the operations to them. There are terminal operations that go through the entire stream (such as count, or collecting the elements into another collection), and there are short-circuit operations (such as determining whether any element passes some filter).
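
A quick illustration of that laziness (nothing below touches the elements until the terminal operation runs):

Stream<String> names = Stream.of("ann", "bob", "carol", "dave")
        .peek(n -> System.out.println("visiting " + n))   //not executed yet
        .filter(n -> n.length() > 3);
//Nothing has been printed so far; the pipeline is only a description.
boolean any = names.anyMatch(n -> n.startsWith("c"));      //terminal, short-circuits
//Prints "visiting ann", "visiting bob", "visiting carol" and stops there;
//"dave" is never pulled from the source.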

So we want to get a stream and define operations on it, and nothing should happen up until the moment the stream needs to be traversed; only then does the query need to run (which implies having an open connection to the database). If something bad happens, the query needs to stop (and JdbcTemplate will take care of cleaning up the connection and other resources).

The only way I found I could make this work is by using two threads: a producer thread in which the query is run and the rows are somehow fed to the stream, and a consumer thread which is the reader of the stream.

We will need a buffer in which the producer will store elements and from which the consumer will take them. A LinkedBlockingQueue seems perfect for this.
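
Stripped of the stream machinery, the handshake between the two threads looks roughly like this (a bare-bones sketch, not the final code):

LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
String endMarker = "<<EOF>>";       //sentinel that signals "no more elements"

//Producer thread: pretends to run a query and feeds each "row" into the
//bounded queue, blocking whenever the consumer falls behind
new Thread(() -> {
    try {
        for (int i = 1; i <= 3; i++) {
            queue.put("row" + i);   //stand-in for rows coming out of a query
        }
        queue.put(endMarker);       //tell the consumer there's nothing more
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}).start();

//Consumer (here, the calling thread): takes elements until it sees the marker
try {
    for (String item = queue.take(); item != endMarker; item = queue.take()) {
        System.out.println(item);
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
}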

So, without further ado, here it is:

    public static <T> Stream<T> streamForQuery(int bufferSize, T endOfStreamMarker,
                                               Consumer<Consumer<T>> query) {
        final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>(bufferSize);
        //This is the consumer that is usually passed to queries;
        //it will receive each item from the query and put it in the queue
        Consumer<T> filler = t -> {
            try {
                //Try to add to the queue, waiting up to 1 second
                //Honestly if after 1 second the queue is still full, either the stream consumer
                //needs some serious optimization or, more likely, a short-circuit terminal
                //operation was performed on the stream.
                if (!queue.offer(t, 1, TimeUnit.SECONDS)) {
                    //If the queue is full after 1 second, time out.
                    //Throw an exception to stop the producer thread.
                    log.error("Timeout waiting to feed elements to stream");
                    throw new BufferOverflowException();
                }
            } catch (InterruptedException ex) {
                System.err.println("Interrupted trying to add item to stream");
                ex.printStackTrace();
            }
        };
        //For the stream that we return, we use a Spliterator.
        return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            //We need to know if the producer thread has been started
            private boolean started = false;
            //If there's an exception in the producer, keep it here
            private volatile Throwable boom;
            /** This method is called once, before advancing to the first element.
             * It will start the producer thread, which runs the query, passing it our
             * queue filler.
             */
            private void startProducer() {
                //Get the consumer thread
                Thread interruptMe = Thread.currentThread();
                //First time this is called it will run the query in a separate thread
                //This is the producer thread
                new Thread(() -> {
                    try {
                        //Run the query, with our special consumer
                        query.accept(filler);
                    } catch (BufferOverflowException ignore) {
                        //The filler threw this, means the queue is not being consumed fast enough
                        //(or, more likely, not at all)
                    } catch (Throwable thr) {
                        //Something bad happened, store the exception and interrupt the reader
                        boom = thr;
                        interruptMe.interrupt();
                    }
                }).start();
                started = true;
            }
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!started) {
                    startProducer();
                }
                try {
                    //Take an item from the queue and if it's not the end-of-stream marker, pass it
                    //to the action consumer.
                    T t = queue.take();
                    if (t != endOfStreamMarker) {
                        action.accept(t);
                        return true;
                    }
                } catch (InterruptedException ex) {
                    if (boom == null) {
                        System.err.println("Interrupted reading from stream");
                        ex.printStackTrace();
                    } else {
                        //Throw the exception from the producer on the consumer side
                        throw new RuntimeException(boom);
                    }
                }
                return false;
            }
        }, Spliterator.IMMUTABLE, false);
    }
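
The generator isn't tied to JDBC at all; anything that pushes elements to a Consumer can feed it. A quick smoke test with a made-up producer (the numbers just stand in for query results):

//Any producer that pushes elements to the callback works, not just JDBC
Integer endMarker = Integer.MIN_VALUE;
Stream<Integer> numbers = streamForQuery(10, endMarker, callback -> {
    for (int i = 1; i <= 1_000; i++) {
        callback.accept(i);
    }
    callback.accept(endMarker);     //signal end of stream
});
System.out.println(numbers.filter(n -> n % 7 == 0).count());   //prints 142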

And this is how you use it, with a JdbcTemplate:

final MyRow marker = new MyRow();
Stream<MyRow> stream = streamForQuery(100, marker, callback -> {
    //Pass a RowCallbackHandler that passes a MyRow to the callback
    jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows",
                       rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); }
    );
    //Pass the marker to the callback, to signal end of stream
    callback.accept(marker);
});

At this point, the query hasn't been performed. You can do stuff such as:

stream = stream.filter(row -> row.isPretty());

And still nothing happens. When you do something like this though:

Optional<MyRow> row = stream.skip(100_000).limit(1000).findAny();

Then the query is executed: rows are pulled from the stream and run through the filter, the first hundred thousand rows that pass it are skipped, and findAny returns the next one that passes, if any (the limit just caps how many rows past that point the pipeline will accept).
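
By the way, MyRow and myRowMapper above are just placeholders for whatever you actually select; a minimal, made-up version could look like this (the column names are invented):

//Purely illustrative: a minimal MyRow and a RowMapper for it
class MyRow {
    long id;
    String name;
    boolean pretty;
    public boolean isPretty() { return pretty; }
    //getters/setters omitted
}

RowMapper<MyRow> myRowMapper = (rs, rowNum) -> {
    MyRow row = new MyRow();
    row.id = rs.getLong("id");
    row.name = rs.getString("name");
    row.pretty = rs.getBoolean("pretty");
    return row;
};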

With great power...

Please, please, PLEASE don't use this as a substitute for a good WHERE clause and properly indexing your tables. I've used this thing mainly to generate reports, concatenating streams of disjoint types by mapping the elements to a common type for further processing (basically, making up for the lack of union types in Java).
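
That concatenation trick looks something like this (ReportLine, the two source streams and the fromXxx methods are all made up for the example):

//Map two streams of unrelated types to a common report type and process them as one
Stream<ReportLine> report = Stream.concat(
        invoices.map(inv -> ReportLine.fromInvoice(inv)),
        payments.map(pay -> ReportLine.fromPayment(pay)));
report.forEach(line -> System.out.println(line));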

Having said that, it is pretty cool to be able to read rows from a database in a streaming fashion.
I guess this could be integrated into Spring's JdbcTemplate, and/or jOOQ...

Top comments (5)

Brian McGowan

I found that the LinkedBlockingQueue fills up to whatever the limit is set to, say 100 in this example and then it throws a buffer overflow exception which is gracefully swallowed. Be nice if you could pop off the queue once you've consumed an element from the stream.

Enrique Zamudio

Not sure what you mean by popping off the queue once the element's been consumed... queue.take() is called inside tryAdvance, which removes the first element from the queue and passes it to the stream's consumer...

jayaerrabelli

I need full working code, it would be of great help to me! Please...

jayaerrabelli • Edited

This is exactly what I was looking for. Thank you so much. What does MyRow and MyRow Wrapper have ? Where can I see full code ?

Enrique Zamudio

MyRow is the class of the objects you want to stream. It can be whatever. There's no MyRow Wrapper, but there's a RowMapper implementation that produces MyRow instances from the ResultSet. That's basic JdbcTemplate stuff that's out of the scope of this article.