Copy a stream to avoid "stream has already been operated upon or closed"


Java Problem Overview


I'd like to duplicate a Java 8 stream so that I can deal with it twice. I can collect it into a list and get new streams from that:

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

But I kind of think there should be a more efficient/elegant way.

Is there a way to copy the stream without turning it into a collection?

I'm actually working with a stream of Eithers, so I want to process the left projection one way before moving on to the right projection and dealing with that another way. Something like this (for which, so far, I'm forced to use the toList trick):

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
    
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
    
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );

Java Solutions


Solution 1 - Java

I think your assumption about efficiency is kind of backwards. You get this huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let you flow the whole data efficiently through the pipeline.

If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.

We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) at the expense of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.

If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
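As a minimal sketch of that single-pass idea, the following feeds each element to two consumers inside one forEach. The class name SinglePassDemo, the helper forEachBoth, and the sample data are illustrative assumptions, not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

final class SinglePassDemo {
    // Hypothetical helper: hands every element to both consumers in one traversal.
    static <T> void forEachBoth(Stream<T> stream,
                                Consumer<? super T> first,
                                Consumer<? super T> second) {
        stream.forEach(e -> {
            first.accept(e);
            second.accept(e);
        });
    }

    public static void main(String[] args) {
        List<String> shortWords = new ArrayList<>();
        List<Integer> lengths = new ArrayList<>();

        forEachBoth(Stream.of("a", "bb", "ccc"),
                w -> { if (w.length() < 3) shortWords.add(w); }, // first "pipeline"
                w -> lengths.add(w.length()));                   // second "pipeline"

        System.out.println(shortWords);
        System.out.println(lengths);
    }
}
```

Nothing is buffered here: each element is seen once and handed to both consumers, which is why this only fits cases where both operations can be expressed as per-element side effects.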

You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".

Solution 2 - Java

You can use a local variable with a Supplier to set up common parts of the stream pipeline.

From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:

> Reusing Streams
>
> Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:
>
>     Stream<String> stream =
>         Stream.of("d2", "a2", "b1", "b3", "c")
>             .filter(s -> s.startsWith("a"));
>
>     stream.anyMatch(s -> true);    // ok
>     stream.noneMatch(s -> true);   // exception
>
> Calling noneMatch after anyMatch on the same stream results in the following exception:
>
>     java.lang.IllegalStateException: stream has already been operated upon or closed
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
>         at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
>         at com.winterbe.java8.Streams5.test7(Streams5.java:38)
>         at com.winterbe.java8.Streams5.main(Streams5.java:28)
>
> To overcome this limitation we have to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:
>
>     Supplier<Stream<String>> streamSupplier =
>         () -> Stream.of("d2", "a2", "b1", "b3", "c")
>                 .filter(s -> s.startsWith("a"));
>
>     streamSupplier.get().anyMatch(s -> true);   // ok
>     streamSupplier.get().noneMatch(s -> true);  // ok
>
> Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.

Solution 3 - Java

Use a Supplier to produce a fresh stream for each terminal operation.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.

Examples:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);
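A small runnable sketch of the difference, assuming a list-backed source; the class name StreamSupplierDemo and the helper reuseFails are made up for illustration. Note that the Stream Javadoc only says an implementation may throw IllegalStateException on reuse; the comments below describe what OpenJDK does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

final class StreamSupplierDemo {
    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean reuseFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.anyMatch(n -> n > 2);       // first terminal operation: fine
        try {
            stream.allMatch(n -> n > 0);   // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;                   // "stream has already been operated upon or closed"
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        Supplier<Stream<Integer>> streamSupplier = list::stream;

        System.out.println(reuseFails());                              // true on OpenJDK
        System.out.println(streamSupplier.get().anyMatch(n -> n > 2)); // true
        System.out.println(streamSupplier.get().allMatch(n -> n > 2)); // false
    }
}
```

A supplier does not copy anything: every get() rebuilds the pipeline from its source. That is cheap when the source is a collection, but if the supplier wrapped something like doSomething(), the underlying work would run again on each call.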

Solution 4 - Java

We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internally, there is a buffer storing all values that have been consumed from one stream but not yet from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate, and if you can live with the lack of thread-safety.

Here's how the algorithm works:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Declared as LinkedList (not List) because offer() and poll() come from Queue/Deque
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

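            T value = gap.poll();
            // Once the lagging iterator has caught up, either iterator may lead again
            if (gap.isEmpty())
                ahead[0] = null;
            return value;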
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}


Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.
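For readers without jOOλ on the classpath, here is a rough standard-library sketch of the same gap-buffer idea. It is not jOOλ's implementation: the class name DuplicateDemo, the List-of-two-streams return type, and the toStream helper are assumptions made for this example. It also clears the leader once the buffer drains so that interleaved consumption keeps working.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class DuplicateDemo {
    // Splits one stream into two sequential streams that share a gap buffer.
    // Not thread-safe; null elements are unsupported because ArrayDeque rejects them.
    static <T> List<Stream<T>> duplicate(Stream<T> stream) {
        Queue<T> gap = new ArrayDeque<>();
        Iterator<T> source = stream.iterator();
        Object[] ahead = { null }; // the iterator currently leading, if any

        class Duplicate implements Iterator<T> {
            @Override
            public boolean hasNext() {
                if (ahead[0] == null || ahead[0] == this) return source.hasNext();
                return !gap.isEmpty();
            }

            @Override
            public T next() {
                if (ahead[0] == null) ahead[0] = this;
                if (ahead[0] == this) {
                    T value = source.next();
                    gap.offer(value); // remember it for the lagging iterator
                    return value;
                }
                T value = gap.remove();
                if (gap.isEmpty()) ahead[0] = null; // caught up: either side may lead again
                return value;
            }
        }

        return Arrays.asList(toStream(new Duplicate()), toStream(new Duplicate()));
    }

    private static <T> Stream<T> toStream(Iterator<T> it) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<Stream<Integer>> copies = duplicate(Stream.of(1, 2, 3, 4));
        System.out.println(copies.get(0).filter(n -> n % 2 == 0).collect(Collectors.toList())); // [2, 4]
        System.out.println(copies.get(1).map(n -> n * 10).collect(Collectors.toList()));        // [10, 20, 30, 40]
    }
}
```

In the usage above the first copy is drained completely before the second starts, so the buffer ends up holding every element. That is the worst case the answer describes: the saving over collecting to a list only appears when both copies are consumed at a similar pace.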

Solution 5 - Java

You could create a stream of runnables (for example):

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

Where failure and success are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.

Solution 6 - Java

Another way to handle the elements multiple times is to use Stream.peek(Consumer):

doSomething() // already returns a stream
    .peek(either -> handleFailure(either.left()))
    .forEach(either -> handleSuccess(either.right()));

peek(Consumer) can be chained as many times as needed.

doSomething() // already returns a stream
    .peek(element -> handleFoo(element.foo()))
    .peek(element -> handleBar(element.bar()))
    .peek(element -> handleBaz(element.baz()))
    .forEach(element -> handleQux(element.qux()));
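A self-contained sketch of the peek approach, using plain strings instead of Eithers; the class name PeekDemo and the helper handleBoth are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class PeekDemo {
    // Runs two handlers per element in a single pass: one via peek, one via forEach.
    static List<String> handleBoth(Stream<String> words) {
        List<String> log = new ArrayList<>();
        words.peek(w -> log.add("length " + w.length()))
             .forEach(w -> log.add("upper " + w.toUpperCase()));
        return log;
    }

    public static void main(String[] args) {
        handleBoth(Stream.of("ab", "cde")).forEach(System.out::println);
    }
}
```

Two caveats apply. The Javadoc describes peek as existing mainly to support debugging, and a peek action only runs for elements that the terminal operation actually pulls through the pipeline, so with a short-circuiting terminal operation some elements may never reach it. Pairing peek with forEach, as here, visits every element.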

Solution 7 - Java

cyclops-react, a library I contribute to, has a static method that allows you to duplicate a Stream (it returns a jOOλ Tuple of Streams).

    Stream<Integer> stream = Stream.of(1, 2, 3);
    Tuple2<Stream<Integer>, Stream<Integer>> streams = StreamUtils.duplicate(stream);

As noted in the comments, there is a performance penalty when using duplicate on an existing Stream. A more performant alternative is to use Streamable:

There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or Array and replayed multiple times.

    Streamable<Integer> streamable = Streamable.of(1, 2, 3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream(stream) can be used to create a Streamable that lazily populates its backing collection in a way that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.

Solution 8 - Java

For this particular problem you can also use partitioning. Something like:

     // Partition Eithers into left and right (doSomething() returns a stream)
     Map<Boolean, List<Either<Pair<A, Throwable>, A>>> passingFailing =
         doSomething().collect(Collectors.partitioningBy(either -> either.isLeft()));
     passingFailing.get(true);  // all left values (the failures in the question)
     passingFailing.get(false); // all right values (the successes in the question)
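A runnable sketch of the partitioning idea. Since the question's Either type is not shown, the Outcome class below is a hypothetical stand-in (a left is modeled as a non-null error), and PartitionDemo and partition are made-up names.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PartitionDemo {
    // Hypothetical stand-in for an Either: a failure (left) carries an error,
    // a success (right) carries a value.
    static final class Outcome {
        final String value;
        final Throwable error;
        Outcome(String value, Throwable error) { this.value = value; this.error = error; }
        boolean isLeft() { return error != null; }
    }

    // One pass over the stream; key true holds the lefts, key false the rights.
    static Map<Boolean, List<Outcome>> partition(Stream<Outcome> outcomes) {
        return outcomes.collect(Collectors.partitioningBy(Outcome::isLeft));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Outcome>> parts = partition(Stream.of(
                new Outcome("ok-1", null),
                new Outcome(null, new IllegalStateException("boom")),
                new Outcome("ok-2", null)));

        parts.get(true).forEach(f -> System.out.println("failed: " + f.error.getMessage()));
        parts.get(false).forEach(s -> System.out.println("succeeded: " + s.value));
    }
}
```

partitioningBy always returns a map with both keys present, so neither get call returns null. The elements are still stored in lists, so this trades the explicit toList step for a single collecting pass rather than avoiding storage.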

Solution 9 - Java

We can use a Stream.Builder while reading or iterating over a stream. Here's the documentation for Stream.Builder:

https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html

Use case

Let's say we have a stream of employees, and we need to use it to write the employee data to an Excel file and then update the employee collection/table. (This use case is only meant to show how Stream.Builder can be used.)

Stream.Builder<Employee> builder = Stream.builder();

employee.forEach( emp -> {
   //store employee data to excel file 
   // and use the same object to build the stream.
   builder.add(emp);
});

//Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
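The same pattern as a runnable sketch, with strings standing in for Employee objects and a list standing in for the Excel export. BuilderDemo, exportAndReplay, and the "row:" prefix are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class BuilderDemo {
    // Consumes the input once, records a side effect per element,
    // and returns a fresh stream over the same elements.
    static Stream<String> exportAndReplay(Stream<String> employees, List<String> exported) {
        Stream.Builder<String> builder = Stream.builder();
        employees.forEach(emp -> {
            exported.add("row:" + emp); // stands in for writing to the Excel file
            builder.add(emp);           // buffer the element for the second pass
        });
        return builder.build();
    }

    public static void main(String[] args) {
        List<String> exported = new ArrayList<>();
        Stream<String> replay = exportAndReplay(Stream.of("Ann", "Bob"), exported);

        System.out.println(exported);
        System.out.println(replay.map(String::toUpperCase).collect(Collectors.toList()));
    }
}
```

The builder holds every element until build() is called, so memory use is comparable to collecting into a list; the benefit is that the first pass and the buffering happen in the same traversal.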

Solution 10 - Java

I had a similar problem, and could think of three different intermediate structures from which to create a copy of the stream: a List, an array and a Stream.Builder. I wrote a little benchmark program, which suggested that from a performance point of view the List was about 30% slower than the other two which were fairly similar.

The only drawback of converting to an array is that it is tricky if your element type is a generic type (which in my case it was); therefore I prefer to use a Stream.Builder.

I ended up writing a little function that creates a Collector:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

I can then make a copy of any stream str by doing str.collect(copyCollector()) which feels quite in keeping with the idiomatic usage of streams.
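A short usage sketch of such a collector, wrapped in a hypothetical CopyCollectorDemo class so it can be run on its own:

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CopyCollectorDemo {
    // Accumulates elements into a Stream.Builder and finishes by building a new stream.
    static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector() {
        return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
            b2.build().forEach(b1); // a Stream.Builder is a Consumer, so b1 absorbs b2's elements
            return b1;
        }, Stream.Builder::build);
    }

    public static void main(String[] args) {
        Stream<String> original = Stream.of("x", "y", "z");
        Stream<String> copy = original.collect(copyCollector());

        System.out.println(copy.collect(Collectors.toList())); // [x, y, z]
    }
}
```

Keep in mind that collect is a terminal operation, so the original stream is consumed in the process. What comes back is one fresh stream over the buffered elements, not a second stream alongside the original.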

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author
Question | Toby
Solution 1 - Java | Brian Goetz
Solution 2 - Java | user4975679
Solution 3 - Java | Rams
Solution 4 - Java | Lukas Eder
Solution 5 - Java | assylias
Solution 6 - Java | Martin
Solution 7 - Java | John McClean
Solution 8 - Java | Lubomir Varga
Solution 9 - Java | Lokesh Singal
Solution 10 - Java | Jeremy Hicks