java.util.stream with ResultSet

Java, JDBC, Lambda, Java Stream, jOOQ

Java Problem Overview


I have a few tables with a large amount of data (about 100 million records), so I can't hold this data in memory. I would like to stream the result set using the java.util.stream API and pass that stream to another class. I read about Stream.of and Stream.Builder, but they buffer the elements in memory. Is there any way to solve this? Thanks in advance.

UPDATE #1

Okay, I googled and found the jOOQ library. I'm not sure, but it looks like it could be applicable to my use case. To summarize: I have a few tables with a large amount of data. I would like to stream my result set and pass this stream to another method. Something like this:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }
    
    return record;
}

Could someone please advise whether I am on the right track? Thanks.

UPDATE #2

I experimented with jOOQ and can now say that the above approach is not suitable for me. This code, record = DSL.using(connection).fetch(resultSet).stream();, takes too much time.

Java Solutions


Solution 1 - Java

The first thing you have to understand is that code like

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

does not work as by the time you leave the try blocks, the resources are closed while the processing of the Stream hasn’t even started.

The resource management construct “try with resources” works for resources used within a block scope inside a method but you are creating a factory method returning a resource. Therefore you have to ensure that the closing of the returned stream will close the resources and the caller is responsible for closing the Stream.
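
The failure mode can be reproduced without a database. The following is a minimal sketch, assuming a hypothetical FakeCursor class as a stand-in for a ResultSet (it is not part of JDBC or any library). The broken method returns a lazy stream from inside a try-with-resources block, while the working method hands the close operation to the stream via onClose:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ClosedTooEarlyDemo {

    /** Hypothetical stand-in for a ResultSet: refuses to deliver rows once closed. */
    static class FakeCursor implements AutoCloseable {
        private final List<String> rows;
        private boolean closed;

        FakeCursor(List<String> rows) { this.rows = rows; }

        String get(int index) {
            if (closed) throw new IllegalStateException("cursor already closed");
            return rows.get(index);
        }

        @Override
        public void close() { closed = true; }
    }

    /** Broken: the stream is lazy, so get() only runs after the try block has closed the cursor. */
    static Stream<String> broken(List<String> rows) {
        try (FakeCursor cursor = new FakeCursor(rows)) {
            return IntStream.range(0, rows.size()).mapToObj(cursor::get);
        }
    }

    /** Working: the cursor stays open until the caller closes the returned stream. */
    static Stream<String> working(List<String> rows) {
        FakeCursor cursor = new FakeCursor(rows);
        return IntStream.range(0, rows.size()).mapToObj(cursor::get).onClose(cursor::close);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        try {
            broken(rows).collect(Collectors.toList());
        } catch (IllegalStateException ex) {
            System.out.println("broken: " + ex.getMessage());
        }
        // The caller owns the stream and therefore the cursor behind it
        try (Stream<String> s = working(rows)) {
            System.out.println("working: " + s.collect(Collectors.toList()));
        }
    }
}
```

The only difference between the two methods is who closes the cursor: the factory method itself (too early) or the caller, through the stream.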


Further, you need a function which produces an item out of a single line from the ResultSet. Supposing, you have a method like

Record createRecord(ResultSet rs) {
    …
}

you may create a Stream<Record> basically like

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

But to do it correctly you have to incorporate the exception handling and closing of resources. You can use Stream.onClose to register an action that will be performed when the Stream gets closed, but it has to be a Runnable, which cannot throw checked exceptions. Similarly, the tryAdvance method is not allowed to throw checked exceptions. And since we can’t simply nest try(…) blocks here, the program logic of suppressing exceptions thrown in close, when there is already a pending exception, doesn’t come for free.

To help us here, we introduce a new type which can wrap closing operations which may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseable itself, it can utilize the try(…) construct to chain close operations safely:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}
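
To see what nest buys us, here is a small sketch that chains three close operations and registers the result with Stream.onClose. The interface is copied from above so that the example compiles on its own. The resource helper and the resource names are hypothetical and only record the order in which things get closed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class NestedCloseDemo {

    // Copied from the answer so that this sketch is self-contained
    interface UncheckedCloseable extends Runnable, AutoCloseable {
        default void run() {
            try { close(); } catch (Exception ex) { throw new RuntimeException(ex); }
        }
        static UncheckedCloseable wrap(AutoCloseable c) {
            return c::close;
        }
        default UncheckedCloseable nest(AutoCloseable c) {
            return () -> { try (UncheckedCloseable c1 = this) { c.close(); } };
        }
    }

    /** Hypothetical resource: logs its name on close and optionally fails with a checked exception. */
    static AutoCloseable resource(String name, List<String> log, boolean fail) {
        return () -> {
            log.add(name);
            if (fail) throw new Exception(name + " failed to close");
        };
    }

    /** Closes a stream whose close handler chains three resources; returns the close order. */
    static List<String> closeOrder() {
        List<String> log = new ArrayList<>();
        UncheckedCloseable close = UncheckedCloseable.wrap(resource("connection", log, false));
        close = close.nest(resource("statement", log, false));
        close = close.nest(resource("resultSet", log, false));
        try (Stream<String> s = Stream.of("row").onClose(close)) {
            s.forEach(row -> { });
        }
        return log;
    }

    /** Lets two of the three close operations fail; returns the exception the caller sees. */
    static RuntimeException failingClose(List<String> log) {
        UncheckedCloseable close = UncheckedCloseable.wrap(resource("connection", log, true));
        close = close.nest(resource("statement", log, false));
        close = close.nest(resource("resultSet", log, true));
        try {
            Stream.of("row").onClose(close).close();
            return null;
        } catch (RuntimeException ex) {
            return ex;
        }
    }

    public static void main(String[] args) {
        System.out.println(closeOrder());
        RuntimeException ex = failingClose(new ArrayList<>());
        System.out.println(ex.getCause() + ", suppressed: " + ex.getCause().getSuppressed().length);
    }
}
```

The resources are closed in reverse order of registration, just like nested try(…) blocks would do. All three close attempts are made even when one fails, and a later failure is attached to the first one as a suppressed exception.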

With this, the entire operation becomes:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

This method wraps the necessary close operation for all resources, Connection, Statement and ResultSet within one instance of the utility class described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.

Therefore the caller has to ensure proper closing like

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

Note that also the delivery of an SQLException via RuntimeException has been added to the tryAdvance method. Therefore you may now add throws SQLException to the createRecord method without problems.
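
If the caller prefers to deal with the original SQLException rather than the wrapping RuntimeException, the unwrapping can be done in one place. This is only a sketch: SqlUnwrap and its call method are hypothetical helper names, not part of the JDK or of the code above, and it assumes the wrapping convention used in tryAdvance (the SQLException is the direct cause).

```java
import java.sql.SQLException;
import java.util.function.Supplier;

class SqlUnwrap {

    /**
     * Runs a terminal stream operation and rethrows an SQLException that was
     * wrapped as the direct cause of a RuntimeException as the checked original.
     */
    static <T> T call(Supplier<T> terminalOperation) throws SQLException {
        try {
            return terminalOperation.get();
        } catch (RuntimeException ex) {
            if (ex.getCause() instanceof SQLException) {
                throw (SQLException) ex.getCause();
            }
            throw ex; // unrelated failure: leave it untouched
        }
    }

    public static void main(String[] args) {
        try {
            call(() -> { throw new RuntimeException(new SQLException("connection lost")); });
        } catch (SQLException ex) {
            System.out.println("caught: " + ex.getMessage());
        }
    }
}
```

Inside the try(…) block shown above, a caller could then write something like SqlUnwrap.call(s::count). Note that this also unwraps an SQLException wrapped by any other code running in the pipeline, which may or may not be what you want.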

Solution 2 - Java

jOOQ

I'm going to answer the jOOQ part of your question. As of jOOQ 3.8, there have now been quite a few additional features related to combining jOOQ with Stream. Other usages are also documented on this jOOQ page.

Your suggested usage:

You tried this:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

Indeed, this doesn't work well for large result sets because fetch(ResultSet) fetches the entire result set into memory and then calls Collection.stream() on it.

Better (lazy) usage:

Instead, you could write this:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

... which is essentially convenience for this:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

See also DSLContext.fetchStream(ResultSet)

Of course, you could also let jOOQ execute your SQL string, rather than wrestling with JDBC:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

The dreaded SELECT *

As was criticised in the comments, your jOOQ usage seemed slow because jOOQ eagerly fetches LOB data into memory despite using fetchLazy(). The word "lazy" refers to fetching records lazily (one by one), not to fetching column data lazily. A record is completely fetched in one go, on the assumption that you actually want to project the entire row.

If you don't need some heavy columns, don't project them! SELECT * is almost always a bad idea in SQL. Drawbacks:

  • It causes a lot more I/O and memory overhead in the database server, the network, and the client.
  • It prevents covering index usage.
  • It prevents join elimination transformations.

More info in this blog post here.

On try-with-resources usage

Do note that a Stream produced by jOOQ is "resourceful", i.e. it contains a reference to an open ResultSet (and PreparedStatement). So, if you really want to return that stream outside of your method, make sure it is closed properly!

Solution 3 - Java

I'm not aware of any well-known library that will do it for you.

That said, this article shows how to wrap the resultset with an Iterator (ResultSetIterator) and pass it as the first parameter to Spliterators.spliteratorUnknownSize() in order to create a Spliterator.

The Spliterator can then be used by StreamSupport in order to create a Stream on top of it.

Their suggested implementation of ResultSetIterator class:

public class ResultSetIterator implements Iterator<Tuple> {

    private ResultSet rs;
    private PreparedStatement ps;
    private Connection connection;
    private String sql;

    public ResultSetIterator(Connection connection, String sql) {
        assert connection != null;
        assert sql != null;
        this.connection = connection;
        this.sql = sql;
    }

    public void init() {
        try {
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();

        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (ps == null) {
            init();
        }
        try {
            boolean hasMore = rs.next();
            if (!hasMore) {
                close();
            }
            return hasMore;
        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }

    }

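    private void close() {
        // Close each resource independently; either may still be null if init() failed early
        try {
            if (rs != null) rs.close();
        } catch (SQLException e) {
            //nothing we can do here
        }
        try {
            if (ps != null) ps.close();
        } catch (SQLException e) {
            //nothing we can do here
        }
    }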

    @Override
    public Tuple next() {
        try {
            return SQL.rowAsTuple(sql, rs);
        } catch (DataAccessException e) {
            close();
            throw e;
        }
    }
}

and then:

public static Stream<Tuple> stream(final Connection connection, 
                                       final String sql, 
                                       final Object... parms) {
  return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(
                        new ResultSetIterator(connection, sql), 0), false);
}
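
One caveat with the iterator above: hasNext() advances the ResultSet on every call, so calling it twice in a row skips a row. The following is a sketch of a variant that caches the result of ResultSet.next() until next() is called. The class name CachingResultSetIterator, the single-column String mapping and the proxy-based fakeResultSet helper are assumptions made for illustration only (the fake implements just next(), getString and close() so that the demo runs without a database):

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CachingResultSetIterator implements Iterator<String> {

    private final ResultSet rs;
    private Boolean cached; // null = cursor has not been advanced for the upcoming row yet

    CachingResultSetIterator(ResultSet rs) { this.rs = rs; }

    @Override
    public boolean hasNext() {
        if (cached == null) {
            try {
                cached = rs.next(); // advance the cursor at most once per row
            } catch (SQLException e) {
                throw new IllegalStateException(e);
            }
        }
        return cached;
    }

    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        cached = null; // the following hasNext() has to advance again
        try {
            return rs.getString(1);
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Sequential, ordered stream over the rows; closing the stream closes the ResultSet. */
    static Stream<String> stream(ResultSet rs) {
        Iterator<String> it = new CachingResultSetIterator(rs);
        return StreamSupport
            .stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
            .onClose(() -> {
                try { rs.close(); } catch (SQLException e) { throw new IllegalStateException(e); }
            });
    }

    /** Hypothetical in-memory ResultSet for the demo; all other methods are unsupported. */
    static ResultSet fakeResultSet(List<String> rows) {
        int[] pos = {-1};
        return (ResultSet) Proxy.newProxyInstance(
            ResultSet.class.getClassLoader(), new Class<?>[] {ResultSet.class},
            (proxy, method, args) -> {
                switch (method.getName()) {
                    case "next": return ++pos[0] < rows.size();
                    case "getString": return rows.get(pos[0]);
                    case "close": return null;
                    default: throw new UnsupportedOperationException(method.getName());
                }
            });
    }

    public static void main(String[] args) {
        try (Stream<String> s = stream(fakeResultSet(Arrays.asList("r1", "r2", "r3")))) {
            System.out.println(s.collect(Collectors.toList()));
        }
    }
}
```

Unlike the version above, this sketch leaves closing to the stream's onClose handler, so the caller should still use the stream inside a try(…) block.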

Solution 4 - Java

Here is the simplest sample by abacus-jdbc.

final DataSource ds = JdbcUtil.createDataSource(url, user, password);
final SQLExecutor sqlExecutor = new SQLExecutor(ds);
sqlExecutor.stream(sql, parameters).filter(...).map(...).collect(...) // lazy execution&loading and auto-close Statement/Connection

Or:

JdbcUtil.prepareQuery(ds, sql).filter(...).map(...).collect(...)  // lazy execution&loading and auto-close Statement/Connection

This is fully lazy loading with automatic closing. The records will be loaded from the database in batches of the fetch size (the default if not specified), and the Statement and Connection will be closed automatically after the results/records are collected.

Disclosure: I'm the developer of AbacusUtil.

Solution 5 - Java

Using my library it would be done like this:

Add the Maven dependency:

<dependency>
    <groupId>com.github.buckelieg</groupId>
    <artifactId>db-fn</artifactId>
    <version>0.3.4</version>
</dependency>

Use the library in code:

Function<Stream<I>, O> processor = stream -> //process input stream
try (DB db = new DB("jdbc:postgresql://host:port/database?user=user&password=pass")) {
    processor.apply(
        db.select("SELECT * FROM my_table t1 JOIN my_table t2 ON t1.id = t2.id")
          .fetchSize(5000)
          .execute(rs -> /*ResultSet mapper*/)
    );
}

See more here

Solution 6 - Java

The common Tools module of the Ujorm framework offers a simple solution using the RowIterator class. Example of use:

    PreparedStatement ps = dbConnection.prepareStatement("SELECT * FROM myTable");
    new RowIterator(ps).toStream().forEach((RsConsumer)(resultSet) -> {
        int value = resultSet.getInt(1);
    });

Maven dependency on the Tools library (50KB):

    <dependency>
        <groupId>org.ujorm</groupId>
        <artifactId>ujo-tools</artifactId>
        <version>1.93</version>
    </dependency>

See jUnit test for more information.

Solution 7 - Java

I wrote a summary with a real example of how to stream a ResultSet and run a simple SQL query without using a 3rd-party library; click here for details.

> Java 8 provided the Stream family and easy operations on it. The pipeline style makes code clear and smart. However, a ResultSet is still processed in a very legacy way. In actual ResultSet usage, it is really helpful to convert it to a Stream.

.... StreamUtils.uncheckedConsumer is required to convert the SQLException to a RuntimeException, which keeps the lambda clean.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Iurii | View Question on Stackoverflow
Solution 1 - Java | Holger | View Answer on Stackoverflow
Solution 2 - Java | Lukas Eder | View Answer on Stackoverflow
Solution 3 - Java | Nir Alfasi | View Answer on Stackoverflow
Solution 4 - Java | user_3380739 | View Answer on Stackoverflow
Solution 5 - Java | Anatoly | View Answer on Stackoverflow
Solution 6 - Java | pop | View Answer on Stackoverflow
Solution 7 - Java | user2276550 | View Answer on Stackoverflow