Can you split a stream into two streams?

Java Problem Overview


I have a data set represented by a Java 8 stream:

Stream<T> stream = ...;

I can see how to filter it to get a random subset - for example

Random r = new Random();
PrimitiveIterator.OfInt coin = r.ints(0, 2).iterator();   
Stream<T> heads = stream.filter((x) -> (coin.nextInt() == 0));

I can also see how I could reduce this stream to get, for example, two lists representing two random halves of the data set, and then turn those back into streams. But, is there a direct way to generate two streams from the initial one? Something like

(heads, tails) = stream.[some kind of split based on filter]

Thanks for any insight.

Java Solutions


Solution 1 - Java

A collector can be used for this.

  • For two categories, use Collectors.partitioningBy() factory.

This will create a Map<Boolean, List>, and put items in one or the other list based on a Predicate.

Note: Since the stream needs to be consumed whole, this can't work on infinite streams. And because the stream is consumed anyway, this method simply puts them in Lists instead of making a new stream-with-memory. You can always stream those lists if you require streams as output.

Also, no need for the iterator, not even in the heads-only example you provided.

  • Binary splitting looks like this:

Random r = new Random();

Map<Boolean, List<String>> groups = stream
	.collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());

  • For more categories, use a Collectors.groupingBy() factory.

Map<Integer, List<String>> groups = stream
	.collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());
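As a minimal sketch of the note above about streaming the lists again, the following partitions a stream and then opens a fresh stream over each half. The class name PartitionDemo, the helper splitByParity, and the even/odd predicate are illustrative assumptions, not part of the original answer; a deterministic predicate is used so the result is predictable.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionDemo {
    // Hypothetical helper: consumes the stream once and buckets elements by parity.
    static Map<Boolean, List<Integer>> splitByParity(Stream<Integer> stream) {
        return stream.collect(Collectors.partitioningBy(x -> x % 2 == 0));
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups = splitByParity(Stream.of(1, 2, 3, 4, 5, 6));

        // Each list can be turned back into its own independent stream.
        Stream<Integer> evens = groups.get(true).stream();
        Stream<Integer> odds = groups.get(false).stream();

        System.out.println(evens.map(x -> x * 10).collect(Collectors.toList()));
        System.out.println(odds.count());
    }
}
```

Both keys (true and false) are always present in the map returned by partitioningBy, so neither get call returns null even when one half is empty.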

If the stream is not a Stream but one of the primitive streams, such as IntStream, then this .collect(Collectors) method is not available. You'll have to do it the manual way, without a collector factory. Its implementation looks like this:

[Example 2.0 since 2020-04-16]

	IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
	IntPredicate predicate = ignored -> r.nextBoolean();

	Map<Boolean, List<Integer>> groups = intStream.collect(
			() -> Map.of(false, new ArrayList<>(100000),
		                 true , new ArrayList<>(100000)),
			(map, value) -> map.get(predicate.test(value)).add(value),
			(map1, map2) -> {
				map1.get(false).addAll(map2.get(false));
				map1.get(true ).addAll(map2.get(true ));
			});

In this example I initialize the ArrayLists with the full size of the initial collection (if this is known at all). This prevents resize events even in the worst-case scenario, but can potentially gobble up 2*N*T space (N = initial number of elements, T = number of threads). To trade speed for space, you can leave the capacity out or use your best educated guess, like the expected highest number of elements in one partition (typically just over N/2 for a balanced split).

I hope I don't offend anyone by using a Java 9 method (Map.of). For the Java 8 version, look at the edit history of the original answer.
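For readers who cannot use Map.of, here is one possible Java 8 compatible sketch of the same manual collect. It is not the version from the answer's edit history; the class name IntPartitionJava8 and the helper partition are assumptions made for illustration, and the lists are created without an initial capacity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;

class IntPartitionJava8 {
    // Hypothetical helper: partitions an IntStream using only Java 8 APIs.
    static Map<Boolean, List<Integer>> partition(IntStream stream, IntPredicate predicate) {
        return stream.collect(
                () -> {
                    // One mutable container per worker; both keys are created up front.
                    Map<Boolean, List<Integer>> map = new HashMap<>();
                    map.put(false, new ArrayList<>());
                    map.put(true, new ArrayList<>());
                    return map;
                },
                (map, value) -> map.get(predicate.test(value)).add(value),
                (map1, map2) -> {
                    // Merge partial results produced by parallel workers.
                    map1.get(false).addAll(map2.get(false));
                    map1.get(true).addAll(map2.get(true));
                });
    }

    public static void main(String[] args) {
        Map<Boolean, List<Integer>> groups =
                partition(IntStream.range(0, 10).parallel(), i -> i % 3 == 0);
        System.out.println(groups.get(true));
        System.out.println(groups.get(false));
    }
}
```

If boxing the whole stream up front is acceptable, intStream.boxed().collect(Collectors.partitioningBy(...)) is a shorter alternative.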

Solution 2 - Java

I stumbled across this question myself, and I feel that a forked stream has some valid use cases. I wrote the code below as a Consumer, so it does not return anything, but you could apply the same idea to functions and anything else you might come across.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }
  
  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Now your code implementation could be something like this:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));
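The same routing idea can also be used to fill two collections from one traversal. The sketch below is a variation rather than the answer's own code: the class SplitConsumerDemo and the factory method splitting are hypothetical names, and the consumers simply append to two lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

class SplitConsumerDemo {
    // Hypothetical factory: routes each element to one of two consumers.
    static <T> Consumer<T> splitting(Predicate<T> predicate,
                                     Consumer<T> positive,
                                     Consumer<T> negative) {
        return t -> {
            if (predicate.test(t)) {
                positive.accept(t);
            } else {
                negative.accept(t);
            }
        };
    }

    public static void main(String[] args) {
        List<String> withText = new ArrayList<>();
        List<String> blank = new ArrayList<>();

        // Sequential stream assumed: ArrayList is not safe to fill from parallel threads.
        Stream.of("alice", "", "bob", " ")
                .forEach(SplitConsumerDemo.<String>splitting(
                        s -> !s.trim().isEmpty(), withText::add, blank::add));

        System.out.println(withText);
        System.out.println(blank.size());
    }
}
```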

Solution 3 - Java

Unfortunately, what you ask for is directly frowned upon in the JavaDoc of Stream:

> A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

You can work around this using peek or other methods, should you truly desire that type of behaviour. In this case, instead of trying to back two streams from the same original Stream source with a forking filter, you would duplicate your stream and filter each of the duplicates appropriately.

However, you may wish to reconsider if a Stream is the appropriate structure for your use case.
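One way to read "duplicate your stream" is to create a fresh stream from a re-traversable source for each pipeline. The following is a sketch under that assumption; DuplicateStreamDemo, matching, and rest are hypothetical names. It only gives complementary halves when the predicate is deterministic, because each pipeline evaluates the predicate on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DuplicateStreamDemo {
    // Opens a new stream from the source and keeps elements that match.
    static <T> Stream<T> matching(Supplier<Stream<T>> source, Predicate<T> predicate) {
        return source.get().filter(predicate);
    }

    // Opens a second, independent stream and keeps the remaining elements.
    static <T> Stream<T> rest(Supplier<Stream<T>> source, Predicate<T> predicate) {
        return source.get().filter(predicate.negate());
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        Supplier<Stream<Integer>> source = data::stream;
        Predicate<Integer> isEven = x -> x % 2 == 0;

        System.out.println(matching(source, isEven).collect(Collectors.toList()));
        System.out.println(rest(source, isEven).collect(Collectors.toList()));
    }
}
```

The source is traversed twice, so this does not help when the data can only be read once.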

Solution 4 - Java

Not exactly. You can't get two Streams out of one; this doesn't make sense -- how would you iterate over one without needing to generate the other at the same time? A stream can only be operated over once.

However, if you want to dump them into a list or something, you could do

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));
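For completeness, here is a self-contained sketch of that one-liner with the two lists declared. The class ForEachSplitDemo and the method split are illustrative names; the stream is forced to be sequential because plain ArrayLists are not safe to fill from several threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class ForEachSplitDemo {
    // Hypothetical helper: returns [heads, tails], where heads holds the zeros.
    static List<List<Integer>> split(Stream<Integer> stream) {
        List<Integer> heads = new ArrayList<>();
        List<Integer> tails = new ArrayList<>();

        // Side-effecting forEach: each element is appended to exactly one list.
        stream.sequential().forEach(x -> ((x == 0) ? heads : tails).add(x));

        return Arrays.asList(heads, tails);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = split(Stream.of(0, 1, 1, 0, 1));
        System.out.println("heads: " + result.get(0).size());
        System.out.println("tails: " + result.get(1).size());
    }
}
```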

Solution 5 - Java

This is against the general mechanism of Stream. Say you could split Stream S0 into Sa and Sb like you wanted. Performing any terminal operation, say count(), on Sa will necessarily "consume" all elements in S0. Sb would therefore lose its data source.

Previously, Stream had a tee() method, I think, which duplicated a stream into two. It has since been removed.

Stream has a peek() method though, you might be able to use it to achieve your requirements.
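A rough sketch of what a peek-based approach could look like: elements that fail the predicate are handed to a side consumer, and the rest continue down the pipeline. The names PeekSplitDemo and keepAndDivert are hypothetical. Note the assumptions: the predicate runs twice per element, so it must be deterministic, and nothing is diverted until a terminal operation actually pulls elements through peek.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PeekSplitDemo {
    // Hypothetical helper: passes matching elements on, diverts the others.
    static <T> Stream<T> keepAndDivert(Stream<T> stream,
                                       Predicate<T> keep,
                                       Consumer<T> rejected) {
        return stream
                .peek(x -> {
                    if (!keep.test(x)) {
                        rejected.accept(x); // side channel for the "other" half
                    }
                })
                .filter(keep);
    }

    public static void main(String[] args) {
        List<Integer> odds = new ArrayList<>();
        List<Integer> evens = PeekSplitDemo.<Integer>keepAndDivert(
                        Stream.of(1, 2, 3, 4, 5), x -> x % 2 == 0, odds::add)
                .collect(Collectors.toList());

        System.out.println(evens);
        System.out.println(odds);
    }
}
```

With short-circuiting terminal operations such as findFirst, later elements never reach peek, so the diverted half may be incomplete.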

Solution 6 - Java

Not exactly, but you may be able to accomplish what you need by invoking Collectors.groupingBy(). You create a new collection, and can then instantiate streams on that new collection.

Solution 7 - Java

Since Java 12, you can feed one stream into two downstream collectors with teeing. For example, counting heads and tails in 100 coin flips:

// Assumes: import static java.util.stream.Collectors.*;
Random r = new Random();
// r.ints(100, 0, 2) yields exactly 100 random flips (0 or 1). Seeding
// Stream.iterate with 0 would make the first "flip" always 0.
List<Long> list = r.ints(100, 0, 2).boxed()
    .collect(teeing(
        filtering(i -> i == 1, counting()),
        filtering(i -> i == 0, counting()),
        (heads, tails) -> List.of(heads, tails)));
System.err.println("heads:" + list.get(0) + " tails:" + list.get(1));

Example output: heads:51 tails:49
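The same collector can return the elements themselves rather than counts. The sketch below assumes Java 12 or later; TeeingSplitDemo and split are hypothetical names, and the result is returned as a two-element list purely for illustration.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TeeingSplitDemo {
    // Hypothetical helper (Java 12+): returns [matching, nonMatching].
    static <T> List<List<T>> split(Stream<T> stream, Predicate<T> predicate) {
        return stream.collect(Collectors.teeing(
                Collectors.filtering(predicate, Collectors.toList()),
                Collectors.filtering(predicate.negate(), Collectors.toList()),
                (matching, rest) -> List.of(matching, rest)));
    }

    public static void main(String[] args) {
        List<List<Integer>> halves = split(Stream.of(1, 2, 3, 4, 5), x -> x > 2);
        System.out.println(halves.get(0));
        System.out.println(halves.get(1));
    }
}
```

As with partitioningBy, the whole stream is consumed and the halves are materialized as lists, so this is not suitable for infinite streams.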

Solution 8 - Java

This was the least bad answer I could come up with.

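import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;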

public class Test {

	public static <T, L, R> Pair<L, R> splitStream(Stream<T> inputStream, Predicate<T> predicate,
			Function<Stream<T>, L> trueStreamProcessor, Function<Stream<T>, R> falseStreamProcessor) {

		Map<Boolean, List<T>> partitioned = inputStream.collect(Collectors.partitioningBy(predicate));
		L trueResult = trueStreamProcessor.apply(partitioned.get(Boolean.TRUE).stream());
		R falseResult = falseStreamProcessor.apply(partitioned.get(Boolean.FALSE).stream());

		return new ImmutablePair<L, R>(trueResult, falseResult);
	}

	public static void main(String[] args) {

		Stream<Integer> stream = Stream.iterate(0, n -> n + 1).limit(10);

		Pair<List<Integer>, String> results = splitStream(stream,
				n -> n > 5,
				s -> s.filter(n -> n % 2 == 0).collect(Collectors.toList()),
				s -> s.map(n -> n.toString()).collect(Collectors.joining("|")));

		System.out.println(results);
	}

}

This takes a stream of integers and splits them at 5. For those greater than 5 it filters only even numbers and puts them in a list. For the rest it joins them with |.

outputs:

 ([6, 8],0|1|2|3|4|5)

It's not ideal, as it collects everything into intermediary collections, breaking the stream (and it has too many arguments!).

Solution 9 - Java

I stumbled across this question while looking for a way to filter certain elements out of a stream and log them as errors. So I did not really need to split the stream so much as attach a premature terminating action to a predicate with unobtrusive syntax. This is what I came up with:

public class MyProcess {
    /* Return a Predicate that performs a bail-out action on non-matching items. */
    private static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
        return x -> {
            if (pred.test(x)) {
                return true;
            }
            altAction.accept(x);
            return false;
        };
    }

    /* Example usage in non-trivial pipeline */
    public void processItems(Stream<Item> stream) {
        stream.filter(Objects::nonNull)
              .peek(this::logItem)
              .map(Item::getSubItems)
              .filter(withAltAction(SubItem::isValid,
                                    i -> logError(i, "Invalid")))
              .peek(this::logSubItem)
              .filter(withAltAction(i -> i.size() > 10,
                                    i -> logError(i, "Too large")))
              .map(SubItem::toDisplayItem)
              .forEach(this::display);
    }
}
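Since Item, SubItem, and the logging methods above are not shown, here is a small self-contained sketch of the same helper applied to plain strings. AltActionDemo and the errors list are assumptions made only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class AltActionDemo {
    // Same idea as withAltAction above: run a side action on rejected items.
    static <T> Predicate<T> withAltAction(Predicate<T> pred, Consumer<T> altAction) {
        return x -> {
            if (pred.test(x)) {
                return true;
            }
            altAction.accept(x);
            return false;
        };
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();

        // Keep short words; record the ones that are too long.
        List<String> accepted = Stream.of("ok", "fine", "muchtoolong")
                .filter(withAltAction(s -> s.length() <= 4, errors::add))
                .collect(Collectors.toList());

        System.out.println(accepted);
        System.out.println(errors);
    }
}
```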

Solution 10 - Java

Shorter version that uses Lombok

import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.experimental.FieldDefaults;

/**
 * Forks a Stream using a Predicate into positive and negative outcomes.
 */
@RequiredArgsConstructor
@FieldDefaults(makeFinal = true, level = AccessLevel.PROTECTED)
public class StreamForkerUtil<T> implements Consumer<T> {
    Predicate<T> predicate;
    Consumer<T> positiveConsumer;
    Consumer<T> negativeConsumer;

    @Override
    public void accept(T t) {
        (predicate.test(t) ? positiveConsumer : negativeConsumer).accept(t);
    }
}

Solution 11 - Java

How about:

Supplier<Stream<Integer>> randomIntsStreamSupplier =
    () -> (new Random()).ints(0, 2).boxed();

Stream<Integer> tails =
    randomIntsStreamSupplier.get().filter(x->x.equals(0));
Stream<Integer> heads =
    randomIntsStreamSupplier.get().filter(x->x.equals(1));

Attributions

All content for this solution is sourced from the original question on Stack Overflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author
Question | user1148758
Solution 1 - Java | Mark Jeronimus
Solution 2 - Java | Ludger
Solution 3 - Java | Trevor Freeman
Solution 4 - Java | Louis Wasserman
Solution 5 - Java | ZhongYu
Solution 6 - Java | aepurniet
Solution 7 - Java | Kaplan
Solution 8 - Java | Ian Jones
Solution 9 - Java | Sebastian Hans
Solution 10 - Java | OneCricketeer
Solution 11 - Java | Matthew