Copy a stream to avoid "stream has already been operated upon or closed"
Java Problem Overview
I'd like to duplicate a Java 8 stream so that I can process it twice. I can collect
it into a list and get new streams from that:
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
But I kind of think there should be a more efficient/elegant way.
Is there a way to copy the stream without turning it into a collection?
I'm actually working with a stream of Eithers, so I want to process the left projection one way before moving on to the right projection and dealing with that another way. Kind of like this (which, so far, I'm forced to use the toList trick with).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
Java Solutions
Solution 1 - Java
I think your assumption about efficiency is kind of backwards. You get this huge efficiency payback if you're only going to use the data once, because you don't have to store it, and streams give you powerful "loop fusion" optimizations that let you flow the whole data efficiently through the pipeline.
If you want to re-use the same data, then by definition you either have to generate it twice (deterministically) or store it. If it already happens to be in a collection, great; then iterating it twice is cheap.
We did experiment in the design with "forked streams". What we found was that supporting this had real costs; it burdened the common case (use once) at the expense of the uncommon case. The big problem was dealing with "what happens when the two pipelines don't consume data at the same rate." Now you're back to buffering anyway. This was a feature that clearly didn't carry its weight.
If you want to operate on the same data repeatedly, either store it, or structure your operations as Consumers and do the following:
stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
You might also look into the RxJava library, as its processing model lends itself better to this kind of "stream forking".
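To make the single-pass suggestion concrete, here is a minimal sketch in which two Consumers see every element during one traversal. The class name, the sample strings, and what each consumer does are all made up for illustration; only the forEach-with-two-consumers shape comes from the answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class SinglePassDemo {
    // Hypothetical helper: feeds each element to two consumers in one traversal.
    static List<List<String>> process(List<String> input) {
        List<String> upper = new ArrayList<>();
        List<String> lengths = new ArrayList<>();

        Consumer<String> consumerA = s -> upper.add(s.toUpperCase());
        Consumer<String> consumerB = s -> lengths.add(s + ":" + s.length());

        // One pipeline, one terminal operation; andThen() chains the two consumers.
        input.stream()
             .filter(s -> !s.isEmpty())
             .forEach(consumerA.andThen(consumerB));

        return Arrays.asList(upper, lengths);
    }

    public static void main(String[] args) {
        // prints [[A, BCD], [a:1, bcd:3]]
        System.out.println(process(Arrays.asList("a", "", "bcd")));
    }
}
```

Nothing is buffered here, which is the point of the answer: the data flows through once and both operations run on each element as it passes.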
Solution 2 - Java
You can use a local variable with a Supplier to set up common parts of the stream pipeline.
From http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/:
> Reusing Streams
>
> Java 8 streams cannot be reused. As soon as you call any terminal operation the stream is closed:
>
> Calling noneMatch after anyMatch on the same stream results in the following exception:
> java.lang.IllegalStateException: stream has already been operated upon or closed
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
>     at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
>     at com.winterbe.java8.Streams5.test7(Streams5.java:38)
>     at com.winterbe.java8.Streams5.main(Streams5.java:28)
>
> To overcome this limitation we have to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:
>
> Each call to get() constructs a new stream on which we are safe to call the desired terminal operation.
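The code from the quoted tutorial did not survive in this copy, so here is a small sketch of what the passage describes: a second terminal operation on the same stream instance throws, while a Supplier hands out a fresh pipeline each time. The class and method names and the sample strings are my own choices, not necessarily the tutorial's.

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamReuseDemo {
    // Each get() builds a fresh pipeline with the filter already set up.
    static Supplier<Stream<String>> supplier() {
        return () -> Stream.of("d2", "a2", "b1", "b3", "c")
                           .filter(s -> s.startsWith("a"));
    }

    // Returns true if a second terminal operation on one stream instance throws.
    static boolean secondTerminalOpFails() {
        Stream<String> stream = supplier().get();
        stream.anyMatch(s -> true);          // first terminal operation: fine
        try {
            stream.noneMatch(s -> true);     // same instance again
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOpFails());                   // true

        Supplier<Stream<String>> streamSupplier = supplier();
        System.out.println(streamSupplier.get().anyMatch(s -> true));  // true
        System.out.println(streamSupplier.get().noneMatch(s -> true)); // false
    }
}
```

Note that the supplier re-runs the whole pipeline on every get(), so this only helps when the source can be read again cheaply (a collection, an array, a deterministic generator).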
Solution 3 - Java
Use a Supplier to produce the stream for each terminal operation.
Supplier<Stream<Integer>> streamSupplier = () -> list.stream();
Whenever you need a stream of that collection, use streamSupplier.get() to get a new stream.
Examples:
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
Solution 4 - Java
We've implemented a duplicate() method for streams in jOOλ, an Open Source library that we created to improve integration testing for jOOQ. Essentially, you can just write:
Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
Internally, there is a buffer storing all values that have been consumed from one stream but not from the other. That's probably as efficient as it gets if your two streams are consumed at about the same rate, and if you can live with the lack of thread-safety.
Here's how the algorithm works:
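static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    // Elements the leading iterator has consumed but the trailing one has not.
    // Declared as LinkedList (not List) because offer() and poll() are Queue methods.
    final LinkedList<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    // Single-element array so the local class can record which iterator leads.
    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            // The leader pulls from the source and buffers the value.
            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            // The follower replays buffered values.
            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}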
Tuple2 is probably like your Pair type, whereas Seq is Stream with some enhancements.
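If you want to try the buffering idea without pulling in jOOλ, the following is a rough JDK-only sketch. It is not jOOλ's implementation: it returns a plain List of two Streams instead of a Tuple2 of Seqs, and, unlike the listing above where the first iterator to call next() stays the leader, it lets a copy that has drained the buffer take over the lead. Both of those are my own choices, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class DuplicateSketch {
    // Not thread-safe. ArrayDeque rejects nulls, so this assumes non-null elements.
    static <T> List<Stream<T>> duplicate(Stream<T> source) {
        final Iterator<T> it = source.iterator();
        final Deque<T> gap = new ArrayDeque<>();  // seen by the leader, not yet by the follower
        final Object[] ahead = { null };          // the copy currently in the lead

        class Duplicate implements Iterator<T> {
            @Override
            public boolean hasNext() {
                // A trailing copy with buffered elements always has a next element.
                if (ahead[0] != this && !gap.isEmpty())
                    return true;
                return it.hasNext();
            }

            @Override
            public T next() {
                if (ahead[0] != this && !gap.isEmpty())
                    return gap.poll();            // replay what the leader already saw
                ahead[0] = this;                  // first caller, or caught up: take the lead
                T value = it.next();
                gap.offer(value);
                return value;
            }
        }

        List<Stream<T>> copies = new ArrayList<>();
        copies.add(toStream(new Duplicate()));
        copies.add(toStream(new Duplicate()));
        return copies;
    }

    private static <T> Stream<T> toStream(Iterator<T> iterator) {
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<Stream<Integer>> copies = duplicate(Stream.of(1, 2, 3));
        System.out.println(copies.get(0).collect(Collectors.toList())); // [1, 2, 3]
        System.out.println(copies.get(1).collect(Collectors.toList())); // [1, 2, 3]
    }
}
```

When one copy is consumed completely before the other, as in main, the buffer ends up holding every element, so the memory cost is the same as collecting to a list first.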
Solution 5 - Java
You could create a stream of runnables (for example):
results.stream()
    .flatMap(either -> Stream.<Runnable> of(
        () -> failure(either.left()),
        () -> success(either.right())))
    .forEach(Runnable::run);
Where failure and success are the operations to apply. This will however create quite a few temporary objects and may not be more efficient than starting from a collection and streaming/iterating it twice.
Solution 6 - Java
Another way to handle the elements multiple times is to use Stream.peek(Consumer):
// doSomething() already returns a stream
doSomething()
    .peek(either -> handleFailure(either.left()))
    .forEach(either -> handleSuccess(either.right()));
peek(Consumer) can be chained as many times as needed:
doSomething()
    .peek(element -> handleFoo(element.foo()))
    .peek(element -> handleBar(element.bar()))
    .peek(element -> handleBaz(element.baz()))
    .forEach(element -> handleQux(element.qux()));
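Here is a small runnable sketch of the chained-peek pattern, using integers and a log list instead of the handleFoo/handleBar handlers (those stand-ins and the class name are mine). It shows that each element passes through every peek before the next element starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PeekDemo {
    // Records the order in which the peek actions and the terminal action fire.
    static List<String> run(List<Integer> input) {
        List<String> log = new ArrayList<>();
        input.stream()
             .peek(n -> log.add("first:" + n))
             .peek(n -> log.add("second:" + n))
             .forEach(n -> log.add("last:" + n));
        return log;
    }

    public static void main(String[] args) {
        // prints [first:1, second:1, last:1, first:2, second:2, last:2]
        System.out.println(run(Arrays.asList(1, 2)));
    }
}
```

One caveat worth keeping in mind: the Stream.peek Javadoc says the method exists mainly to support debugging, and its action only runs for elements the terminal operation actually pulls, so a short-circuiting terminal operation such as findFirst() may skip some of them.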
Solution 7 - Java
cyclops-react, a library I contribute to, has a static method that allows you to duplicate a Stream (it returns a jOOλ Tuple of Streams).
Stream<Integer> stream = Stream.of(1,2,3);
Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);
Note that there is a performance penalty when using duplicate on an existing Stream. A more performant alternative is Streamable:
There is also a (lazy) Streamable class that can be constructed from a Stream, Iterable or array and replayed multiple times.
Streamable<Integer> streamable = Streamable.of(1,2,3);
streamable.stream().forEach(System.out::println);
streamable.stream().forEach(System.out::println);
AsStreamable.synchronizedFromStream(stream) can be used to create a Streamable that lazily populates its backing collection in a way that can be shared across threads. Streamable.fromStream(stream) will not incur any synchronization overhead.
Solution 8 - Java
For this particular problem you can also use partitioning. Something like:
// Partition Eithers into left and right
Stream<Either<Pair<A, Throwable>, A>> results = doSomething();
Map<Boolean, List<Either<Pair<A, Throwable>, A>>> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft()));
passingFailing.get(true)  // all left values (the failures in the question's Either)
passingFailing.get(false) // all right values (the successes)
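As a self-contained illustration of the partitioning idea, the sketch below uses a tiny stand-in Either class, since the question's Either type is not shown. That class, its isLeft() method, and the sample values are assumptions for the example only; Collectors.partitioningBy is the standard JDK collector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionDemo {
    // Hypothetical minimal Either, only for this sketch.
    static final class Either<L, R> {
        final L leftValue;
        final R rightValue;

        private Either(L leftValue, R rightValue) {
            this.leftValue = leftValue;
            this.rightValue = rightValue;
        }

        static <L, R> Either<L, R> left(L value)  { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }

        boolean isLeft() { return leftValue != null; }

        @Override
        public String toString() {
            return isLeft() ? "Left(" + leftValue + ")" : "Right(" + rightValue + ")";
        }
    }

    // One pass over the data; the map always has entries for both true and false.
    static Map<Boolean, List<Either<String, Integer>>> partition(
            List<Either<String, Integer>> results) {
        return results.stream().collect(Collectors.partitioningBy(Either::isLeft));
    }

    public static void main(String[] args) {
        List<Either<String, Integer>> results = Arrays.asList(
            Either.<String, Integer>right(1),
            Either.<String, Integer>left("boom"),
            Either.<String, Integer>right(2));

        Map<Boolean, List<Either<String, Integer>>> parts = partition(results);
        System.out.println(parts.get(true));   // [Left(boom)]
        System.out.println(parts.get(false));  // [Right(1), Right(2)]
    }
}
```

This still stores every element (in two lists rather than one), so it is a tidier variant of the toList approach rather than a way around buffering.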
Solution 9 - Java
We can use Stream.Builder while reading or iterating over a stream. Here's the documentation for Stream.Builder:
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.Builder.html
Use case
Let's say we have an employee stream, and we need to use it to write employee data to an Excel file and then update the employee collection/table (this is just a use case to show how Stream.Builder can be used):
Stream.Builder<Employee> builder = Stream.builder();
employee.forEach(emp -> {
    // store employee data to excel file
    // and use the same object to build the stream.
    builder.add(emp);
});
// Now this stream can be used to update the employee collection
Stream<Employee> newStream = builder.build();
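A runnable version of the same pattern might look like the sketch below, with strings standing in for Employee objects and a list standing in for the Excel file. The helper name consumeAndCopy and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BuilderCopyDemo {
    // Consumes the source once, applying firstUse, and returns a new stream
    // over the same elements. Assumes a sequential source stream.
    static <T> Stream<T> consumeAndCopy(Stream<T> source, Consumer<? super T> firstUse) {
        Stream.Builder<T> builder = Stream.builder();
        source.forEach(element -> {
            firstUse.accept(element);   // first use, e.g. writing to a file
            builder.add(element);       // buffer the element for the second pass
        });
        return builder.build();
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        Stream<String> copy = consumeAndCopy(Stream.of("ann", "bob"), written::add);

        System.out.println(written);                                  // [ann, bob]
        System.out.println(copy.map(String::toUpperCase)
                               .collect(Collectors.toList()));        // [ANN, BOB]
    }
}
```

The builder holds every element internally, so this is still a form of buffering, and the stream returned by build() is itself single-use.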
Solution 10 - Java
I had a similar problem, and could think of three different intermediate structures from which to create a copy of the stream: a List, an array and a Stream.Builder. I wrote a little benchmark program, which suggested that from a performance point of view the List was about 30% slower than the other two, which were fairly similar.
The only drawback of converting to an array is that it is tricky if your element type is a generic type (which in my case it was); therefore I prefer to use a Stream.Builder.
I ended up writing a little function that creates a Collector:
private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}
I can then make a copy of any stream str by doing str.collect(copyCollector()), which feels quite in keeping with the idiomatic usage of streams.
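For completeness, here is a small usage sketch around the collector above. The collector body is repeated so the example compiles on its own; the class name and sample data are made up.

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CopyCollectorDemo {
    // Same collector as in the answer: accumulate into a Stream.Builder,
    // merge builders by draining one into the other, finish with build().
    static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector() {
        return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
            b2.build().forEach(b1);
            return b1;
        }, Stream.Builder::build);
    }

    public static void main(String[] args) {
        // collect() consumes the original pipeline and yields a new stream.
        Stream<String> copy = Stream.of("a", "b", "c")
                                    .map(String::toUpperCase)
                                    .collect(copyCollector());

        System.out.println(copy.collect(Collectors.toList()));  // [A, B, C]
    }
}
```

Keep in mind that collect() is a terminal operation, so the original stream is used up and the returned copy can also be consumed only once. To process the data twice you would collect into something replayable, or call the collector on two separately created streams.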